diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index 5050d4112..bac884cab 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -20,6 +20,7 @@ set(JANA2_SOURCES Topology/JEventProcessorArrow.cc Topology/JEventSourceArrow.cc Topology/JEventMapArrow.cc + Topology/JEventTapArrow.cc Topology/JTopologyBuilder.cc Services/JComponentManager.cc diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index e45fa0f1d..5f717db7b 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -159,7 +159,7 @@ bool JArrowProcessingController::is_timed_out() { auto metrics = measure_performance(); int timeout_s; - if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) { + if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) { // We are at the beginning and not all events have necessarily had a chance to warm up timeout_s = m_warmup_timeout_s; } diff --git a/src/libraries/JANA/Engine/JPerfSummary.cc b/src/libraries/JANA/Engine/JPerfSummary.cc index a82a7435c..8774ca488 100644 --- a/src/libraries/JANA/Engine/JPerfSummary.cc +++ b/src/libraries/JANA/Engine/JPerfSummary.cc @@ -24,9 +24,9 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl; os << std::endl; - os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; - os << " | Name | Type | Par | Threads | Thresh | Pending | Completed |" << std::endl; - os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl; + os << " | Name | Type | Par | Threads | Pending | Completed |" << std::endl; + os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl; for (auto as : s.arrows) { os << " | " @@ -37,17 +37,15 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { if (!as.is_source) { - os << std::setw(7) << as.threshold << " |" - << std::setw(8) << as.messages_pending << " |"; + os << std::setw(8) << as.messages_pending << " |"; } else { - - os << " - | - |"; + os << " - |"; } os << std::setw(12) << as.total_messages_completed << " |" << std::endl; } - os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl; os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl; diff --git a/src/libraries/JANA/Engine/JPerfSummary.h b/src/libraries/JANA/Engine/JPerfSummary.h index ed45c1acd..71ee2c75e 100644 --- a/src/libraries/JANA/Engine/JPerfSummary.h +++ b/src/libraries/JANA/Engine/JPerfSummary.h @@ -16,7 +16,6 @@ struct ArrowSummary { int running_upstreams; bool has_backpressure; size_t messages_pending; - size_t threshold; size_t total_messages_completed; size_t last_messages_completed; diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index 2d5ee6016..bbae230c9 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -411,7 +411,6 @@ void JScheduler::summarize_arrows(std::vector& summaries) { summary.is_source = as.arrow->is_source(); summary.is_sink = as.arrow->is_sink(); summary.messages_pending = as.arrow->get_pending(); - summary.threshold = as.arrow->get_threshold(); summary.thread_count = as.thread_count; summary.running_upstreams = as.active_or_draining_upstream_arrow_count; diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index 6d0429060..f4ba1051e 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -73,7 +73,7 @@ class JEventProcessor : public jana::components::JComponent, // any contention. if (m_status == Status::Uninitialized) { - DoInitialize(); + throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()"); } else if (m_status == Status::Finalized) { throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); @@ -121,15 +121,16 @@ class JEventProcessor : public jana::components::JComponent, auto run_number = event->GetRunNumber(); - if (m_status == Status::Uninitialized) { - DoInitialize(); - } - else if (m_status == Status::Finalized) { - throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); - } { // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun(). std::lock_guard lock(m_mutex); + + if (m_status == Status::Uninitialized) { + throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()"); + } + else if (m_status == Status::Finalized) { + throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()"); + } if (m_last_run_number != run_number) { if (m_last_run_number != -1) { CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 40f6ef264..1b5111f86 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -36,6 +36,7 @@ class JEventSource : public jana::components::JComponent, uint64_t m_nevents = 0; bool m_enable_finish_event = false; bool m_enable_get_objects = false; + bool m_enable_preprocess = false; public: @@ -145,11 +146,11 @@ class JEventSource : public jana::components::JComponent, bool IsGetObjectsEnabled() const { return m_enable_get_objects; } bool IsFinishEventEnabled() const { return m_enable_finish_event; } + bool IsPreprocessEnabled() const { return m_enable_preprocess; } uint64_t GetNSkip() { return m_nskip; } uint64_t GetNEvents() { return m_nevents; } - // TODO: Deprecate me virtual std::string GetVDescription() const { return ""; } ///< Optional for getting description via source rather than JEventSourceGenerator @@ -166,6 +167,7 @@ class JEventSource : public jana::components::JComponent, /// which will hurt performance. Conceptually, FinishEvent isn't great, and so should be avoided when possible. void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; } void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; } + void EnablePreprocess(bool enable=true) { m_enable_preprocess = enable; } void SetNEvents(uint64_t nevents) { m_nevents = nevents; }; void SetNSkip(uint64_t nskip) { m_nskip = nskip; }; diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 4a3deed81..f47c7329d 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -3,8 +3,6 @@ // Subject to the terms in the LICENSE file found in the top-level directory. #pragma once -#include -#include #include #include @@ -29,8 +27,6 @@ class JArrow { bool m_is_sink; // Whether or not tnis arrow contributes to the final event count JArrowMetrics m_metrics; // Performance information accumulated over all workers - mutable std::mutex m_arrow_mutex; // Protects access to arrow properties - friend class JScheduler; std::vector m_listeners; // Downstream Arrows @@ -77,11 +73,6 @@ class JArrow { // TODO: Make no longer virtual virtual size_t get_pending(); - // TODO: Get rid of me - virtual size_t get_threshold(); - - virtual void set_threshold(size_t /* threshold */); - void attach(JArrow* downstream) { m_listeners.push_back(downstream); }; @@ -113,18 +104,11 @@ struct PlaceRefBase { size_t max_item_count = 1; virtual size_t get_pending() { return 0; } - virtual size_t get_threshold() { return 0; } - virtual void set_threshold(size_t) {} }; template struct PlaceRef : public PlaceRefBase { - PlaceRef(JArrow* parent) { - assert(parent != nullptr); - parent->attach(this); - } - PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) { assert(parent != nullptr); parent->attach(this); @@ -133,28 +117,6 @@ struct PlaceRef : public PlaceRefBase { this->max_item_count = max_item_count; } - PlaceRef(JArrow* parent, JMailbox* queue, bool is_input, size_t min_item_count, size_t max_item_count) { - assert(parent != nullptr); - assert(queue != nullptr); - parent->attach(this); - this->place_ref = queue; - this->is_queue = true; - this->is_input = is_input; - this->min_item_count = min_item_count; - this->max_item_count = max_item_count; - } - - PlaceRef(JArrow* parent, JPool* pool, bool is_input, size_t min_item_count, size_t max_item_count) { - assert(parent != nullptr); - assert(pool != nullptr); - parent->attach(this); - this->place_ref = pool; - this->is_queue = false; - this->is_input = is_input; - this->min_item_count = min_item_count; - this->max_item_count = max_item_count; - } - void set_queue(JMailbox* queue) { assert(queue != nullptr); this->place_ref = queue; @@ -176,23 +138,6 @@ struct PlaceRef : public PlaceRefBase { return 0; } - size_t get_threshold() override { - assert(place_ref != nullptr); - if (is_input && is_queue) { - auto queue = static_cast*>(place_ref); - return queue->get_threshold(); - } - return -1; - } - - void set_threshold(size_t threshold) override { - assert(place_ref != nullptr); - if (is_input && is_queue) { - auto queue = static_cast*>(place_ref); - queue->set_threshold(threshold); - } - } - bool pull(Data& data) { assert(place_ref != nullptr); if (is_input) { // Actually pull the data @@ -267,19 +212,4 @@ inline size_t JArrow::get_pending() { return sum; } -inline size_t JArrow::get_threshold() { - size_t result = -1; - for (PlaceRefBase* place : m_places) { - result = std::min(result, place->get_threshold()); - } - return result; - -} - -inline void JArrow::set_threshold(size_t threshold) { - for (PlaceRefBase* place : m_places) { - place->set_threshold(threshold); - } -} - diff --git a/src/libraries/JANA/Topology/JArrowMetrics.h b/src/libraries/JANA/Topology/JArrowMetrics.h index fde19d6fc..12e783d79 100644 --- a/src/libraries/JANA/Topology/JArrowMetrics.h +++ b/src/libraries/JANA/Topology/JArrowMetrics.h @@ -5,6 +5,7 @@ #pragma once #include #include +#include class JArrowMetrics { diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index 107768740..dc4b48936 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -27,7 +27,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) { void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { - LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; for (JEventSource* source : m_sources) { JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope source->Preprocess(**event); @@ -45,16 +45,32 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& processor->DoMap(*event); } } - LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; success = true; status = JArrowMetrics::Status::KeepGoing; } void JEventMapArrow::initialize() { LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + processor->DoInitialize(); + LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + } + } + LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END; } void JEventMapArrow::finalize() { LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END; + processor->DoFinalize(); + LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END; + } + } + LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END; } diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 0c5bc48a7..c0394da18 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -133,7 +133,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St } else if (source_status == JEventSource::Result::FailureTryAgain){ // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater - LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureTryAgain"<< LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; success = false; arrow_status = JArrowMetrics::Status::ComeBackLater; return event; diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc index 412cfac07..d02a2c797 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -18,14 +18,14 @@ void JEventTapArrow::add_processor(JEventProcessor* proc) { void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { - LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; for (JEventProcessor* proc : m_procs) { JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { proc->DoTap(*event); } } - LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END; success = true; status = JArrowMetrics::Status::KeepGoing; } diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index 8c1b027c1..f416a488a 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -24,58 +24,16 @@ class JFoldArrow : public JArrow { public: JFoldArrow( std::string name, - //JEventFolder* folder, JEventLevel parent_level, - JEventLevel child_level, - JMailbox* child_in, - JEventPool* child_out, - JMailbox* parent_out) + JEventLevel child_level) : JArrow(std::move(name), false, false, false), // m_folder(folder), m_parent_level(parent_level), m_child_level(child_level), - m_child_in(this, child_in, true, 1, 1), - m_child_out(this, child_out, false, 1, 1), - m_parent_out(this, parent_out, false, 1, 1) - { - } - - JFoldArrow( - std::string name, - //JEventFolder* folder, - JEventLevel parent_level, - JEventLevel child_level, - JMailbox* child_in, - JMailbox* child_out, - JMailbox* parent_out) - - : JArrow(std::move(name), false, false, false), - // m_folder(folder), - m_parent_level(parent_level), - m_child_level(child_level), - m_child_in(this, child_in, true, 1, 1), - m_child_out(this, child_out, false, 1, 1), - m_parent_out(this, parent_out, false, 1, 1) - { - } - - JFoldArrow( - std::string name, - //JEventFolder* folder, - JEventLevel parent_level, - JEventLevel child_level, - JMailbox* child_in, - JEventPool* child_out, - JEventPool* parent_out) - - : JArrow(std::move(name), false, false, false), - // m_folder(folder), - m_parent_level(parent_level), - m_child_level(child_level), - m_child_in(this, child_in, true, 1, 1), - m_child_out(this, child_out, false, 1, 1), - m_parent_out(this, parent_out, false, 1, 1) + m_child_in(this, true, 1, 1), + m_child_out(this, false, 1, 1), + m_parent_out(this, false, 1, 1) { } diff --git a/src/libraries/JANA/Topology/JJunctionArrow.h b/src/libraries/JANA/Topology/JJunctionArrow.h index fffd02893..c8c07c6bf 100644 --- a/src/libraries/JANA/Topology/JJunctionArrow.h +++ b/src/libraries/JANA/Topology/JJunctionArrow.h @@ -13,10 +13,10 @@ template class JJunctionArrow : public JArrow { protected: - PlaceRef first_input {this}; - PlaceRef first_output {this}; - PlaceRef second_input {this}; - PlaceRef second_output {this}; + PlaceRef first_input {this, true, 1, 1}; + PlaceRef first_output {this, false, 1, 1}; + PlaceRef second_input {this, true, 1, 1}; + PlaceRef second_output {this, false, 1, 1}; public: using Status = JArrowMetrics::Status; diff --git a/src/libraries/JANA/Topology/JSubeventArrow.h b/src/libraries/JANA/Topology/JSubeventArrow.h index e90648b1e..8aeab67db 100644 --- a/src/libraries/JANA/Topology/JSubeventArrow.h +++ b/src/libraries/JANA/Topology/JSubeventArrow.h @@ -63,9 +63,6 @@ class JSubeventArrow : public JArrow { } size_t get_pending() final { return m_inbox->size(); }; - size_t get_threshold() final { return m_inbox->get_threshold(); }; - void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);}; - void execute(JArrowMetrics&, size_t location_id) override; }; @@ -83,9 +80,6 @@ class JSplitArrow : public JArrow { } size_t get_pending() final { return m_inbox->size(); }; - size_t get_threshold() final { return m_inbox->get_threshold(); }; - void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);}; - void execute(JArrowMetrics&, size_t location_id) override; }; @@ -104,8 +98,6 @@ class JMergeArrow : public JArrow { } size_t get_pending() final { return m_inbox->size(); }; - size_t get_threshold() final { return m_inbox->get_threshold(); }; - void set_threshold(size_t threshold) final { m_inbox->set_threshold(threshold);}; void execute(JArrowMetrics&, size_t location_id) override; }; diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index d55ae85b7..f581e072f 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -8,6 +8,7 @@ #include "JEventSourceArrow.h" #include "JEventProcessorArrow.h" #include "JEventMapArrow.h" +#include "JEventTapArrow.h" #include "JUnfoldArrow.h" #include "JFoldArrow.h" #include @@ -94,7 +95,7 @@ void JTopologyBuilder::create_topology() { static_cast(m_locality)); event_pool = new JEventPool(m_components, - m_event_pool_size, + m_pool_capacity, m_location_count, m_limit_total_events_in_flight); event_pool->init(); @@ -104,7 +105,7 @@ void JTopologyBuilder::create_topology() { LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << print_topology() << LOG_END; } else { - attach_top_level(JEventLevel::Run); + attach_level(JEventLevel::Run, nullptr, nullptr); LOG_INFO(GetLogger()) << "Arrow topology is:\n" << print_topology() << LOG_END; } int id=0; @@ -127,19 +128,20 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) { // We parse the 'nthreads' parameter two different ways for backwards compatibility. if (m_params->Exists("nthreads")) { if (m_params->GetParameterValue("nthreads") == "Ncores") { - m_event_pool_size = JCpuInfo::GetNumCpus(); + m_pool_capacity = JCpuInfo::GetNumCpus(); } else { - m_event_pool_size = m_params->GetParameterValue("nthreads"); + m_pool_capacity = m_params->GetParameterValue("nthreads"); } + m_queue_capacity = m_pool_capacity; } - m_params->SetDefaultParameter("jana:event_pool_size", m_event_pool_size, + m_params->SetDefaultParameter("jana:event_pool_size", m_pool_capacity, "Sets the initial size of the event pool. Having too few events starves the workers; having too many consumes memory and introduces overhead from extra factory initializations") ->SetIsAdvanced(true); m_params->SetDefaultParameter("jana:limit_total_events_in_flight", m_limit_total_events_in_flight, "Controls whether the event pool is allowed to automatically grow beyond jana:event_pool_size") ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_queue_threshold", m_event_queue_threshold, + m_params->SetDefaultParameter("jana:event_queue_threshold", m_queue_capacity, "Max number of events allowed on the main event queue. Higher => Better load balancing; Lower => Fewer events in flight") ->SetIsAdvanced(true); m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing, @@ -154,220 +156,254 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) { }; -void JTopologyBuilder::attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink) { - - std::stringstream ss; - ss << current_level; - - LOG_DEBUG(GetLogger()) << "JTopologyBuilder: Attaching components at lower level = " << current_level << LOG_END; - - JEventPool* pool = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight, - current_level); - pool->init(); - pools.push_back(pool); // Transfers ownership +void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downstream, size_t down_index) { + auto queue = new EventQueue(m_queue_capacity, mapping.get_loc_count(), m_enable_stealing); + queues.push_back(queue); - std::vector sources_at_level; - for (JEventSource* source : m_components->get_evt_srces()) { - if (source->GetLevel() == current_level) { - sources_at_level.push_back(source); - } - } - std::vector procs_at_level; - for (JEventProcessor* proc : m_components->get_evt_procs()) { - if (proc->GetLevel() == current_level) { - procs_at_level.push_back(proc); + size_t i = 0; + for (PlaceRefBase* place : upstream->m_places) { + if (!place->is_input) { + if (i++ == up_index) { + // Found the correct output + place->is_queue = true; + place->place_ref = queue; + } } } - std::vector unfolders_at_level; - for (JEventUnfolder* unfolder : m_components->get_unfolders()) { - if (unfolder->GetLevel() == current_level) { - unfolders_at_level.push_back(unfolder); + i = 0; + for (PlaceRefBase* place : downstream->m_places) { + if (place->is_input) { + if (i++ == down_index) { + // Found the correct input + place->is_queue = true; + place->place_ref = queue; + } } } + upstream->attach(downstream); +} - if (sources_at_level.size() != 0) { - throw JException("Support for lower-level event sources coming soon!"); - } - if (unfolders_at_level.size() != 0) { - throw JException("Support for lower-level event unfolders coming soon!"); - } - if (procs_at_level.size() == 0) { - throw JException("For now we require you to provide at least one JEventProcessor"); - } - - auto q1 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); - queues.push_back(q1); - - auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); - queues.push_back(q2); - - auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap"); - proc_arrow->set_input(q1); - proc_arrow->set_output(q2); - arrows.push_back(proc_arrow); - proc_arrow->set_logger(GetLogger()); - if (found_sink) { - proc_arrow->set_is_sink(false); - } - - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); +void JTopologyBuilder::connect_to_first_available(JArrow* upstream, std::vector downstreams) { + for (JArrow* downstream : downstreams) { + if (downstream != nullptr) { + // Arrows at the same level all connect at index 0 (even the input for the parent JFoldArrow) + connect(upstream, 0, downstream, 0); + return; + } } - - parent_unfolder->attach_child_in(pool); - parent_unfolder->attach_child_out(q1); - parent_folder->attach_child_in(q2); - parent_folder->attach_child_out(pool); - parent_unfolder->attach(proc_arrow); - proc_arrow->attach(parent_folder); } -void JTopologyBuilder::attach_top_level(JEventLevel current_level) { - +void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder) { std::stringstream ss; ss << current_level; auto level_str = ss.str(); + // Find all event sources at this level std::vector sources_at_level; for (JEventSource* source : m_components->get_evt_srces()) { if (source->GetLevel() == current_level) { sources_at_level.push_back(source); } } - if (sources_at_level.size() == 0) { - // Skip level entirely for now. Consider eventually supporting - // folding low levels into higher levels without corresponding unfold - LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; - JEventLevel next = next_level(current_level); - if (next == JEventLevel::None) { - LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; - return; - } - return attach_top_level(next); - } - LOG_DEBUG(GetLogger()) << "JTopologyBuilder: Attaching components at top level = " << current_level << LOG_END; - - // We've now found our top level. No matter what, we need an event pool for this level - JEventPool* pool_at_level = new JEventPool(m_components, - m_event_pool_size, - m_location_count, - m_limit_total_events_in_flight, - current_level); - pool_at_level->init(); - pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - - // There are two possibilities at this point: - // a. This is the only level, in which case we wire up the arrows and exit - // b. We have an unfolder/folder pair, in which case we wire everything up, and then recursively attach_lower_level(). - // We use the presence of an unfolder as our test for whether or not a lower level should be included. This is because - // the folder might be trivial and hence omitted by the user. (Note that some folder is always needed in order to return - // the higher-level event to the pool). - // The user always needs to provide an unfolder because I can't think of a trivial unfolder that would be useful. - + + // Find all unfolders at this level std::vector unfolders_at_level; for (JEventUnfolder* unfolder : m_components->get_unfolders()) { if (unfolder->GetLevel() == current_level) { unfolders_at_level.push_back(unfolder); } } + + // Find all processors at this level + std::vector mappable_procs_at_level; + std::vector tappable_procs_at_level; - std::vector procs_at_level; for (JEventProcessor* proc : m_components->get_evt_procs()) { if (proc->GetLevel() == current_level) { - procs_at_level.push_back(proc); + mappable_procs_at_level.push_back(proc); + if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { + tappable_procs_at_level.push_back(proc); + } } } - if (unfolders_at_level.size() == 0) { - // No unfolders, so this is the only level - // Attach the source to the map/tap just like before - // - // We might want to print a friendly warning message communicating why any lower-level - // components are being ignored, like so: - // skip_lower_level(next_level(current_level)); - - LOG_DEBUG(GetLogger()) << "JTopologyBuilder: No unfolders found at level " << current_level << ", finishing here." << LOG_END; - auto queue = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); - queues.push_back(queue); - - auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); - src_arrow->set_input(pool_at_level); - src_arrow->set_output(queue); - arrows.push_back(src_arrow); - - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap"); - proc_arrow->set_input(queue); - proc_arrow->set_output(pool_at_level); - arrows.push_back(proc_arrow); - - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); + bool is_top_level = (parent_unfolder == nullptr); + if (is_top_level && sources_at_level.size() == 0) { + // Skip level entirely when no source is present. + LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; + JEventLevel next = next_level(current_level); + if (next == JEventLevel::None) { + LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; + return; } - src_arrow->attach(proc_arrow); + return attach_level(next, nullptr, nullptr); } - else if (unfolders_at_level.size() != 1) { - throw JException("At most one unfolder must be provided for each level in the event hierarchy!"); + + // Enforce constraints on what our builder will accept (at least for now) + if (!is_top_level && !sources_at_level.empty()) { + throw JException("Topology forbids event sources at lower event levels in the topology"); } - else { - - auto q1 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); - auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); + if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) { + throw JException("Topology requires matching unfolder/folder arrow pairs"); + } + if (unfolders_at_level.size() > 1) { + throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str()); + } + // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by + // the level-skipping logic above + - queues.push_back(q1); - queues.push_back(q2); + // Fill out arrow grid from components at this event level + // -------------------------- + // 0. Pool + // -------------------------- + JEventPool* pool_at_level = new JEventPool(m_components, m_pool_capacity, m_location_count, m_limit_total_events_in_flight, current_level); + pool_at_level->init(); + pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); + // -------------------------- + // 1. Source + // -------------------------- + JEventSourceArrow* src_arrow = nullptr; + bool need_source = !sources_at_level.empty(); + if (need_source) { + src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); src_arrow->set_input(pool_at_level); - src_arrow->set_output(q1); + src_arrow->set_output(pool_at_level); arrows.push_back(src_arrow); + } - auto *map_arrow = new JEventMapArrow(level_str+"Map"); - map_arrow->set_input(q1); - map_arrow->set_output(q2); - arrows.push_back(map_arrow); - src_arrow->attach(map_arrow); + // -------------------------- + // 2. Map1 + // -------------------------- + bool have_parallel_sources = false; + for (JEventSource* source: sources_at_level) { + have_parallel_sources |= source->IsPreprocessEnabled(); + } + bool have_unfolder = !unfolders_at_level.empty(); + JEventMapArrow* map1_arrow = nullptr; + bool need_map1 = (have_parallel_sources || have_unfolder); + + if (need_map1) { + map1_arrow = new JEventMapArrow(level_str+"Map1"); + for (JEventSource* source: sources_at_level) { + if (source->IsPreprocessEnabled()) { + map1_arrow->add_source(source); + } + } + for (JEventUnfolder* unf: unfolders_at_level) { + map1_arrow->add_unfolder(unf); + } + map1_arrow->set_input(pool_at_level); + map1_arrow->set_output(pool_at_level); + arrows.push_back(map1_arrow); + } - // TODO: We are using q2 temporarily knowing that it will be overwritten in attach_lower_level. - // It would be better to rejigger how we validate PlaceRefs and accept empty placerefs/fewer ctor args - auto *unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0], q2, pool_at_level, q2); + // -------------------------- + // 3. Unfold + // -------------------------- + JUnfoldArrow* unfold_arrow = nullptr; + bool need_unfold = have_unfolder; + if (need_unfold) { + unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]); arrows.push_back(unfold_arrow); - map_arrow->attach(unfold_arrow); - - // child_in, child_out, parent_out - auto *fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel(), q2, pool_at_level, pool_at_level); - // TODO: Support user-provided folders - - bool found_sink = (procs_at_level.size() > 0); - attach_lower_level(unfolders_at_level[0]->GetChildLevel(), unfold_arrow, fold_arrow, found_sink); + } - // Push fold arrow back _after_ attach_lower_level so that arrows can be iterated over in order + // -------------------------- + // 4. Fold + // -------------------------- + JFoldArrow* fold_arrow = nullptr; + bool need_fold = have_unfolder; + if(need_fold) { + fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel()); arrows.push_back(fold_arrow); + fold_arrow->attach_parent_out(pool_at_level); + } - if (procs_at_level.size() != 0) { + // -------------------------- + // 5. Map2 + // -------------------------- + JEventMapArrow* map2_arrow = nullptr; + bool need_map2 = !mappable_procs_at_level.empty(); + if (need_map2) { + map2_arrow = new JEventMapArrow(level_str+"Map2"); + for (JEventProcessor* proc : mappable_procs_at_level) { + map2_arrow->add_processor(proc); + map2_arrow->set_input(pool_at_level); + map2_arrow->set_output(pool_at_level); + } + arrows.push_back(map2_arrow); + } - auto q3 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); - queues.push_back(q3); + // -------------------------- + // 6. Tap + // -------------------------- + JEventTapArrow* tap_arrow = nullptr; + bool need_tap = !tappable_procs_at_level.empty(); + if (need_tap) { + tap_arrow = new JEventTapArrow(level_str+"Tap"); + for (JEventProcessor* proc : tappable_procs_at_level) { + tap_arrow->add_processor(proc); + tap_arrow->set_input(pool_at_level); + tap_arrow->set_output(pool_at_level); + } + arrows.push_back(tap_arrow); + } - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap"); - proc_arrow->set_input(q3); - proc_arrow->set_output(pool_at_level); - arrows.push_back(proc_arrow); - for (auto proc: procs_at_level) { - proc_arrow->add_processor(proc); - } + // Now that we've set up our component grid, we can do wiring! + // -------------------------- + // 1. Source + // -------------------------- + if (parent_unfolder != nullptr) { + parent_unfolder->attach_child_in(pool_at_level); + connect_to_first_available(parent_unfolder, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder}); + } + if (src_arrow != nullptr) { + connect_to_first_available(src_arrow, {map1_arrow, unfold_arrow, map2_arrow, tap_arrow, parent_folder}); + } + if (map1_arrow != nullptr) { + connect_to_first_available(map1_arrow, {unfold_arrow, map2_arrow, tap_arrow, parent_folder}); + } + if (fold_arrow != nullptr) { + connect_to_first_available(fold_arrow, {map2_arrow, tap_arrow, parent_folder}); + } + if (map2_arrow != nullptr) { + connect_to_first_available(map2_arrow, {tap_arrow, parent_folder}); + } + if (tap_arrow != nullptr) { + connect_to_first_available(tap_arrow, {parent_folder}); + } + if (parent_folder != nullptr) { + parent_folder->attach_child_out(pool_at_level); + } - fold_arrow->attach_parent_out(q3); - fold_arrow->attach(proc_arrow); + // Finally, we recur over lower levels! + if (need_unfold) { + auto next_level = unfolders_at_level[0]->GetChildLevel(); + attach_level(next_level, unfold_arrow, fold_arrow); + } + else { + // This is the lowest level + // TODO: Improve logic for determining event counts for multilevel topologies + if (tap_arrow != nullptr) { + tap_arrow->set_is_sink(true); + } + else if (map2_arrow != nullptr) { + map2_arrow->set_is_sink(true); + } + else if (map1_arrow != nullptr) { + map1_arrow->set_is_sink(true); + } + else if (src_arrow != nullptr) { + src_arrow->set_is_sink(true); } } - } + + diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 05c3c9ca3..af1e29d19 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -35,8 +35,8 @@ class JTopologyBuilder : public JService { std::vector pools; // Pools shared between arrows // Topology configuration - size_t m_event_pool_size = 4; - size_t m_event_queue_threshold = 80; + size_t m_pool_capacity = 4; + size_t m_queue_capacity = 4; size_t m_location_count = 1; bool m_enable_stealing = false; bool m_limit_total_events_in_flight = true; @@ -64,9 +64,9 @@ class JTopologyBuilder : public JService { void create_topology(); - void attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink); - - void attach_top_level(JEventLevel current_level); + void attach_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder); + void connect_to_first_available(JArrow* upstream, std::vector downstreams); + void connect(JArrow* upstream, size_t upstream_index, JArrow* downstream, size_t downstream_index); std::string print_topology(); diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 1916c3980..5dd8c75cd 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -23,19 +23,17 @@ class JUnfoldArrow : public JArrow { JUnfoldArrow( std::string name, - JEventUnfolder* unfolder, - JMailbox* parent_in, - JEventPool* child_in, - JMailbox* child_out) + JEventUnfolder* unfolder) : JArrow(std::move(name), false, false, false), m_unfolder(unfolder), - m_parent_in(this, parent_in, true, 1, 1), - m_child_in(this, child_in, true, 1, 1), - m_child_out(this, child_out, false, 1, 1) + m_parent_in(this, true, 1, 1), + m_child_in(this, true, 1, 1), + m_child_out(this, false, 1, 1) { } + void attach_parent_in(JMailbox* parent_in) { m_parent_in.place_ref = parent_in; m_parent_in.is_queue = true; diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index ba90ea2d5..e3a7a467c 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -34,12 +34,37 @@ struct BarrierSource : public JEventSource { else { LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END; } - bench.consume_cpu_ms(50, 0, true); + bench.consume_cpu_ms(50, 0, false); return Result::Success; } }; +struct LegacyBarrierProcessor : public JEventProcessor { + + JBenchUtils bench; + std::mutex m_my_mutex; + + + void Process(const std::shared_ptr& event) override { + + bench.consume_cpu_ms(200, 0, true); + + std::lock_guard lock(m_my_mutex); + + if (event->GetSequential()) { + LOG_INFO(GetLogger()) << "Processing barrier event = " << event->GetEventNumber() << ", writing global var = " << global_resource+1 << LOG_END; + REQUIRE(global_resource == ((event->GetEventNumber() - 1) / 10)); + global_resource += 1; + } + else { + LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event->GetEventNumber() << ", reading global var = " << global_resource << LOG_END; + REQUIRE(global_resource == (event->GetEventNumber() / 10)); + } + bench.consume_cpu_ms(100, 0, true); + } +}; + struct BarrierProcessor : public JEventProcessor { @@ -48,6 +73,10 @@ struct BarrierProcessor : public JEventProcessor { BarrierProcessor() { SetCallbackStyle(CallbackStyle::ExpertMode); } + void ProcessParallel(const JEvent&) override { + bench.consume_cpu_ms(200, 0, false); + } + void Process(const JEvent& event) override { if (event.GetSequential()) { @@ -59,22 +88,34 @@ struct BarrierProcessor : public JEventProcessor { LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; REQUIRE(global_resource == (event.GetEventNumber() / 10)); } - bench.consume_cpu_ms(100, 0, true); + bench.consume_cpu_ms(100, 0, false); } }; TEST_CASE("BarrierEventTests") { - SECTION("Basic Barrier") { - JApplication app; - app.Add(new BarrierProcessor); - app.Add(new BarrierSource); - app.SetParameterValue("nthreads", 4); - app.SetParameterValue("jana:nevents", 40); - app.SetParameterValue("jana:log:show_threadstamp", true); - app.SetParameterValue("jana:loglevel", "debug"); - app.Run(true); - } + global_resource = 0; + JApplication app; + app.Add(new BarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 4); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); +}; + + +TEST_CASE("BarrierEventTests_Legacy") { + global_resource = 0; + JApplication app; + app.Add(new LegacyBarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 4); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); }; diff --git a/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc b/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc index e43edd642..1137642de 100644 --- a/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc +++ b/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc @@ -49,9 +49,8 @@ TEST_CASE("JEventProcessorSequentialRootTests") { app.Add(new JEventSource()); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 4); - app.SetParameterValue("jana:event_source_chunksize", 1); - app.SetParameterValue("jana:event_processor_chunksize", 1); - app.SetParameterValue("jana:loglevel", "warn"); + //app.SetParameterValue("jana:loglevel", "debug"); + //app.SetParameterValue("jana:log:show_threadstamp", true); auto proc = new MyRootProcessor; app.Add(proc); app.Run(true); @@ -135,9 +134,8 @@ TEST_CASE("JEventProcessorSequentialTests") { app.Add(new JEventSource()); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 4); - app.SetParameterValue("jana:event_source_chunksize", 1); - app.SetParameterValue("jana:event_processor_chunksize", 1); - app.SetParameterValue("jana:loglevel", "warn"); + //app.SetParameterValue("jana:loglevel", "debug"); + //app.SetParameterValue("jana:log:show_threadstamp", 1); auto proc = new MySeqProcessor; app.Add(proc); app.Run(true); diff --git a/src/programs/unit_tests/Components/UnfoldTests.cc b/src/programs/unit_tests/Components/UnfoldTests.cc index 59a0a7a87..92293eea0 100644 --- a/src/programs/unit_tests/Components/UnfoldTests.cc +++ b/src/programs/unit_tests/Components/UnfoldTests.cc @@ -64,7 +64,10 @@ TEST_CASE("UnfoldTests_Basic") { parent_queue.try_push(&ts2, 1); TestUnfolder unfolder; - JUnfoldArrow arrow("sut", &unfolder, &parent_queue, &child_pool, &child_queue); + JUnfoldArrow arrow("sut", &unfolder); + arrow.attach_parent_in(&parent_queue); + arrow.attach_child_in(&child_pool); + arrow.attach_child_out(&child_queue); JArrowMetrics m; arrow.initialize(); @@ -100,7 +103,10 @@ TEST_CASE("FoldArrowTests") { JMailbox*> child_out; JMailbox*> parent_out; - JFoldArrow arrow("sut", JEventLevel::Timeslice, JEventLevel::PhysicsEvent, &child_in, &child_out, &parent_out); + JFoldArrow arrow("sut", JEventLevel::Timeslice, JEventLevel::PhysicsEvent); + arrow.attach_child_in(&child_in); + arrow.attach_child_out(&child_out); + arrow.attach_parent_out(&parent_out); JArrowMetrics metrics; arrow.initialize(); diff --git a/src/programs/unit_tests/Topology/ArrowTests.cc b/src/programs/unit_tests/Topology/ArrowTests.cc index 4a3b8877d..20d261b93 100644 --- a/src/programs/unit_tests/Topology/ArrowTests.cc +++ b/src/programs/unit_tests/Topology/ArrowTests.cc @@ -14,10 +14,10 @@ struct TestMapArrow : public JJunctionArrow { JMailbox* qd) : JJunctionArrow("testmaparrow", false, false, true) { - first_input = {this, qi, true, 1, 1}; - first_output = {this, pi, false, 1, 1}; - second_input = {this, pd, true, 1, 1}; - second_output = {this, qd, false, 1, 1}; + first_input.set_queue(qi); + first_output.set_pool(pi); + second_input.set_pool(pd); + second_output.set_queue(qd); } Status process(Data& input_int, diff --git a/src/programs/unit_tests/Topology/MapArrow.h b/src/programs/unit_tests/Topology/MapArrow.h index 81790a5ef..6b3147345 100644 --- a/src/programs/unit_tests/Topology/MapArrow.h +++ b/src/programs/unit_tests/Topology/MapArrow.h @@ -81,10 +81,6 @@ class MapArrow : public JArrow { } size_t get_pending() final { return _input_queue->size(); } - - size_t get_threshold() final { return _input_queue->get_threshold(); } - - void set_threshold(size_t threshold) final { _input_queue->set_threshold(threshold); } };