Skip to content

Commit

Permalink
DPL: support for Signposts in CCDB
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Feb 1, 2024
1 parent 393608c commit 627172c
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
#include "Framework/DataSpecUtils.h"
#include "CCDB/CcdbApi.h"
#include "CommonConstants/LHCConstants.h"
#include "Framework/Signpost.h"
#include <typeinfo>
#include <TError.h>
#include <TMemFile.h>
#include <functional>

O2_DECLARE_DYNAMIC_LOG(ccdb);

namespace o2::framework
{

Expand Down Expand Up @@ -181,8 +184,12 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
{
std::string ccdbMetadataPrefix = "ccdb-metadata-";
int objCnt = -1;
// We use the timeslice, so that we hook into the same interval as the rest of the
// callback.
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
for (auto& route : helper->routes) {
LOGP(debug, "Fetching object for route {}", DataSpecUtils::describe(route.matcher));
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data());
objCnt++;
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
Output output{concrete.origin, concrete.description, concrete.subSpec};
Expand All @@ -201,7 +208,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
} else if (isPrefix(ccdbMetadataPrefix, meta.name)) {
std::string key = meta.name.substr(ccdbMetadataPrefix.size());
auto value = meta.defaultValue.get<std::string>();
LOGP(debug, "Adding metadata {}: {} to the request", key, value);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
metadata[key] = value;
} else if (meta.name == "ccdb-query-rate") {
chRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
Expand All @@ -215,7 +222,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
checkValidity = true; // never skip check if the cache is empty
}

LOGP(debug, "checkValidity is {} for tfID {} of {}", checkValidity, timingInfo.tfCounter, path);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());

const auto& api = helper->getAPI(path);
if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
Expand All @@ -238,7 +245,7 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
LOGP(debug, "Caching {} for {} (DPL id {})", path, headers["ETag"], cacheId.value);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
continue;
}
if (v.size()) { // but should be overridden by fresh object
Expand All @@ -249,19 +256,20 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
LOGP(debug, "Caching {} for {} (DPL id {})", path, headers["ETag"], cacheId.value);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
continue;
}
}
// cached object is fine
auto cacheId = helper->mapURL2DPLCache[path];
LOGP(debug, "Reusing {} for {}", cacheId.value, path);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
helper->mapURL2UUID[path].cacheHit++;
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
// the outputBuffer was not used, can we destroy it?
}
O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
};

AlgorithmSpec CCDBHelpers::fetchFromCCDB()
Expand All @@ -276,6 +284,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
helper->queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
helper->queryPeriodFactor = checkMult > 0 ? checkMult : 1;
LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper->queryPeriodGlo, helper->queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper->queryPeriodFactor));
LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
auto remapString = options.get<std::string>("condition-remap");
ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str());
if (!result.error.empty()) {
Expand Down Expand Up @@ -322,6 +331,8 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
});

return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
static Long64_t orbitResetTime = -1;
static size_t lastTimeUsed = -1;
if (timingInfo.creation & DataProcessingHeader::DUMMY_CREATION_TIME_OFFSET) {
Expand All @@ -342,7 +353,8 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
} else {
checkValidity = true; // never skip check if the cache is empty
}
LOG(debug) << "checkValidity = " << checkValidity << " for TF " << timingInfo.timeslice;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tfID %d of %{public}s",
checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
Output output{"CTP", "OrbitReset", 0};
Long64_t newOrbitResetTime = orbitResetTime;
auto&& v = allocator.makeVector<char>(output);
Expand All @@ -363,7 +375,7 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
newOrbitResetTime = getOrbitResetTime(v);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
helper->mapURL2DPLCache[path] = cacheId;
LOGP(debug, "Caching {} for {} (DPL id {})", path, headers["ETag"], cacheId.value);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
} else if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
Expand All @@ -373,19 +385,19 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
newOrbitResetTime = getOrbitResetTime(v);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
helper->mapURL2DPLCache[path] = cacheId;
LOGP(debug, "Caching {} for {} (DPL id {})", path, headers["ETag"], cacheId.value);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
}
// cached object is fine
}
auto cacheId = helper->mapURL2DPLCache[path];
LOGP(debug, "Reusing {} for {}", cacheId.value, path);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
helper->mapURL2UUID[path].cacheHit++;
allocator.adoptFromCache(output, cacheId, header::gSerializationMethodNone);

if (newOrbitResetTime != orbitResetTime) {
LOGP(debug, "Orbit reset time now at {} (was {})", newOrbitResetTime, orbitResetTime);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
orbitResetTime = newOrbitResetTime;
dtc.orbitResetTimeMUS = orbitResetTime;
}
Expand All @@ -403,10 +415,11 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
timestamp = timingInfo.creation;
}
// Fetch the rest of the objects.
LOGP(debug, "Fetching objects. Run: {}. OrbitResetTime: {}, Creation: {}, Timestamp: {}, firstTForbit: {}",
dtc.runNumber, orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);

populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
}); });
}

Expand Down

0 comments on commit 627172c

Please sign in to comment.