Skip to content

Commit

Permalink
DPL: thread safe activity detection
Browse files Browse the repository at this point in the history
This makes sure that the detection of active data processors in
a device is thread-safe by recording the last active one.

The main loop will then short-circuit libuv in case the last active
data processor was reported, and it will reset the last-active pointer
to nullptr if it has not changed in the meantime (if it has changed,
another data processor was actually able to process something).
  • Loading branch information
ktf committed Sep 23, 2024
1 parent d55ec92 commit 2f7add5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 26 deletions.
4 changes: 1 addition & 3 deletions Framework/Core/include/Framework/DataProcessingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ struct DataProcessorSpec;
struct DataProcessorContext {
DataProcessorContext(DataProcessorContext const&) = delete;
DataProcessorContext() = default;
// These are specific of a given context and therefore
// not shared by threads.
bool* wasActive = nullptr;

bool allDone = false;
/// Latest run number we processed globally for this DataProcessor.
int64_t lastRunNumberProcessed = -1;
Expand Down
1 change: 0 additions & 1 deletion Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class DataProcessingDevice : public fair::mq::Device
std::vector<fair::mq::RegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
std::mutex mRegionInfoMutex;
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
bool mWasActive = false; /// Whether or not the device was active at last iteration.
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
/// Handle to wake up the main loop from other threads
Expand Down
7 changes: 7 additions & 0 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ typedef struct uv_async_s uv_async_t;
namespace o2::framework
{

struct DataProcessorContext;

/// Running state information of a given device
struct DeviceState {
/// Motivation for the loop being triggered.
Expand Down Expand Up @@ -108,6 +110,11 @@ struct DeviceState {
/// the bits we are interested in.
std::vector<int> severityStack;
TransitionHandlingState transitionHandling = TransitionHandlingState::NoTransition;

// The DataProcessorContext which was most recently active.
// We use this to determine if we should trigger the loop without
// waiting for some events.
std::atomic<DataProcessorContext*> lastActiveDataProcessor = nullptr;
};

} // namespace o2::framework
Expand Down
60 changes: 38 additions & 22 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask()
// Whenever we InitTask, we consider as if the previous iteration
// was successful, so that even if there is no timer or receiving
// channel, we can still start an enumeration.
mWasActive = true;
DataProcessorContext* initialContext = nullptr;
bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
if (!idle) {
LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
}

// We should be ready to run here. Therefore we copy all the
// required parts in the DataProcessorContext. Eventually we should
Expand Down Expand Up @@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask()

void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext)
{
context.wasActive = &mWasActive;

context.isSink = false;
// If nothing is a sink, the rate limiting simply does not trigger.
bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
Expand Down Expand Up @@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run()
{
ServiceRegistryRef ref{mServiceRegistry};
ref.get<DriverClient>().flushPending(mServiceRegistry);
auto shouldNotWait = (mWasActive &&
DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
// Reset to nullptr unless some other DataProcessorContext completed in the meantime.
// In such case we will take care of it at next iteration.
state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);

auto shouldNotWait = (lastActive != nullptr &&
(state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
(state.streaming == StreamingState::EndOfStreaming);
if (firstLoop) {
shouldNotWait = true;
firstLoop = false;
}
if (mWasActive) {
if (lastActive != nullptr) {
state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE;
}
if (NewStatePending()) {
Expand Down Expand Up @@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run()
} else {
auto ref = ServiceRegistryRef{mServiceRegistry};
ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
mWasActive = false;
}
} else {
mWasActive = false;
}
}

Expand All @@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");

*context.wasActive = false;
{
ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
}
Expand Down Expand Up @@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
socket.Events(&info.hasPendingEvents);
if (info.hasPendingEvents) {
info.readPolled = false;
*context.wasActive |= newMessages;
// In case there were messages, we consider it as activity
if (newMessages) {
state.lastActiveDataProcessor.store(&context);
}
}
O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
channelSpec.name.c_str(), info.id.value);
Expand All @@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
auto& spec = ref.get<DeviceSpec const>();

if (state.streaming == StreamingState::Idle) {
*context.wasActive = false;
return;
}

context.completed.clear();
context.completed.reserve(16);
*context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed);
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
state.lastActiveDataProcessor.store(&context);
}
DanglingContext danglingContext{*context.registry};

context.preDanglingCallbacks(danglingContext);
if (*context.wasActive == false) {
if (state.lastActiveDataProcessor.load() == nullptr) {
ref.get<CallbackService>().call<CallbackService::Id::Idle>();
}
auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
*context.wasActive |= activity.expiredSlots > 0;
if (activity.expiredSlots > 0) {
state.lastActiveDataProcessor = &context;
}

context.completed.clear();
*context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed);
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
state.lastActiveDataProcessor = &context;
}

context.postDanglingCallbacks(danglingContext);

Expand All @@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// framework itself.
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
switchState(StreamingState::EndOfStreaming);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
}

if (state.streaming == StreamingState::EndOfStreaming) {
Expand Down Expand Up @@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// This is needed because the transport is deleted before the device.
relayer.clear();
switchState(StreamingState::Idle);
*context.wasActive = shouldProcess;
// In case we should process, note the data processor responsible for it
if (shouldProcess) {
state.lastActiveDataProcessor = &context;
}
// On end of stream we shut down all output pollers.
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
for (auto& poller : state.activeOutputPollers) {
Expand Down Expand Up @@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
auto ref = ServiceRegistryRef{*context.registry};
auto& stats = ref.get<DataProcessingStats>();
auto& state = ref.get<DeviceState>();
auto& parts = info.parts;
stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});

Expand All @@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
info.state = sih->state;
insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
continue;
}
auto dih = o2::header::get<DomainInfoHeader*>(headerData);
if (dih) {
O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
continue;
}
auto dh = o2::header::get<DataHeader*>(headerData);
Expand Down Expand Up @@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&

auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
auto& relayer = ref.get<DataRelayer>();
auto& state = ref.get<DeviceState>();
static WaitBackpressurePolicy policy;
auto& parts = info.parts;
// We relay execution to make sure we have a complete set of parts
Expand Down Expand Up @@ -2012,7 +2028,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
case InputType::SourceInfo: {
LOGP(detail, "Received SourceInfo");
auto& context = ref.get<DataProcessorContext>();
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
auto headerIndex = input.position;
auto payloadIndex = input.position + 1;
assert(payloadIndex < parts.Size());
Expand All @@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
/// We have back pressure, therefore we do not process DomainInfo anymore.
/// until the previous message are processed.
auto& context = ref.get<DataProcessorContext>();
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
auto headerIndex = input.position;
auto payloadIndex = input.position + 1;
assert(payloadIndex < parts.Size());
Expand Down Expand Up @@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
auto& context = ref.get<DataProcessorContext>();
context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
}
auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
parts.fParts.erase(it, parts.end());
Expand Down

0 comments on commit 2f7add5

Please sign in to comment.