Skip to content

Commit

Permalink
DPL: Use AnalysisContext also in the case of amended topologies
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Nov 15, 2024
1 parent e260639 commit 3486413
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -420,24 +420,26 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
std::vector<InputSpec> requestedAODs;
std::vector<InputSpec> requestedDYNs;
std::vector<OutputSpec> providedDYNs;
auto &ac = ctx.services().get<AnalysisContext>();
ac.requestedAODs.clear();
ac.requestedDYNs.clear();
ac.providedDYNs.clear();


auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };

if (builder != workflow.end()) {
// collect currently requested IDXs
std::vector<InputSpec> requestedIDXs;
ac.requestedIDXs.clear();
for (auto& d : workflow) {
if (d.name == builder->name) {
continue;
}
for (auto& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) {
auto copy = i;
DataSpecUtils::updateInputList(requestedIDXs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
}
}
}
Expand All @@ -446,8 +448,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
builder->outputs.clear();
// replace AlgorithmSpec
// FIXME: it should be made more generic, so it does not need replacement...
builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(requestedIDXs);
AnalysisSupportHelpers::addMissingOutputsToBuilder(requestedIDXs, requestedAODs, requestedDYNs, *builder);
builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
}

if (spawner != workflow.end()) {
Expand All @@ -459,20 +461,20 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) {
auto copy = i;
DataSpecUtils::updateInputList(requestedDYNs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
}
}
for (auto const& o : d.outputs) {
if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) {
providedDYNs.emplace_back(o);
ac.providedDYNs.emplace_back(o);
}
}
}
std::sort(requestedDYNs.begin(), requestedDYNs.end(), inputSpecLessThan);
std::sort(providedDYNs.begin(), providedDYNs.end(), outputSpecLessThan);
std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
std::vector<InputSpec> spawnerInputs;
for (auto& input : requestedDYNs) {
if (std::none_of(providedDYNs.begin(), providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
for (auto& input : ac.requestedDYNs) {
if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
spawnerInputs.emplace_back(input);
}
}
Expand All @@ -482,7 +484,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// replace AlgorithmSpec
// FIXME: it should be made more generic, so it does not need replacement...
spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(spawnerInputs);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, *spawner);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, ac.requestedAODs, *spawner);
}

if (writer != workflow.end()) {
Expand All @@ -496,14 +498,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, AODOrigins)) {
auto copy = i;
DataSpecUtils::updateInputList(requestedAODs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
}
}
}

// remove unmatched outputs
auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(requestedAODs.begin(), requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
});
reader->outputs.erase(o_end, reader->outputs.end());
if (reader->outputs.empty()) {
Expand All @@ -521,22 +523,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// select outputs of type AOD which need to be saved
// ATTENTION: if there are dangling outputs the getGlobalAODSink
// has to be created in any case!
std::vector<InputSpec> outputsInputsAOD;
ac.outputsInputsAOD.clear();

for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
if (!ds.empty() || isDangling[ii]) {
outputsInputsAOD.emplace_back(outputsInputs[ii]);
ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
}
}
}

// file sink for any AOD output
if (!outputsInputsAOD.empty()) {
if (!ac.outputsInputsAOD.empty()) {
// add TFNumber and TFFilename as input to the writer
outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
}
// Move the dummy sink at the end, if needed
Expand Down

0 comments on commit 3486413

Please sign in to comment.