Skip to content

Commit

Permalink
Merge pull request #293 from JeffersonLab/nbrei_reorganize
Browse files Browse the repository at this point in the history
Refactoring: Separate "Engine" and "Topology" layers
  • Loading branch information
nathanwbrei authored Apr 29, 2024
2 parents c522549 + 886f99b commit c1378b3
Show file tree
Hide file tree
Showing 84 changed files with 693 additions and 851 deletions.
67 changes: 37 additions & 30 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

#include <JANA/JApplication.h>
#include <JANA/JObject.h>
#include <JANA/Engine/JSubeventArrow.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Engine/JTopologyBuilder.h"

#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Topology/JEventProcessorArrow.h>
#include <JANA/Topology/JSubeventArrow.h>
#include "JANA/Topology/JTopologyBuilder.h"


struct MyInput : public JObject {
Expand Down Expand Up @@ -82,16 +85,6 @@ struct SimpleProcessor : public JEventProcessor {

int main() {

MyProcessor processor;
JMailbox<std::shared_ptr<JEvent>*> events_in;
JMailbox<std::shared_ptr<JEvent>*> events_out;
JMailbox<SubeventWrapper<MyInput>> subevents_in;
JMailbox<SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

JApplication app;
app.SetParameterValue("log:info", "JWorker,JScheduler,JArrowProcessingController,JEventProcessorArrow");
app.SetTimeoutEnabled(false);
Expand All @@ -100,25 +93,39 @@ int main() {
auto source = new SimpleSource();
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource
MyProcessor processor;

auto topology = app.GetService<JTopologyBuilder>();
topology->set_configure_fn([&](JTopologyBuilder& builder) {

JMailbox<std::shared_ptr<JEvent>*> events_in;
JMailbox<std::shared_ptr<JEvent>*> events_out;
JMailbox<SubeventWrapper<MyInput>> subevents_in;
JMailbox<SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

builder.arrows.push_back(source_arrow);
builder.arrows.push_back(split_arrow);
builder.arrows.push_back(subprocess_arrow);
builder.arrows.push_back(merge_arrow);
builder.arrows.push_back(proc_arrow);

source_arrow->attach(split_arrow);
split_arrow->attach(subprocess_arrow);
subprocess_arrow->attach(merge_arrow);
merge_arrow->attach(proc_arrow);
});

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

topology->arrows.push_back(source_arrow);
topology->arrows.push_back(split_arrow);
topology->arrows.push_back(subprocess_arrow);
topology->arrows.push_back(merge_arrow);
topology->arrows.push_back(proc_arrow);

source_arrow->attach(split_arrow);
split_arrow->attach(subprocess_arrow);
subprocess_arrow->attach(merge_arrow);
merge_arrow->attach(proc_arrow);

app.Run(true);

Expand Down
41 changes: 20 additions & 21 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,31 @@ set(JANA2_SOURCES
JMultifactory.h
JService.cc

Engine/JArrow.h
Engine/JArrowMetrics.h
Engine/JArrowPerfSummary.cc
Engine/JArrowPerfSummary.h
Engine/JArrowProcessingController.cc
Engine/JArrowProcessingController.h
Engine/JArrowTopology.cc
Engine/JArrowTopology.h
Engine/JEventProcessorArrow.cc
Engine/JEventProcessorArrow.h
Engine/JEventSourceArrow.cc
Engine/JEventSourceArrow.h
Engine/JEventMapArrow.h
Engine/JEventMapArrow.cc
Engine/JPool.h

Engine/JMailbox.h
Engine/JScheduler.cc
Engine/JScheduler.h
Engine/JSubeventArrow.h
Engine/JWorker.h
Engine/JWorker.cc
Engine/JWorkerMetrics.h
Engine/JTopologyBuilder.h
Engine/JPerfMetrics.cc
Engine/JPerfMetrics.h
Engine/JPerfSummary.cc
Engine/JPerfSummary.h

Topology/JArrow.h
Topology/JArrowMetrics.h
Topology/JEventProcessorArrow.cc
Topology/JEventProcessorArrow.h
Topology/JEventSourceArrow.cc
Topology/JEventSourceArrow.h
Topology/JEventMapArrow.h
Topology/JEventMapArrow.cc
Topology/JPool.h
Topology/JMailbox.h
Topology/JSubeventArrow.h
Topology/JTopologyBuilder.h
Topology/JTopologyBuilder.cc

Services/JComponentManager.cc
Services/JComponentManager.h
Expand All @@ -60,15 +61,11 @@ set(JANA2_SOURCES
Services/JParameterManager.h
Services/JPluginLoader.cc
Services/JPluginLoader.h
Services/JProcessingController.h
Services/JServiceLocator.h
Services/JEventGroupTracker.h

Status/JComponentSummary.h
Status/JComponentSummary.cc
Status/JPerfMetrics.cc
Status/JPerfMetrics.h
Status/JPerfSummary.h

Streaming/JDiscreteJoin.h
Streaming/JEventBuilder.h
Expand Down Expand Up @@ -245,6 +242,7 @@ file(GLOB jana_cli_headers "CLI/*.h*")
file(GLOB jana_compat_headers "Compatibility/*.h*")
file(GLOB jana_podio_headers "Podio/*.h*")
file(GLOB jana_omni_headers "Omni/*.h*")
file(GLOB jana_topology_headers "Topology/*.h*")

install(FILES ${jana_headers} DESTINATION include/JANA)
install(FILES ${jana_engine_headers} DESTINATION include/JANA/Engine)
Expand All @@ -256,6 +254,7 @@ install(FILES ${jana_calibs_headers} DESTINATION include/JANA/Calibrations)
install(FILES ${jana_cli_headers} DESTINATION include/JANA/CLI)
install(FILES ${jana_compat_headers} DESTINATION include/JANA/Compatibility)
install(FILES ${jana_omni_headers} DESTINATION include/JANA/Omni)
install(FILES ${jana_topology_headers} DESTINATION include/JANA/Topology)

if (${USE_PODIO})
install(FILES ${jana_podio_headers} DESTINATION include/JANA/Podio)
Expand Down
22 changes: 11 additions & 11 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// Subject to the terms in the LICENSE file found in the top-level directory.

#include <JANA/Engine/JArrowProcessingController.h>
#include <JANA/Engine/JArrowPerfSummary.h>
#include <JANA/Engine/JPerfSummary.h>
#include <JANA/Topology/JTopologyBuilder.h>
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JLogger.h>

Expand All @@ -18,6 +19,8 @@ void JArrowProcessingController::acquire_services(JServiceLocator * sl) {
m_worker_logger = ls->get_logger("JWorker");
m_scheduler_logger = ls->get_logger("JScheduler");

m_topology = sl->get<JTopologyBuilder>();

// Obtain timeouts from parameter manager
auto params = sl->get<JParameterManager>();
params->SetDefaultParameter("jana:timeout", m_timeout_s, "Max time (in seconds) JANA will wait for a thread to update its heartbeat before hard-exiting. 0 to disable timeout completely.");
Expand Down Expand Up @@ -157,14 +160,14 @@ bool JArrowProcessingController::is_timed_out() {
// Probably want to refactor so that we only make one such call per ticker iteration.
// Since we are storing our metrics summary anyway, we could call measure_performance()
// and have print_report(), print_final_report(), is_timed_out(), etc use the cached version
auto metrics = measure_internal_performance();
auto metrics = measure_performance();

int timeout_s;
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->event_pool_size / metrics->thread_count) {
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
else if (!m_topology->limit_total_events_in_flight) {
else if (!m_topology->m_limit_total_events_in_flight) {
// New events are constantly emitted, each of which may contain jfactorysets which need to be warmed up
timeout_s = m_warmup_timeout_s;
}
Expand Down Expand Up @@ -219,16 +222,16 @@ JArrowProcessingController::~JArrowProcessingController() {
}

void JArrowProcessingController::print_report() {
auto metrics = measure_internal_performance();
auto metrics = measure_performance();
LOG_INFO(m_logger) << "Running" << *metrics << LOG_END;
}

void JArrowProcessingController::print_final_report() {
auto metrics = measure_internal_performance();
auto metrics = measure_performance();
LOG_INFO(m_logger) << "Final Report" << *metrics << LOG_END;
}

std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_internal_performance() {
std::unique_ptr<const JPerfSummary> JArrowProcessingController::measure_performance() {

// Measure perf on all Workers first, as this will prompt them to publish
// any ArrowMetrics they have collected
Expand Down Expand Up @@ -275,12 +278,9 @@ std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_int
? std::numeric_limits<double>::infinity()
: m_perf_summary.avg_throughput_hz / tighter_bottleneck;

return std::unique_ptr<JArrowPerfSummary>(new JArrowPerfSummary(m_perf_summary));
return std::unique_ptr<JPerfSummary>(new JPerfSummary(m_perf_summary));
}

std::unique_ptr<const JPerfSummary> JArrowProcessingController::measure_performance() {
return measure_internal_performance();
}



Expand Down
40 changes: 18 additions & 22 deletions src/libraries/JANA/Engine/JArrowProcessingController.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,51 @@
#ifndef JANA2_JARROWPROCESSINGCONTROLLER_H
#define JANA2_JARROWPROCESSINGCONTROLLER_H

#include <JANA/Services/JProcessingController.h>

#include <JANA/Engine/JArrow.h>
#include <JANA/Topology/JTopologyBuilder.h>
#include <JANA/Engine/JWorker.h>
#include <JANA/Engine/JArrowTopology.h>
#include <JANA/Engine/JArrowPerfSummary.h>
#include <JANA/Engine/JPerfSummary.h>

#include <vector>

class JArrowProcessingController : public JProcessingController {
class JArrowProcessingController : public JService {
public:

explicit JArrowProcessingController(std::shared_ptr<JArrowTopology> topology) : m_topology(topology) {};
~JArrowProcessingController() override;
void acquire_services(JServiceLocator *) override;

void initialize() override;
void run(size_t nthreads) override;
void scale(size_t nthreads) override;
void initialize();
void run(size_t nthreads);
void scale(size_t nthreads);
void request_pause();
void wait_until_paused();
void request_stop() override;
void wait_until_stopped() override;
void request_stop();
void wait_until_stopped();

bool is_stopped() override;
bool is_finished() override;
bool is_timed_out() override;
bool is_excepted() override;
bool is_stopped();
bool is_finished();
bool is_timed_out();
bool is_excepted();

std::vector<JException> get_exceptions() const override;
std::vector<JException> get_exceptions() const;

std::unique_ptr<const JPerfSummary> measure_performance() override;
std::unique_ptr<const JArrowPerfSummary> measure_internal_performance();
std::unique_ptr<const JPerfSummary> measure_performance();

void print_report() override;
void print_final_report() override;
void print_report();
void print_final_report();

// This is so we can test
inline JScheduler* get_scheduler() { return m_scheduler; }


private:
std::shared_ptr<JTopologyBuilder> m_topology;

using jclock_t = std::chrono::steady_clock;
int m_timeout_s = 8;
int m_warmup_timeout_s = 30;

JArrowPerfSummary m_perf_summary;
std::shared_ptr<JArrowTopology> m_topology; // Owned by JArrowProcessingController
JPerfSummary m_perf_summary;
JScheduler* m_scheduler = nullptr;

std::vector<JWorker*> m_workers;
Expand Down
27 changes: 0 additions & 27 deletions src/libraries/JANA/Engine/JArrowTopology.cc

This file was deleted.

Loading

0 comments on commit c1378b3

Please sign in to comment.