From 976e46008822c619e15f4b47a6701dc0fcaa6eef Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sat, 10 Feb 2024 01:17:29 +0100 Subject: [PATCH] DPL: add SendingPolicy for the case destination is expendable --- Framework/Core/src/SendingPolicy.cxx | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/SendingPolicy.cxx b/Framework/Core/src/SendingPolicy.cxx index 3d65975919676..3aeccb680c150 100644 --- a/Framework/Core/src/SendingPolicy.cxx +++ b/Framework/Core/src/SendingPolicy.cxx @@ -29,7 +29,7 @@ std::vector SendingPolicy::createDefaultPolicies() { return {SendingPolicy{ .name = "dispatcher", - .matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) { + .matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) { if (source.name == "Dispatcher") { return true; } @@ -103,6 +103,23 @@ std::vector SendingPolicy::createDefaultPolicies() } else if (res == (size_t) fair::mq::TransferCode::error) { LOGP(fatal, "Error while sending on channel {}", channel->GetName()); } }}, + SendingPolicy{ + .name = "expendable", + .matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) { + auto has_label = [](DataProcessorLabel const& label) { + return label.value == "expendable"; + }; + return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); }, + .send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) { + auto &proxy = registry.get(); + auto *channel = proxy.getOutputChannel(channelIndex); + auto timeout = 1000; + auto res = channel->Send(parts, timeout); + if (res == (size_t)fair::mq::TransferCode::timeout) { + LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}.", timeout/1000, channel->GetName()); + } else if (res == (size_t) fair::mq::TransferCode::error) { + LOGP(info, "Error while sending on channel {}", channel->GetName()); + } }}, SendingPolicy{ .name = "default", .matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return true; },