diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index 6d0429060..f4ba1051e 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -73,7 +73,7 @@ class JEventProcessor : public jana::components::JComponent, // any contention. if (m_status == Status::Uninitialized) { - DoInitialize(); + throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()"); } else if (m_status == Status::Finalized) { throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); @@ -121,15 +121,16 @@ class JEventProcessor : public jana::components::JComponent, auto run_number = event->GetRunNumber(); - if (m_status == Status::Uninitialized) { - DoInitialize(); - } - else if (m_status == Status::Finalized) { - throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); - } { // Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun(). std::lock_guard lock(m_mutex); + + if (m_status == Status::Uninitialized) { + throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()"); + } + else if (m_status == Status::Finalized) { + throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()"); + } if (m_last_run_number != run_number) { if (m_last_run_number != -1) { CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index 107768740..31d48e5db 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -52,9 +52,25 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& void JEventMapArrow::initialize() { LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + processor->DoInitialize(); + LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + } + } + LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END; } void JEventMapArrow::finalize() { LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END; + processor->DoFinalize(); + LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END; + } + } + LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END; } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 3e20f8cfe..abded05c6 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -381,6 +381,27 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par parent_folder->attach_child_out(pool_at_level); } + // Finally, we recur over lower levels! + if (need_unfold) { + auto next_level = unfolders_at_level[0]->GetChildLevel(); + attach_level(next_level, unfold_arrow, fold_arrow); + } + else { + // This is the lowest level + // TODO: Improve logic for determining event counts for multilevel topologies + if (tap_arrow != nullptr) { + tap_arrow->set_is_sink(true); + } + else if (map2_arrow != nullptr) { + map2_arrow->set_is_sink(true); + } + else if (map1_arrow != nullptr) { + map1_arrow->set_is_sink(true); + } + else if (src_arrow != nullptr) { + src_arrow->set_is_sink(true); + } + } } diff --git a/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc b/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc index e43edd642..1137642de 100644 --- a/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc +++ b/src/programs/unit_tests/Components/JEventProcessorSequentialTests.cc @@ -49,9 +49,8 @@ TEST_CASE("JEventProcessorSequentialRootTests") { app.Add(new JEventSource()); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 4); - app.SetParameterValue("jana:event_source_chunksize", 1); - app.SetParameterValue("jana:event_processor_chunksize", 1); - app.SetParameterValue("jana:loglevel", "warn"); + //app.SetParameterValue("jana:loglevel", "debug"); + //app.SetParameterValue("jana:log:show_threadstamp", true); auto proc = new MyRootProcessor; app.Add(proc); app.Run(true); @@ -135,9 +134,8 @@ TEST_CASE("JEventProcessorSequentialTests") { app.Add(new JEventSource()); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 4); - app.SetParameterValue("jana:event_source_chunksize", 1); - app.SetParameterValue("jana:event_processor_chunksize", 1); - app.SetParameterValue("jana:loglevel", "warn"); + //app.SetParameterValue("jana:loglevel", "debug"); + //app.SetParameterValue("jana:log:show_threadstamp", 1); auto proc = new MySeqProcessor; app.Add(proc); app.Run(true);