Skip to content

Commit

Permalink
DPL: add SendingPolicy for the case destination is expendable
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Feb 9, 2024
1 parent eb9aadc commit 7897c37
Showing 1 changed file with 18 additions and 1 deletion.
19 changes: 18 additions & 1 deletion Framework/Core/src/SendingPolicy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
{
return {SendingPolicy{
.name = "dispatcher",
.matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
.matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
if (source.name == "Dispatcher") {
return true;
}
Expand Down Expand Up @@ -103,6 +103,23 @@ std::vector<SendingPolicy> SendingPolicy::createDefaultPolicies()
} else if (res == (size_t) fair::mq::TransferCode::error) {
LOGP(fatal, "Error while sending on channel {}", channel->GetName());
} }},
SendingPolicy{
.name = "expendable",
.matcher = [](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
auto has_label = [](DataProcessorLabel const& label) {
return label.value == "expendable";
};
return std::find_if(dest.labels.begin(), dest.labels.end(), has_label) != dest.labels.end(); },
.send = [](fair::mq::Parts& parts, ChannelIndex channelIndex, ServiceRegistryRef registry) {
auto &proxy = registry.get<FairMQDeviceProxy>();
auto *channel = proxy.getOutputChannel(channelIndex);
auto timeout = 1000;
auto res = channel->Send(parts, timeout);
if (res == (size_t)fair::mq::TransferCode::timeout) {
LOGP(warning, "Timed out sending after {}s. Downstream backpressure detected on expendable channel {}.", timeout/1000, channel->GetName());
} else if (res == (size_t) fair::mq::TransferCode::error) {
LOGP(info, "Error while sending on channel {}", channel->GetName());
} }},
SendingPolicy{
.name = "default",
.matcher = [](DataProcessorSpec const&, DataProcessorSpec const&, ConfigContext const&) { return true; },
Expand Down

0 comments on commit 7897c37

Please sign in to comment.