From 3486413113c892d7293ec5dcddac58db5301acc0 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:17:55 +0100 Subject: [PATCH] DPL: Use AnalysisContext also in the case of amended topologies --- Framework/Core/src/ArrowSupport.cxx | 44 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 230d708b47dc7..e6f8fb90c7af9 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -420,16 +420,18 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; }); auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; }); auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; }); - std::vector requestedAODs; - std::vector requestedDYNs; - std::vector providedDYNs; + auto &ac = ctx.services().get(); + ac.requestedAODs.clear(); + ac.requestedDYNs.clear(); + ac.providedDYNs.clear(); + auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; if (builder != workflow.end()) { // collect currently requested IDXs - std::vector requestedIDXs; + ac.requestedIDXs.clear(); for (auto& d : workflow) { if (d.name == builder->name) { continue; @@ -437,7 +439,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (auto& i : d.inputs) { if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) { auto copy = i; - DataSpecUtils::updateInputList(requestedIDXs, std::move(copy)); + DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy)); } } } @@ -446,8 +448,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() builder->outputs.clear(); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... - builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(requestedIDXs); - AnalysisSupportHelpers::addMissingOutputsToBuilder(requestedIDXs, requestedAODs, requestedDYNs, *builder); + builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs); + AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder); } if (spawner != workflow.end()) { @@ -459,20 +461,20 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (auto const& i : d.inputs) { if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) { auto copy = i; - DataSpecUtils::updateInputList(requestedDYNs, std::move(copy)); + DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy)); } } for (auto const& o : d.outputs) { if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) { - providedDYNs.emplace_back(o); + ac.providedDYNs.emplace_back(o); } } } - std::sort(requestedDYNs.begin(), requestedDYNs.end(), inputSpecLessThan); - std::sort(providedDYNs.begin(), providedDYNs.end(), outputSpecLessThan); + std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); + std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); std::vector spawnerInputs; - for (auto& input : requestedDYNs) { - if (std::none_of(providedDYNs.begin(), providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { + for (auto& input : ac.requestedDYNs) { + if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { spawnerInputs.emplace_back(input); } } @@ -482,7 +484,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(spawnerInputs); - AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, *spawner); + AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, ac.requestedAODs, *spawner); } if (writer != workflow.end()) { @@ -496,14 +498,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() for (auto const& i : d.inputs) { if (DataSpecUtils::partialMatch(i, AODOrigins)) { auto copy = i; - DataSpecUtils::updateInputList(requestedAODs, std::move(copy)); + DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy)); } } } // remove unmatched outputs auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) { - return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(requestedAODs.begin(), requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); }); + return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); }); }); reader->outputs.erase(o_end, reader->outputs.end()); if (reader->outputs.empty()) { @@ -521,22 +523,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! - std::vector outputsInputsAOD; + ac.outputsInputsAOD.clear(); for (auto ii = 0u; ii < outputsInputs.size(); ii++) { if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); if (!ds.empty() || isDangling[ii]) { - outputsInputsAOD.emplace_back(outputsInputs[ii]); + ac.outputsInputsAOD.emplace_back(outputsInputs[ii]); } } } // file sink for any AOD output - if (!outputsInputsAOD.empty()) { + if (!ac.outputsInputsAOD.empty()) { // add TFNumber and TFFilename as input to the writer - outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); - outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); + ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); + ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); } // Move the dummy sink at the end, if needed