diff --git a/Framework/AnalysisSupport/CMakeLists.txt b/Framework/AnalysisSupport/CMakeLists.txt index eb5706817704b..5fb1282469711 100644 --- a/Framework/AnalysisSupport/CMakeLists.txt +++ b/Framework/AnalysisSupport/CMakeLists.txt @@ -20,6 +20,7 @@ o2_add_library(FrameworkAnalysisSupport SOURCES src/Plugin.cxx src/DataInputDirector.cxx src/AODJAlienReaderHelpers.cxx + src/AODWriterHelpers.cxx PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS} ROOT::TreePlayer) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx new file mode 100644 index 0000000000000..fa10d4661f537 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -0,0 +1,414 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/AnalysisContext.h" +#include "Framework/ConfigContext.h" +#include "Framework/ControlService.h" +#include "AODWriterHelpers.h" +#include "Framework/OutputObjHeader.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/ProcessingContext.h" +#include "Framework/InitContext.h" +#include "Framework/CallbackService.h" +#include "Framework/AnalysisSupportHelpers.h" +#include "Framework/TableConsumer.h" +#include "Framework/DataOutputDirector.h" +#include "Framework/TableTreeHelpers.h" + +#include +#include +#include +#include +#include +#include + +namespace o2::framework::writers +{ + +struct InputObjectRoute { + std::string name; + uint32_t uniqueId; + std::string directory; + uint32_t taskHash; + OutputObjHandlingPolicy policy; + OutputObjSourceType sourceType; +}; + +struct InputObject { + TClass* kind = nullptr; + void* obj = nullptr; + std::string name; + int count = -1; +}; + +const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, + {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; + +AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); + int compressionLevel = 505; + if (ctx.options().hasOption("aod-writer-compression")) { + compressionLevel = ctx.options().get("aod-writer-compression"); + } + return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function { + LOGP(debug, "======== getGlobalAODSink::Init =========="); + + // find out if any table needs to be saved + bool hasOutputsToWrite = false; + for (auto& outobj : outputInputs) { + auto ds = dod->getDataOutputDescriptors(outobj); + if (ds.size() > 0) { + hasOutputsToWrite = true; + break; + } + } + + // if nothing needs to be saved then return a trivial functor + // this happens when nothing needs to be saved but there are dangling outputs + if (!hasOutputsToWrite) { + return [](ProcessingContext&) mutable -> void { + static bool once = false; + if (!once) { + LOG(info) << "No AODs to be saved."; + once = true; + } + }; + } + + // end of 
data functor is called at the end of the data stream + auto endofdatacb = [dod](EndOfStreamContext& context) { + dod->closeDataFiles(); + context.services().get().readyToQuit(QuitRequest::Me); + }; + + auto& callbacks = ic.services().get(); + callbacks.set(endofdatacb); + + // prepare map(startTime, tfNumber) + std::map tfNumbers; + std::map tfFilenames; + + std::vector aodMetaDataKeys; + std::vector aodMetaDataVals; + + // this functor is called once per time frame + return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { + LOGP(debug, "======== getGlobalAODSink::processing =========="); + LOGP(debug, " processing data set with {} entries", pc.inputs().size()); + + // return immediately if pc.inputs() is empty. This should never happen! + if (pc.inputs().size() == 0) { + LOGP(info, "No inputs available!"); + return; + } + + // update tfNumbers + uint64_t startTime = 0; + uint64_t tfNumber = 0; + auto ref = pc.inputs().get("tfn"); + if (ref.spec && ref.payload) { + startTime = DataRefUtils::getHeader(ref)->startTime; + tfNumber = pc.inputs().get("tfn"); + tfNumbers.insert(std::pair(startTime, tfNumber)); + } + // update tfFilenames + std::string aodInputFile; + auto ref2 = pc.inputs().get("tff"); + if (ref2.spec && ref2.payload) { + startTime = DataRefUtils::getHeader(ref2)->startTime; + aodInputFile = pc.inputs().get("tff"); + tfFilenames.insert(std::pair(startTime, aodInputFile)); + } + + // close all output files if one has reached size limit + dod->checkFileSizes(); + + // loop over the DataRefs which are contained in pc.inputs() + for (const auto& ref : pc.inputs()) { + if (!ref.spec) { + LOGP(debug, "Invalid input will be skipped!"); + continue; + } + + // get metadata + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { + aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); + } + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { + aodMetaDataVals = pc.inputs().get>(ref.spec->binding); + } + + // skip non-AOD refs + if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { + continue; + } + startTime = DataRefUtils::getHeader(ref)->startTime; + + // does this need to be saved? 
+ auto dh = DataRefUtils::getHeader(ref); + auto tableName = dh->dataDescription.as(); + auto ds = dod->getDataOutputDescriptors(*dh); + if (ds.empty()) { + continue; + } + + // get TF number from startTime + auto it = tfNumbers.find(startTime); + if (it != tfNumbers.end()) { + tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); + } else { + LOGP(fatal, "No time frame number found for output with start time {}", startTime); + throw std::runtime_error("Processing is stopped!"); + } + // get aod input file from startTime + auto it2 = tfFilenames.find(startTime); + if (it2 != tfFilenames.end()) { + aodInputFile = it2->second; + } + + // get the TableConsumer and corresponding arrow table + auto msg = pc.inputs().get(ref.spec->binding); + if (msg.header == nullptr) { + LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); + continue; + } + auto s = pc.inputs().get(ref.spec->binding); + auto table = s->asArrowTable(); + if (!table->Validate().ok()) { + LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); + continue; + } + if (table->schema()->fields().empty()) { + LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); + } + + // loop over all DataOutputDescriptors + // a table can be saved in multiple ways + // e.g. different selections of columns to different files + for (auto d : ds) { + auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); + auto treename = fileAndFolder.folderName + "/" + d->treename; + TableToTree ta2tr(table, + fileAndFolder.file, + treename.c_str()); + + // update metadata + if (fileAndFolder.file->FindObjectAny("metaData")) { + LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); + } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { + TMap aodMetaDataMap; + for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { + aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); + } + fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); + } + + if (!d->colnames.empty()) { + for (auto& cn : d->colnames) { + auto idx = table->schema()->GetFieldIndex(cn); + auto col = table->column(idx); + auto field = table->schema()->field(idx); + if (idx != -1) { + ta2tr.addBranch(col, field); + } + } + } else { + ta2tr.addAllBranches(); + } + ta2tr.process(); + } + } + }; + } + + }; +} + +AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto tskmap = ac.outTskMap; + auto objmap = ac.outObjHistMap; + + return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function { + auto& callbacks = ic.services().get(); + auto inputObjects = std::make_shared>>(); + + static TFile* f[OutputObjHandlingPolicy::numPolicies]; + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + f[i] = nullptr; + } + + static std::string currentDirectory = ""; + static std::string currentFile = ""; + + auto endofdatacb = [inputObjects](EndOfStreamContext& context) { + LOG(debug) << "Writing merged objects and histograms to file"; + if (inputObjects->empty()) { + LOG(error) << "Output object map is empty!"; + context.services().get().readyToQuit(QuitRequest::Me); + return; + } + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + if (f[i] != nullptr) { + f[i]->Close(); + } + } + LOG(debug) << "All outputs merged 
in their respective target files"; + context.services().get().readyToQuit(QuitRequest::Me); + }; + + callbacks.set(endofdatacb); + return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { + auto const& ref = pc.inputs().get("x"); + if (!ref.header) { + LOG(error) << "Header not found"; + return; + } + if (!ref.payload) { + LOG(error) << "Payload not found"; + return; + } + auto datah = o2::header::get(ref.header); + if (!datah) { + LOG(error) << "No data header in stack"; + return; + } + + auto objh = o2::header::get(ref.header); + if (!objh) { + LOG(error) << "No output object header in stack"; + return; + } + + InputObject obj; + FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); + tm.InitMap(); + obj.kind = tm.ReadClass(); + tm.SetBufferOffset(0); + tm.ResetMap(); + if (obj.kind == nullptr) { + LOG(error) << "Cannot read class info from buffer."; + return; + } + + auto policy = objh->mPolicy; + auto sourceType = objh->mSourceType; + auto hash = objh->mTaskHash; + + obj.obj = tm.ReadObjectAny(obj.kind); + auto* named = static_cast(obj.obj); + obj.name = named->GetName(); + auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); + if (hpos == tskmap.end()) { + LOG(error) << "No task found for hash " << hash; + return; + } + auto taskname = hpos->name; + auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); + if (opos == objmap.end()) { + LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; + return; + } + auto objects = opos->bindings; + if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { + LOG(error) << "No object " << obj.name << " in map for task " << taskname; + return; + } + auto nameHash = runtime_hash(obj.name.c_str()); + InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; + auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); + // If it's the first one, we just add it to the list. + if (existing == inputObjects->end()) { + obj.count = objh->mPipelineSize; + inputObjects->push_back(std::make_pair(key, obj)); + existing = inputObjects->end() - 1; + } else { + obj.count = existing->second.count; + // Otherwise, we merge it with the existing one. + auto merger = existing->second.kind->GetMerge(); + if (!merger) { + LOG(error) << "Already one unmergeable object found for " << obj.name; + return; + } + TList coll; + coll.Add(static_cast(obj.obj)); + merger(existing->second.obj, &coll, nullptr); + } + // We expect as many objects as the pipeline size, for + // a given object name and task hash. + existing->second.count -= 1; + + if (existing->second.count != 0) { + return; + } + // Write the object here. 
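+          // at this point the contributions from all pipeline copies of this object have been merged, so it can be written out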
+ auto route = existing->first; + auto entry = existing->second; + auto file = ROOTfileNames.find(route.policy); + if (file == ROOTfileNames.end()) { + return; + } + auto filename = file->second; + if (f[route.policy] == nullptr) { + f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); + } + auto nextDirectory = route.directory; + if ((nextDirectory != currentDirectory) || (filename != currentFile)) { + if (!f[route.policy]->FindKey(nextDirectory.c_str())) { + f[route.policy]->mkdir(nextDirectory.c_str()); + } + currentDirectory = nextDirectory; + currentFile = filename; + } + + // translate the list-structure created by the registry into a directory structure within the file + std::function writeListToFile; + writeListToFile = [&](TList* list, TDirectory* parentDir) { + TIter next(list); + TObject* object = nullptr; + while ((object = next())) { + if (object->InheritsFrom(TList::Class())) { + writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); + } else { + parentDir->WriteObjectAny(object, object->Class(), object->GetName()); + auto* written = list->Remove(object); + delete written; + } + } + }; + + TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); + if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { + auto* outputList = static_cast(entry.obj); + outputList->SetOwner(false); + + // if registry should live in dedicated folder a TNamed object is appended to the list + if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { + delete outputList->Last(); + outputList->RemoveLast(); + currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); + } + + writeListToFile(outputList, currentDir); + outputList->SetOwner(); + delete outputList; + entry.obj = nullptr; + } else { + currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); + delete (TObject*)entry.obj; + entry.obj = nullptr; + } + }; + }}; +} +} // namespace o2::framework::writers diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.h b/Framework/AnalysisSupport/src/AODWriterHelpers.h new file mode 100644 index 0000000000000..7ae59a5cf3b01 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.h @@ -0,0 +1,28 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
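+// Writer algorithms for the analysis framework, exposed as plugins (see Plugin.cxx):
+//  - getOutputTTreeWriter writes AOD tables to TTrees (internal-dpl-aod-writer),
+//  - getOutputObjHistWriter merges and writes registered histograms and output objects
+//    (internal-dpl-aod-global-analysis-file-sink).
+// AnalysisSupportHelpers attaches them via PluginManager::loadAlgorithmFromPlugin.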
+ +#ifndef O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ +#define O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ + +#include "Framework/AlgorithmSpec.h" +#include + +namespace o2::framework::writers +{ + +struct AODWriterHelpers { + static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context); + static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context); +}; + +} // namespace o2::framework::writers + +#endif // O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index bba3499286e08..52435375d7e9e 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -16,6 +16,7 @@ #include "Framework/Capability.h" #include "Framework/Signpost.h" #include "AODJAlienReaderHelpers.h" +#include "AODWriterHelpers.h" #include #include #include @@ -33,6 +34,20 @@ struct ROOTFileReader : o2::framework::AlgorithmPlugin { } }; +struct ROOTObjWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputObjHistWriter(config); + } +}; + +struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputTTreeWriter(config); + } +}; + using namespace o2::framework; struct RunSummary : o2::framework::ServicePlugin { o2::framework::ServiceSpec* create() final @@ -211,6 +226,8 @@ struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService); DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery); DEFINE_DPL_PLUGINS_END diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/AnalysisContext.h new file mode 100644 index 0000000000000..0f62f952d0aaa --- /dev/null +++ b/Framework/Core/include/Framework/AnalysisContext.h @@ -0,0 +1,58 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
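+// Book-keeping shared between the workflow-building helpers: the AOD/DYN/IDX inputs and
+// outputs discovered while injecting service devices are collected here. The context is
+// registered as a service on the ConfigContext, so that the writer plugins can retrieve
+// it when they create their algorithms.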
+#ifndef O2_FRAMEWORK_ANALYSISCONTEXT_H_ +#define O2_FRAMEWORK_ANALYSISCONTEXT_H_ + +#include +#include "Framework/InputSpec.h" +#include "Framework/OutputSpec.h" + +namespace o2::framework +{ +class DataOutputDirector; + +struct OutputTaskInfo { + uint32_t id; + std::string name; +}; + +struct OutputObjectInfo { + uint32_t id; + std::vector bindings; +}; + +// +struct AnalysisContext { + std::vector requestedAODs; + std::vector providedAODs; + std::vector requestedDYNs; + std::vector providedDYNs; + std::vector requestedIDXs; + std::vector providedOutputObjHist; + std::vector spawnerInputs; + + // Needed to created the hist writer + std::vector outTskMap; + std::vector outObjHistMap; + + // Needed to create the output director + std::vector outputsInputs; + std::vector isDangling; + + // Needed to create the aod writer + std::vector outputsInputsAOD; +}; +} // namespace o2::framework + +extern template class std::vector; +extern template class std::vector; + +#endif // O2_FRAMEWORK_ANALYSISCONTEXT_H_ diff --git a/Framework/Core/src/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h similarity index 71% rename from Framework/Core/src/AnalysisSupportHelpers.h rename to Framework/Core/include/Framework/AnalysisSupportHelpers.h index ba5bcedb4bc67..4ae601dc9e4a2 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -14,6 +14,7 @@ #include "Framework/OutputSpec.h" #include "Framework/InputSpec.h" #include "Framework/DataProcessorSpec.h" +#include "Framework/AnalysisContext.h" #include "Headers/DataHeader.h" #include @@ -24,36 +25,7 @@ static constexpr std::array extendedAODOrigins{header::Da static constexpr std::array writableAODOrigins{header::DataOrigin{"AOD"}, header::DataOrigin{"AOD1"}, header::DataOrigin{"AOD2"}, header::DataOrigin{"DYN"}}; class DataOutputDirector; - -struct OutputTaskInfo { - uint32_t id; - std::string name; -}; - -struct OutputObjectInfo { - uint32_t id; - std::vector bindings; -}; -} // namespace o2::framework - -extern template class std::vector; -extern template class std::vector; - -namespace o2::framework -{ -// -struct AnalysisContext { - std::vector requestedAODs; - std::vector providedAODs; - std::vector requestedDYNs; - std::vector providedDYNs; - std::vector requestedIDXs; - std::vector providedOutputObjHist; - std::vector spawnerInputs; - - std::vector outTskMap; - std::vector outObjHistMap; -}; +class ConfigContext; // Helper class to be moved in the AnalysisSupport plugin at some point struct AnalysisSupportHelpers { @@ -74,11 +46,11 @@ struct AnalysisSupportHelpers { /// Match all inputs of kind ATSK and write them to a ROOT file, /// one root file per originating task. 
- static DataProcessorSpec getOutputObjHistSink(std::vector const& objmap, - std::vector const& tskmap); + static DataProcessorSpec getOutputObjHistSink(ConfigContext const&); /// writes inputs of kind AOD to file - static DataProcessorSpec getGlobalAODSink(std::shared_ptr dod, - std::vector const& outputInputs, int compression); + static DataProcessorSpec getGlobalAODSink(ConfigContext const&); + /// Get the data director + static std::shared_ptr getDataOutputDirector(ConfigContext const& ctx); }; }; // namespace o2::framework diff --git a/Framework/Core/include/Framework/ConfigContext.h b/Framework/Core/include/Framework/ConfigContext.h index 5790699fe68bb..87259f0519915 100644 --- a/Framework/Core/include/Framework/ConfigContext.h +++ b/Framework/Core/include/Framework/ConfigContext.h @@ -8,11 +8,11 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_CONFIG_CONTEXT_H -#define FRAMEWORK_CONFIG_CONTEXT_H +#ifndef O2_FRAMEWORK_CONFIG_CONTEXT_H_ +#define O2_FRAMEWORK_CONFIG_CONTEXT_H_ #include "Framework/ConfigParamRegistry.h" -#include "Framework/ServiceRegistry.h" +#include "Framework/ServiceRegistryRef.h" namespace o2::framework { @@ -23,9 +23,10 @@ namespace o2::framework class ConfigContext { public: - ConfigContext(ConfigParamRegistry& options, int argc, char** argv) : mOptions{options}, mArgc{argc}, mArgv{argv} {} + ConfigContext(ConfigParamRegistry& options, ServiceRegistryRef services, int argc, char** argv); [[nodiscard]] ConfigParamRegistry& options() const { return mOptions; } + [[nodiscard]] ServiceRegistryRef services() const { return mServices; } [[nodiscard]] bool helpOnCommandLine() const; @@ -34,11 +35,13 @@ class ConfigContext private: ConfigParamRegistry& mOptions; + + ServiceRegistryRef mServices; // additionaly keep information about the original command line int mArgc = 0; char** mArgv = nullptr; }; -} // namespace o2 +} // namespace o2::framework -#endif +#endif // O2_FRAMEWORK_CONFIG_CONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index eee4c4b6583d3..8293bf0cf7039 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -30,6 +30,7 @@ #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "Framework/ConfigParamDiscovery.h" +#include "ServiceRegistryRef.h" #include namespace o2::framework @@ -198,7 +199,8 @@ int mainNoCatch(int argc, char** argv) workflowOptions.push_back(extra); } - ConfigContext configContext(workflowOptionsRegistry, argc, argv); + ServiceRegistry configRegistry; + ConfigContext configContext(workflowOptionsRegistry, ServiceRegistryRef{configRegistry}, argc, argv); o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); overrideCloning(configContext, specs); overridePipeline(configContext, specs); diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index e949f27a6eed6..eb17566fd6d31 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -9,18 +9,16 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. 
-#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" #include "Framework/OutputObjHeader.h" #include "Framework/ControlService.h" #include "Framework/EndOfStreamContext.h" #include "Framework/DeviceSpec.h" #include "Framework/TableTreeHelpers.h" - -#include "TFile.h" -#include "TTree.h" -#include "TMap.h" -#include "TObjString.h" +#include "Framework/PluginManager.h" +#include "Framework/ConfigContext.h" +#include "WorkflowHelpers.h" template class std::vector; template class std::vector; @@ -28,21 +26,105 @@ template class std::vector; namespace o2::framework { -struct InputObjectRoute { - std::string name; - uint32_t uniqueId; - std::string directory; - uint32_t taskHash; - OutputObjHandlingPolicy policy; - OutputObjSourceType sourceType; -}; +std::shared_ptr AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx) +{ + auto const& options = ctx.options(); + auto const& OutputsInputs = ctx.services().get().outputsInputs; + auto const& isDangling = ctx.services().get().isDangling; + + std::shared_ptr dod = std::make_shared(); + + // analyze options and take actions accordingly + // default values + std::string rdn, resdir("./"); + std::string fnb, fnbase("AnalysisResults_trees"); + float mfs, maxfilesize(-1.); + std::string fmo, filemode("RECREATE"); + int ntfm, ntfmerge = 1; + + // values from json + if (options.isSet("aod-writer-json")) { + auto fnjson = options.get("aod-writer-json"); + if (!fnjson.empty()) { + std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); + if (!rdn.empty()) { + resdir = rdn; + } + if (!fnb.empty()) { + fnbase = fnb; + } + if (!fmo.empty()) { + filemode = fmo; + } + if (mfs > 0.) { + maxfilesize = mfs; + } + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + } + + // values from command line options, information from json is overwritten + if (options.isSet("aod-writer-resdir")) { + rdn = options.get("aod-writer-resdir"); + if (!rdn.empty()) { + resdir = rdn; + } + } + if (options.isSet("aod-writer-resfile")) { + fnb = options.get("aod-writer-resfile"); + if (!fnb.empty()) { + fnbase = fnb; + } + } + if (options.isSet("aod-writer-resmode")) { + fmo = options.get("aod-writer-resmode"); + if (!fmo.empty()) { + filemode = fmo; + } + } + if (options.isSet("aod-writer-maxfilesize")) { + mfs = options.get("aod-writer-maxfilesize"); + if (mfs > 0) { + maxfilesize = mfs; + } + } + if (options.isSet("aod-writer-ntfmerge")) { + ntfm = options.get("aod-writer-ntfmerge"); + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + // parse the keepString + if (options.isSet("aod-writer-keep")) { + auto keepString = options.get("aod-writer-keep"); + if (!keepString.empty()) { + dod->reset(); + std::string d("dangling"); + if (d.find(keepString) == 0) { + // use the dangling outputs + std::vector danglingOutputs; + for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { + danglingOutputs.emplace_back(OutputsInputs[ii]); + } + } + dod->readSpecs(danglingOutputs); + } else { + // use the keep string + dod->readString(keepString); + } + } + } + dod->setResultDir(resdir); + dod->setFilenameBase(fnbase); + dod->setFileMode(filemode); + dod->setMaximumFileSize(maxfilesize); + dod->setNumberTimeFramesToMerge(ntfmerge); -struct InputObject { - TClass* kind = nullptr; - void* obj = nullptr; - std::string name; - int count = -1; -}; + return dod; +} void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector 
const& providedOutputs, std::vector const& requestedInputs, @@ -125,191 +207,16 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c } } -const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, - {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; - // ============================================================================= -DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector const& objmap, std::vector const& tskmap) +DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx) { - auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function { - auto& callbacks = ic.services().get(); - auto inputObjects = std::make_shared>>(); - - static TFile* f[OutputObjHandlingPolicy::numPolicies]; - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - f[i] = nullptr; - } - - static std::string currentDirectory = ""; - static std::string currentFile = ""; - - auto endofdatacb = [inputObjects](EndOfStreamContext& context) { - LOG(debug) << "Writing merged objects and histograms to file"; - if (inputObjects->empty()) { - LOG(error) << "Output object map is empty!"; - context.services().get().readyToQuit(QuitRequest::Me); - return; - } - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - if (f[i] != nullptr) { - f[i]->Close(); - } - } - LOG(debug) << "All outputs merged in their respective target files"; - context.services().get().readyToQuit(QuitRequest::Me); - }; - - callbacks.set(endofdatacb); - return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { - auto const& ref = pc.inputs().get("x"); - if (!ref.header) { - LOG(error) << "Header not found"; - return; - } - if (!ref.payload) { - LOG(error) << "Payload not found"; - return; - } - auto datah = o2::header::get(ref.header); - if (!datah) { - LOG(error) << "No data header in stack"; - return; - } - - auto objh = o2::header::get(ref.header); - if (!objh) { - LOG(error) << "No output object header in stack"; - return; - } - - InputObject obj; - FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); - tm.InitMap(); - obj.kind = tm.ReadClass(); - tm.SetBufferOffset(0); - tm.ResetMap(); - if (obj.kind == nullptr) { - LOG(error) << "Cannot read class info from buffer."; - return; - } - - auto policy = objh->mPolicy; - auto sourceType = objh->mSourceType; - auto hash = objh->mTaskHash; - - obj.obj = tm.ReadObjectAny(obj.kind); - auto* named = static_cast(obj.obj); - obj.name = named->GetName(); - auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); - if (hpos == tskmap.end()) { - LOG(error) << "No task found for hash " << hash; - return; - } - auto taskname = hpos->name; - auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); - if (opos == objmap.end()) { - LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; - return; - } - auto objects = opos->bindings; - if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { - LOG(error) << "No object " << obj.name << " in map for task " << taskname; - return; - } - auto nameHash = runtime_hash(obj.name.c_str()); - InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; - auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); - 
// If it's the first one, we just add it to the list. - if (existing == inputObjects->end()) { - obj.count = objh->mPipelineSize; - inputObjects->push_back(std::make_pair(key, obj)); - existing = inputObjects->end() - 1; - } else { - obj.count = existing->second.count; - // Otherwise, we merge it with the existing one. - auto merger = existing->second.kind->GetMerge(); - if (!merger) { - LOG(error) << "Already one unmergeable object found for " << obj.name; - return; - } - TList coll; - coll.Add(static_cast(obj.obj)); - merger(existing->second.obj, &coll, nullptr); - } - // We expect as many objects as the pipeline size, for - // a given object name and task hash. - existing->second.count -= 1; - - if (existing->second.count != 0) { - return; - } - // Write the object here. - auto route = existing->first; - auto entry = existing->second; - auto file = ROOTfileNames.find(route.policy); - if (file == ROOTfileNames.end()) { - return; - } - auto filename = file->second; - if (f[route.policy] == nullptr) { - f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); - } - auto nextDirectory = route.directory; - if ((nextDirectory != currentDirectory) || (filename != currentFile)) { - if (!f[route.policy]->FindKey(nextDirectory.c_str())) { - f[route.policy]->mkdir(nextDirectory.c_str()); - } - currentDirectory = nextDirectory; - currentFile = filename; - } - - // translate the list-structure created by the registry into a directory structure within the file - std::function writeListToFile; - writeListToFile = [&](TList* list, TDirectory* parentDir) { - TIter next(list); - TObject* object = nullptr; - while ((object = next())) { - if (object->InheritsFrom(TList::Class())) { - writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); - } else { - parentDir->WriteObjectAny(object, object->Class(), object->GetName()); - auto* written = list->Remove(object); - delete written; - } - } - }; - - TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); - if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { - auto* outputList = static_cast(entry.obj); - outputList->SetOwner(false); - - // if registry should live in dedicated folder a TNamed object is appended to the list - if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { - delete outputList->Last(); - outputList->RemoveLast(); - currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); - } - - writeListToFile(outputList, currentDir); - outputList->SetOwner(); - delete outputList; - entry.obj = nullptr; - } else { - currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); - delete (TObject*)entry.obj; - entry.obj = nullptr; - } - }; - }; - - char const* name = "internal-dpl-aod-global-analysis-file-sink"; // Lifetime is sporadic because we do not ask each analysis task to send its // results every timeframe. 
DataProcessorSpec spec{ - .name = name, + .name = "internal-dpl-aod-global-analysis-file-sink", .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)}, - .algorithm = {writerFunction}, + .outputs = {}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx), }; return spec; @@ -317,188 +224,17 @@ DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector dod, - std::vector const& outputInputs, int compressionLevel) + AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx) { - - auto writerFunction = [dod, outputInputs, compressionLevel](InitContext& ic) -> std::function { - LOGP(debug, "======== getGlobalAODSink::Init =========="); - - // find out if any table needs to be saved - bool hasOutputsToWrite = false; - for (auto& outobj : outputInputs) { - auto ds = dod->getDataOutputDescriptors(outobj); - if (ds.size() > 0) { - hasOutputsToWrite = true; - break; - } - } - - // if nothing needs to be saved then return a trivial functor - // this happens when nothing needs to be saved but there are dangling outputs - if (!hasOutputsToWrite) { - return [](ProcessingContext&) mutable -> void { - static bool once = false; - if (!once) { - LOG(info) << "No AODs to be saved."; - once = true; - } - }; - } - - // end of data functor is called at the end of the data stream - auto endofdatacb = [dod](EndOfStreamContext& context) { - dod->closeDataFiles(); - context.services().get().readyToQuit(QuitRequest::Me); - }; - - auto& callbacks = ic.services().get(); - callbacks.set(endofdatacb); - - // prepare map(startTime, tfNumber) - std::map tfNumbers; - std::map tfFilenames; - - std::vector aodMetaDataKeys; - std::vector aodMetaDataVals; - - // this functor is called once per time frame - return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { - LOGP(debug, "======== getGlobalAODSink::processing =========="); - LOGP(debug, " processing data set with {} entries", pc.inputs().size()); - - // return immediately if pc.inputs() is empty. This should never happen! 
- if (pc.inputs().size() == 0) { - LOGP(info, "No inputs available!"); - return; - } - - // update tfNumbers - uint64_t startTime = 0; - uint64_t tfNumber = 0; - auto ref = pc.inputs().get("tfn"); - if (ref.spec && ref.payload) { - startTime = DataRefUtils::getHeader(ref)->startTime; - tfNumber = pc.inputs().get("tfn"); - tfNumbers.insert(std::pair(startTime, tfNumber)); - } - // update tfFilenames - std::string aodInputFile; - auto ref2 = pc.inputs().get("tff"); - if (ref2.spec && ref2.payload) { - startTime = DataRefUtils::getHeader(ref2)->startTime; - aodInputFile = pc.inputs().get("tff"); - tfFilenames.insert(std::pair(startTime, aodInputFile)); - } - - // close all output files if one has reached size limit - dod->checkFileSizes(); - - // loop over the DataRefs which are contained in pc.inputs() - for (const auto& ref : pc.inputs()) { - if (!ref.spec) { - LOGP(debug, "Invalid input will be skipped!"); - continue; - } - - // get metadata - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { - aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); - } - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { - aodMetaDataVals = pc.inputs().get>(ref.spec->binding); - } - - // skip non-AOD refs - if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { - continue; - } - startTime = DataRefUtils::getHeader(ref)->startTime; - - // does this need to be saved? - auto dh = DataRefUtils::getHeader(ref); - auto tableName = dh->dataDescription.as(); - auto ds = dod->getDataOutputDescriptors(*dh); - if (ds.empty()) { - continue; - } - - // get TF number from startTime - auto it = tfNumbers.find(startTime); - if (it != tfNumbers.end()) { - tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); - } else { - LOGP(fatal, "No time frame number found for output with start time {}", startTime); - throw std::runtime_error("Processing is stopped!"); - } - // get aod input file from startTime - auto it2 = tfFilenames.find(startTime); - if (it2 != tfFilenames.end()) { - aodInputFile = it2->second; - } - - // get the TableConsumer and corresponding arrow table - auto msg = pc.inputs().get(ref.spec->binding); - if (msg.header == nullptr) { - LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); - continue; - } - auto s = pc.inputs().get(ref.spec->binding); - auto table = s->asArrowTable(); - if (!table->Validate().ok()) { - LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); - continue; - } - if (table->schema()->fields().empty()) { - LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); - } - - // loop over all DataOutputDescriptors - // a table can be saved in multiple ways - // e.g. 
different selections of columns to different files - for (auto d : ds) { - auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); - auto treename = fileAndFolder.folderName + "/" + d->treename; - TableToTree ta2tr(table, - fileAndFolder.file, - treename.c_str()); - - // update metadata - if (fileAndFolder.file->FindObjectAny("metaData")) { - LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); - } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { - TMap aodMetaDataMap; - for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { - aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); - } - fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); - } - - if (!d->colnames.empty()) { - for (auto& cn : d->colnames) { - auto idx = table->schema()->GetFieldIndex(cn); - auto col = table->column(idx); - auto field = table->schema()->field(idx); - if (idx != -1) { - ta2tr.addBranch(col, field); - } - } - } else { - ta2tr.addAllBranches(); - } - ta2tr.process(); - } - } - }; - }; // end of writerFunction + auto& ac = ctx.services().get(); // the command line options relevant for the writer are global // see runDataProcessing.h DataProcessorSpec spec{ .name = "internal-dpl-aod-writer", - .inputs = outputInputs, + .inputs = ac.outputsInputsAOD, .outputs = {}, - .algorithm = AlgorithmSpec{writerFunction}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx), }; return spec; diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 1a656e4d60080..230d708b47dc7 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -30,7 +30,7 @@ #include "Framework/ServiceMetricsInfo.h" #include "WorkflowHelpers.h" #include "Framework/WorkflowSpecNode.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "CommonMessageBackendsHelpers.h" #include @@ -516,7 +516,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); // create DataOutputDescriptor - std::shared_ptr dod = WorkflowHelpers::getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink @@ -537,11 +537,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // add TFNumber and TFFilename as input to the writer outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); - int compression = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compression = ctx.options().get("aod-writer-compression"); - } - workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compression)); + workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); } // Move the dummy sink at the end, if needed for (size_t i = 0; i < workflow.size(); ++i) { diff --git a/Framework/Core/src/ConfigContext.cxx b/Framework/Core/src/ConfigContext.cxx index 726332e1d0ae3..9b121b1884998 100644 --- a/Framework/Core/src/ConfigContext.cxx +++ b/Framework/Core/src/ConfigContext.cxx @@ -14,6 +14,9 @@ namespace o2::framework { +ConfigContext::ConfigContext(ConfigParamRegistry& options, 
ServiceRegistryRef services, int argc, char** argv) + : mOptions{options}, mServices{services}, mArgc{argc}, mArgv{argv} {} + bool ConfigContext::helpOnCommandLine() const { bool helpasked = false; diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index da9a135dc5eb8..3782c48e81c56 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -9,7 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "WorkflowHelpers.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/AlgorithmSpec.h" #include "Framework/AODReaderHelpers.h" #include "Framework/ConfigParamSpec.h" @@ -153,7 +153,7 @@ int defaultConditionQueryRateMultiplier() return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1; } -void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx) +void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { LOG(info) << "This is not a real device, merely a placeholder for external inputs"; @@ -241,7 +241,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}}); } - AnalysisContext ac; + ctx.services().registerService(ServiceRegistryHelpers::handleForService(new AnalysisContext)); + auto& ac = ctx.services().get(); + std::vector requestedCCDBs; std::vector providedCCDBs; @@ -573,7 +575,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // This is to inject a file sink so that any dangling ATSK object is written // to a ROOT file. if (ac.providedOutputObjHist.empty() == false) { - auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ac.outObjHistMap, ac.outTskMap); + auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ctx); extraSpecs.push_back(rootSink); } @@ -581,41 +583,38 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.clear(); /// Analyze all ouputs - auto [outputsInputs, isDangling] = analyzeOutputs(workflow); + auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow); + ac.isDangling = isDanglingTmp; + ac.outputsInputs = outputsInputsTmp; // create DataOutputDescriptor - std::shared_ptr dod = getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! 
- std::vector outputsInputsAOD; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); - if (ds.size() > 0 || isDangling[ii]) { - outputsInputsAOD.emplace_back(outputsInputs[ii]); + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { + auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]); + if (ds.size() > 0 || ac.isDangling[ii]) { + ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]); } } } // file sink for any AOD output - if (outputsInputsAOD.size() > 0) { + if (ac.outputsInputsAOD.size() > 0) { // add TFNumber and TFFilename as input to the writer - outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); - outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); - int compressionLevel = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compressionLevel = ctx.options().get("aod-writer-compression"); - } - auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compressionLevel); + ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); + ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); + auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); extraSpecs.push_back(fileSink); - auto it = std::find_if(outputsInputs.begin(), outputsInputs.end(), [](InputSpec& spec) -> bool { + auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool { return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); - size_t ii = std::distance(outputsInputs.begin(), it); - isDangling[ii] = false; + size_t ii = std::distance(ac.outputsInputs.begin(), it); + ac.isDangling[ii] = false; } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); @@ -623,20 +622,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // Select dangling outputs which are not of type AOD std::vector redirectedOutputsInputs; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { if (ctx.options().get("forwarding-policy") == "none") { continue; } // We forward to the output proxy all the inputs only if they are dangling // or if the forwarding policy is "proxy". - if (!isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { + if (!ac.isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { continue; } // AODs are skipped in any case. 
- if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { continue; } - redirectedOutputsInputs.emplace_back(outputsInputs[ii]); + redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]); } std::vector unmatched; @@ -985,102 +984,6 @@ struct DataMatcherId { size_t id; }; -std::shared_ptr WorkflowHelpers::getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& isDangling) -{ - std::shared_ptr dod = std::make_shared(); - - // analyze options and take actions accordingly - // default values - std::string rdn, resdir("./"); - std::string fnb, fnbase("AnalysisResults_trees"); - float mfs, maxfilesize(-1.); - std::string fmo, filemode("RECREATE"); - int ntfm, ntfmerge = 1; - - // values from json - if (options.isSet("aod-writer-json")) { - auto fnjson = options.get("aod-writer-json"); - if (!fnjson.empty()) { - std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); - if (!rdn.empty()) { - resdir = rdn; - } - if (!fnb.empty()) { - fnbase = fnb; - } - if (!fmo.empty()) { - filemode = fmo; - } - if (mfs > 0.) { - maxfilesize = mfs; - } - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - } - - // values from command line options, information from json is overwritten - if (options.isSet("aod-writer-resdir")) { - rdn = options.get("aod-writer-resdir"); - if (!rdn.empty()) { - resdir = rdn; - } - } - if (options.isSet("aod-writer-resfile")) { - fnb = options.get("aod-writer-resfile"); - if (!fnb.empty()) { - fnbase = fnb; - } - } - if (options.isSet("aod-writer-resmode")) { - fmo = options.get("aod-writer-resmode"); - if (!fmo.empty()) { - filemode = fmo; - } - } - if (options.isSet("aod-writer-maxfilesize")) { - mfs = options.get("aod-writer-maxfilesize"); - if (mfs > 0) { - maxfilesize = mfs; - } - } - if (options.isSet("aod-writer-ntfmerge")) { - ntfm = options.get("aod-writer-ntfmerge"); - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - // parse the keepString - if (options.isSet("aod-writer-keep")) { - auto keepString = options.get("aod-writer-keep"); - if (!keepString.empty()) { - dod->reset(); - std::string d("dangling"); - if (d.find(keepString) == 0) { - // use the dangling outputs - std::vector danglingOutputs; - for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { - danglingOutputs.emplace_back(OutputsInputs[ii]); - } - } - dod->readSpecs(danglingOutputs); - } else { - // use the keep string - dod->readString(keepString); - } - } - } - dod->setResultDir(resdir); - dod->setFilenameBase(fnbase); - dod->setFileMode(filemode); - dod->setMaximumFileSize(maxfilesize); - dod->setNumberTimeFramesToMerge(ntfmerge); - - return dod; -} - std::tuple, std::vector> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow) { // compute total number of input/output diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index b20249b99edc8..b2a4d4cab55df 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -180,7 +180,7 @@ struct WorkflowHelpers { // dangling inputs are satisfied. 
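+  // Note: @a ctx is taken by non-const reference because injectServiceDevices now
+  // registers the AnalysisContext service on it.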
// @a workflow the workflow to decorate // @a ctx the context for the configuration phase - static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx); + static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx); // Final adjustments to @a workflow after service devices have been injected. static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx); @@ -204,8 +204,6 @@ struct WorkflowHelpers { const std::vector& edges, const std::vector& index); - static std::shared_ptr getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& outputTypes); - /// Given @a workflow it gathers all the OutputSpec and in addition provides /// the information whether and output is dangling and/or of type AOD /// An Output is dangling if it does not have a corresponding InputSpec. diff --git a/Framework/Core/test/Mocking.h b/Framework/Core/test/Mocking.h index b3e48ad3b2d0f..a42a1b30a662f 100644 --- a/Framework/Core/test/Mocking.h +++ b/Framework/Core/test/Mocking.h @@ -34,7 +34,10 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static std::unique_ptr services; + // We need to reset it because we will inject services into it. + services = std::make_unique(); + auto context = std::make_unique(registry, ServiceRegistryRef{*services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/benchmark_WorkflowHelpers.cxx b/Framework/Core/test/benchmark_WorkflowHelpers.cxx index f1c070d8a0f4e..09a9ae0cca923 100644 --- a/Framework/Core/test/benchmark_WorkflowHelpers.cxx +++ b/Framework/Core/test/benchmark_WorkflowHelpers.cxx @@ -30,7 +30,8 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/test_OverrideLabels.cxx b/Framework/Core/test/test_OverrideLabels.cxx index 573bd13be797a..c5134c0c169c0 100644 --- a/Framework/Core/test/test_OverrideLabels.cxx +++ b/Framework/Core/test/test_OverrideLabels.cxx @@ -31,7 +31,8 @@ std::unique_ptr mockupLabels(std::string labelArg) store->preload(); store->activate(); registry = ConfigParamRegistry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index 9986f52a1d940..efac16f6da4f0 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -17,6 +17,7 @@ #include "Framework/AnalysisTask.h" #include #include +#include using namespace o2; using namespace o2::framework; @@ -43,7 +44,7 @@ struct EtaAndClsHistogramsSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } } @@ -57,7 +58,7 @@ struct EtaAndClsHistogramsIUSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + 
etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } }
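For reference, the plugin pattern used by ROOTObjWriter and ROOTTTreeWriter above reduces to the minimal sketch below. The names MyWriter and O2MyLibrary are placeholders for illustration only, and the sketch assumes, as in Plugin.cxx, that the AlgorithmPlugin interface and the DEFINE_DPL_PLUGIN macros come from Framework/Plugins.h. A plugin subclass builds its AlgorithmSpec from the ConfigContext (including any services registered on it, such as the AnalysisContext), and the library registers it with DEFINE_DPL_PLUGIN_INSTANCE.

```cpp
// Minimal sketch of the plugin pattern introduced above.
// "MyWriter" and "O2MyLibrary" are placeholder names used only for illustration.
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/ProcessingContext.h"
#include "Framework/Plugins.h"

struct MyWriter : o2::framework::AlgorithmPlugin {
  o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override
  {
    // Whatever the algorithm needs at creation time (options, the AnalysisContext, ...)
    // is taken from the ConfigContext handed in by the loader.
    return o2::framework::AlgorithmSpec{
      [](o2::framework::ProcessingContext& pc) {
        // per-timeframe processing goes here
      }};
  }
};

DEFINE_DPL_PLUGINS_BEGIN
DEFINE_DPL_PLUGIN_INSTANCE(MyWriter, CustomAlgorithm);
DEFINE_DPL_PLUGINS_END
```

On the workflow-building side such an algorithm would then be attached with something like `.algorithm = PluginManager::loadAlgorithmFromPlugin("O2MyLibrary", "MyWriter", ctx)`, which is what getOutputObjHistSink and getGlobalAODSink now do for the two writers.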