Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of multiple JEventSources #176

Merged
merged 5 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/examples/SubeventCUDAExample/SubeventCUDAExample.cu
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ int main() {
app.SetTicker(false);

auto source = new SimpleSource("simpleSource");
source->SetRange(0, 10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
Expand Down
4 changes: 2 additions & 2 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ int main() {
app.SetTicker(false);

auto source = new SimpleSource("simpleSource");
source->SetRange(0, 10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
source,
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
Expand Down
44 changes: 22 additions & 22 deletions src/libraries/JANA/Engine/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
using SourceStatus = JEventSource::RETURN_STATUS;

JEventSourceArrow::JEventSourceArrow(std::string name,
JEventSource* source,
std::vector<JEventSource*> sources,
EventQueue* output_queue,
std::shared_ptr<JEventPool> pool
)
: JArrow(name, false, NodeType::Source)
, m_source(source)
, m_sources(sources)
, m_output_queue(output_queue)
, m_pool(pool) {
}
Expand Down Expand Up @@ -48,24 +48,18 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) {
in_status = JEventSource::ReturnStatus::TryAgain;
break;
}
if (event->GetJEventSource() != m_source) {
// If we have multiple event sources, we need to make sure we are using
// event-source-specific factories on top of the default ones.
// This is obviously not the best way to handle this but I'll need to
// rejigger the whole thing anyway when we re-add parallel event sources.
auto factory_set = new JFactorySet();
auto src_fac_gen = m_source->GetFactoryGenerator();
if (src_fac_gen != nullptr) {
src_fac_gen->GenerateFactories(factory_set);
while (m_current_source < m_sources.size()) {
in_status = m_sources[m_current_source]->DoNext(event);

if (in_status == JEventSource::ReturnStatus::Finished) {
m_current_source++;
// TODO: Adjust nskip and nevents for the new source
}
else {
// This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
break;
}
factory_set->Merge(*event->GetFactorySet());
event->SetFactorySet(factory_set);
event->SetJEventSource(m_source);
}
event->SetSequential(false);
event->SetJApplication(m_source->GetApplication());
event->GetJCallGraphRecorder()->Reset();
in_status = m_source->DoNext(event);
if (in_status == JEventSource::ReturnStatus::Success) {
m_chunk_buffer.push_back(std::move(event));
}
Expand Down Expand Up @@ -108,11 +102,17 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) {
}

void JEventSourceArrow::initialize() {
m_source->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
// Initialization of individual sources happens on-demand, in order to keep us from having lots of open files
}

void JEventSourceArrow::finalize() {
m_source->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
// Generally JEventSources finalize themselves as soon as they detect that they have run out of events.
// However, we can't rely on the JEventSources turning themselves off since execution can be externally paused.
// Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then.
for (JEventSource* source : m_sources) {
if (source->GetStatus() == JEventSource::SourceStatus::Opened) {
LOG_INFO(m_logger) << "Finalizing JEventSource '" << source->GetTypeName() << "' (" << source->GetResourceName() << ")" << LOG_END;
source->DoFinalize();
}
}
}
5 changes: 3 additions & 2 deletions src/libraries/JANA/Engine/JEventSourceArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ class JEventPool;

class JEventSourceArrow : public JArrow {
private:
JEventSource* m_source;
std::vector<JEventSource*> m_sources;
size_t m_current_source = 0;
EventQueue* m_output_queue;
std::shared_ptr<JEventPool> m_pool;
std::vector<Event> m_chunk_buffer;

public:
JEventSourceArrow(std::string name, JEventSource* source, EventQueue* output_queue, std::shared_ptr<JEventPool> pool);
JEventSourceArrow(std::string name, std::vector<JEventSource*> sources, EventQueue* output_queue, std::shared_ptr<JEventPool> pool);
void initialize() final;
void finalize() final;
void execute(JArrowMetrics& result, size_t location_id) final;
Expand Down
20 changes: 9 additions & 11 deletions src/libraries/JANA/Engine/JTopologyBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,15 @@ class JTopologyBuilder : public JService {
// 2. Oftentimes we want to call JApplication::Initialize() just to set up plugins and services, i.e. for testing.
// We don't want to force the user to create a dummy event source if they know they are never going to call JApplication::Run().

for (auto src: m_components->get_evt_srces()) {

// create arrow for each source. Don't open until arrow.activate() called
JArrow *arrow = new JEventSourceArrow(src->GetName(), src, queue, m_topology->event_pool);
arrow->set_backoff_tries(0);
m_topology->arrows.push_back(arrow);
m_topology->sources.push_back(arrow);
arrow->set_chunksize(m_event_source_chunksize);
arrow->set_logger(m_arrow_logger);
arrow->set_running_arrows(&m_topology->running_arrow_count);
}
// Create arrow for sources.
JArrow *arrow = new JEventSourceArrow("sources", m_components->get_evt_srces(), queue, m_topology->event_pool);
arrow->set_backoff_tries(0);
m_topology->arrows.push_back(arrow);
m_topology->sources.push_back(arrow);
arrow->set_chunksize(m_event_source_chunksize);
arrow->set_logger(m_arrow_logger);
arrow->set_running_arrows(&m_topology->running_arrow_count);


auto proc_arrow = new JEventProcessorArrow("processors", queue, nullptr, m_topology->event_pool);
proc_arrow->set_chunksize(m_event_processor_chunksize);
Expand Down
37 changes: 29 additions & 8 deletions src/libraries/JANA/JEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <JANA/JException.h>
#include <JANA/Utils/JTypeInfo.h>
#include <JANA/JEvent.h>
#include <JANA/JFactoryGenerator.h>

#include <string>
#include <atomic>
Expand Down Expand Up @@ -143,7 +144,7 @@ class JEventSource {
throw(JException(e.what()));
}
catch (...) {
auto ex = JException("Unknown exception in JEventSource::Open()");
auto ex = JException("Unknown exception in JEventSource::Close()");
ex.nested_exception = std::current_exception();
ex.plugin_name = m_plugin_name;
ex.component_name = GetType();
Expand All @@ -164,17 +165,33 @@ class JEventSource {
}
if (m_status == SourceStatus::Opened) {
if (m_event_count < first_evt_nr) {
// Skip these events due to nskip
event->SetEventNumber(m_event_count); // Default event number to event count
auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h)
GetEvent(event);
event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
m_event_count += 1;
return ReturnStatus::TryAgain; // Reject this event and recycle it
} else if (m_nevents != 0 && (m_event_count == last_evt_nr)) {
m_status = SourceStatus::Finished;
// Declare ourselves finished due to nevents
DoFinalize(); // Close out the event source as soon as it declares itself finished
return ReturnStatus::Finished;
} else {
event->SetEventNumber(m_event_count); // Default event number to event count
// Actually emit an event.
// GetEvent() expects the following things from its incoming JEvent
event->SetEventNumber(m_event_count);
event->SetJApplication(m_application);
event->SetJEventSource(this);
event->SetSequential(false);
event->GetJCallGraphRecorder()->Reset();
if (event->GetJEventSource() != this && m_factory_generator != nullptr) {
// If we have multiple event sources, we need to make sure we are using
// event-source-specific factories on top of the default ones.
auto factory_set = new JFactorySet();
m_factory_generator->GenerateFactories(factory_set);
factory_set->Merge(*event->GetFactorySet());
event->SetFactorySet(factory_set);
}
auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h)
GetEvent(event);
event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin );
Expand All @@ -190,7 +207,7 @@ class JEventSource {
catch (RETURN_STATUS rs) {

if (rs == RETURN_STATUS::kNO_MORE_EVENTS) {
m_status = SourceStatus::Finished;
DoFinalize();
return ReturnStatus::Finished;
}
else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) {
Expand Down Expand Up @@ -260,6 +277,10 @@ class JEventSource {
//This should create default factories for all types available in the event source
JFactoryGenerator* GetFactoryGenerator() const { return m_factory_generator; }

uint64_t GetNSkip() { return m_nskip; }
uint64_t GetNEvents() { return m_nevents; }


/// SetTypeName is intended as a replacement to GetType(), which should be less confusing for the
/// user. It should be called from the constructor. For convenience, we provide a
/// NAME_OF_THIS macro so that the user doesn't have to type the class name as a string, which may
Expand Down Expand Up @@ -288,10 +309,10 @@ class JEventSource {
void SetPluginName(std::string plugin_name) { m_plugin_name = std::move(plugin_name); };

// Meant to be called by JANA
void SetRange(uint64_t nskip, uint64_t nevents) {
m_nskip = nskip;
m_nevents = nevents;
};
void SetNEvents(uint64_t nevents) { m_nevents = nevents; };

// Meant to be called by JANA
void SetNSkip(uint64_t nskip) { m_nskip = nskip; };


private:
Expand Down
6 changes: 5 additions & 1 deletion src/libraries/JANA/Services/JComponentManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ void JComponentManager::resolve_event_sources() {
m_app->SetDefaultParameter("jana:nskip", m_nskip, "Number of events that sources should skip before starting emitting");

for (auto source : m_evt_srces) {
source->SetRange(m_nskip, m_nevents);
// If nskip/nevents are set individually on JEventSources, respect those. Otherwise use global values.
// Note that this is not what we usually want when we have multiple event sources. It would make more sense to
// take the nskip/nevent slice across the stream of events emitted by each JEventSource in turn.
if (source->GetNSkip() == 0) source->SetNSkip(m_nskip);
if (source->GetNEvents() == 0) source->SetNEvents(m_nevents);
}
}

Expand Down
104 changes: 104 additions & 0 deletions src/programs/tests/NEventNSkipTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ struct NEventNSkipBoundedSource : public JEventSource {
std::atomic_int event_count {0};
int event_bound = 100;
std::vector<int> events_emitted;
std::atomic_int open_count{0};
std::atomic_int close_count{0};

NEventNSkipBoundedSource(std::string source_name, JApplication *app) : JEventSource(source_name, app) { }

Expand All @@ -22,6 +24,15 @@ struct NEventNSkipBoundedSource : public JEventSource {
event_count += 1;
events_emitted.push_back(event_count);
}

void Open() override {
open_count++;
LOG << "Opening source " << GetResourceName() << LOG_END;
}
void Close() override {
close_count++;
LOG << "Closing source " << GetResourceName() << LOG_END;
}
};


Expand Down Expand Up @@ -67,3 +78,96 @@ TEST_CASE("NEventNSkipTests") {
}
}

TEST_CASE("JEventSourceArrow with multiple JEventSources") {
JParameterManager* params = new JParameterManager;
params->SetParameter("log:debug","JArrow,JArrowProcessingController");
JApplication app(params);
auto source1 = new NEventNSkipBoundedSource("BoundedSource1", &app);
auto source2 = new NEventNSkipBoundedSource("BoundedSource2", &app);
auto source3 = new NEventNSkipBoundedSource("BoundedSource3", &app);
app.Add(source1);
app.Add(source2);
app.Add(source3);

SECTION("All three event sources initialize, run, and finish") {
source1->event_bound = 9;
source2->event_bound = 13;
source3->event_bound = 7;

app.SetParameterValue("jana:nskip", 0);
app.SetParameterValue("jana:nevents", 0);
app.SetParameterValue("nthreads", 4);
app.Run(true);

REQUIRE(app.GetExitCode() == (int) JApplication::ExitCode::Success);
REQUIRE(source1->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source2->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source3->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source1->open_count == 1);
REQUIRE(source2->open_count == 1);
REQUIRE(source3->open_count == 1);
REQUIRE(source1->close_count == 1);
REQUIRE(source2->close_count == 1);
REQUIRE(source3->close_count == 1);
REQUIRE(source1->GetEventCount() == 9);
REQUIRE(source2->GetEventCount() == 13);
REQUIRE(source3->GetEventCount() == 7);
REQUIRE(app.GetNEventsProcessed() == 9+13+7);
}

SECTION("All three event sources initialize, run, and finish, each using the same nskip,nevents (self-terminated)") {
source1->event_bound = 9;
source2->event_bound = 13;
source3->event_bound = 7;

app.SetParameterValue("jana:nskip", 3);
app.SetParameterValue("jana:nevents", 9);
app.SetParameterValue("nthreads", 4);
app.Run(true);

REQUIRE(app.GetExitCode() == (int) JApplication::ExitCode::Success);
REQUIRE(source1->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source2->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source3->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source1->open_count == 1);
REQUIRE(source2->open_count == 1);
REQUIRE(source3->open_count == 1);
REQUIRE(source1->close_count == 1);
REQUIRE(source2->close_count == 1);
REQUIRE(source3->close_count == 1);
REQUIRE(source1->GetEventCount() == 9); // 3 dropped, 6 emitted
REQUIRE(source2->GetEventCount() == 12); // 3 dropped, 9 emitted
REQUIRE(source3->GetEventCount() == 7); // 3 dropped, 4 emitted
REQUIRE(app.GetNEventsProcessed() == 19);
}


SECTION("All three event sources initialize, run, and finish, using individualized nskip,nevents (nevents-terminated)") {
source1->event_bound = 9;
source2->event_bound = 13;
source3->event_bound = 7;
source1->SetNSkip(2);
source1->SetNEvents(4);
source3->SetNEvents(4);

app.SetParameterValue("nthreads", 4);
app.Run(true);

REQUIRE(app.GetExitCode() == (int) JApplication::ExitCode::Success);
REQUIRE(source1->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source2->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source3->GetStatus() == JEventSource::SourceStatus::Finished);
REQUIRE(source1->open_count == 1);
REQUIRE(source2->open_count == 1);
REQUIRE(source3->open_count == 1);
REQUIRE(source1->close_count == 1);
REQUIRE(source2->close_count == 1);
REQUIRE(source3->close_count == 1);
REQUIRE(source1->GetEventCount() == 6); // 2 dropped, 4 emitted
REQUIRE(source2->GetEventCount() == 13); // 13 emitted
REQUIRE(source3->GetEventCount() == 4); // 4 emitted
REQUIRE(app.GetNEventsProcessed() == 21);
}

}

2 changes: 1 addition & 1 deletion src/programs/tests/SubeventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ TEST_CASE("Basic subevent arrow functionality") {

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
new SimpleSource("simpleSource"),
{new SimpleSource("simpleSource")},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
Expand Down
8 changes: 6 additions & 2 deletions src/programs/tests/TerminationTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ TEST_CASE("TerminationTests") {

SECTION("Arrow engine, interrupted during JEventSource::Open()") {

// TODO: This test is kind of useless now that JEventSource::Open is called from
// JEventSourceArrow::execute rather than JEventSourceArrow::initialize().
// What we really want is an Arrow that has an initialize() that we override.
// However to do that, we need to extend JESA and create a custom topology.
app.SetParameterValue("jana:engine", 0);
auto source = new InterruptedSource("InterruptedSource", &app);
app.Add(source);
app.Run(true);
REQUIRE(processor->processed_count == 0);
REQUIRE(processor->finish_call_count == 0);
REQUIRE(processor->processed_count == 1); // TODO: Was 0, should become zero again
REQUIRE(processor->finish_call_count == 1);
// Stop() tells JApplication to finish Initialize() but not to proceed with Run().
// If we had called Quit() instead, it would have exited Initialize() immediately and ended the program.

Expand Down