Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor UI blocks and use multithreaded scheduler #149

Merged
merged 8 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include(FetchContent)
FetchContent_Declare(
opencmw-cpp
GIT_REPOSITORY https://github.com/fair-acc/opencmw-cpp.git
GIT_TAG 57f31a19d8998da944ec73223d7f3fba4feeb324
GIT_TAG a7a7c5c319b93ddfaf160893665a011d2d88bff8 # main as of 2024-02-06
)

FetchContent_Declare(
Expand Down
12 changes: 6 additions & 6 deletions src/service/dashboard/defaultDashboard.flowgraph
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ blocks:
- name: FFT
id: FFT
- name: sum sigs
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sine source 1
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.100000
- name: source for sink 1
id: sink_source
- name: source for sink 2
id: sink_source
- name: remote source 1
id: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
id: opendigitizer::RemoteSource
parameters:
remote_uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
- name: sink 1
id: sink
- name: sink 2
Expand All @@ -29,6 +32,3 @@ connections:
- [sum sigs, 0, sink 2, 0]
- [sine source 1, 0, sink 3, 0]
- [remote source 1, 0, sink 4, 0]
remote_sources:
- uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
3 changes: 0 additions & 3 deletions src/ui/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ set(sources
flowgraph.cpp
flowgraphitem.cpp
dashboard.cpp
flowgraph/datasource.cpp
flowgraph/datasink.cpp
flowgraph/remotedatasource.cpp
flowgraph/arithmetic_block.cpp
dashboardpage.cpp
opendashboardpage.cpp
imguiutils.cpp
Expand Down
30 changes: 19 additions & 11 deletions src/ui/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
#include "common.h"
#include "dashboard.h"
#include "dashboardpage.h"
#include "flowgraph.h"
#include "flowgraphitem.h"
#include "opendashboardpage.h"

#include <gnuradio-4.0/Scheduler.hpp>

struct ImFont;

namespace DigitizerUi {
Expand Down Expand Up @@ -72,13 +73,14 @@ struct App {
ImFont *fontIconsSolidBig;
ImFont *fontIconsSolidLarge;
std::chrono::seconds editPaneCloseDelay{ 15 };
// The thread limit here is mainly for emscripten
std::shared_ptr<gr::thread_pool::BasicThreadPool> schedulerThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>("scheduler-pool", gr::thread_pool::CPU_BOUND, 2, 4);

template<typename Scheduler, typename Graph>
template<typename Graph>
void assignScheduler(Graph &&graph) {
if (m_scheduler) {
m_garbageSchedulers.push_back(std::move(m_scheduler));
}
m_scheduler.emplace<Scheduler>(std::forward<Graph>(graph));
using Scheduler = gr::scheduler::Simple<gr::scheduler::multiThreaded>;

m_scheduler.emplace<Scheduler>(std::forward<Graph>(graph), schedulerThreadPool);
}

private:
Expand All @@ -95,26 +97,32 @@ struct App {
};
template<typename T>
struct HandlerImpl : Handler {
T data;
std::thread thread;
T data;
std::thread thread;
std::atomic<bool> stopRequested = false;

template<typename... Args>
explicit HandlerImpl(Args &&...args)
: data(std::forward<Args>(args)...) {
thread = std::thread([this]() {
data.init();
data.runAndWait();
data.start();
while (!stopRequested && data.isProcessing()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
data.stop();
});
}
~HandlerImpl() {
stopRequested = true;
thread.join();
}
};

std::unique_ptr<Handler> handler;
};

SchedWrapper m_scheduler;
std::vector<SchedWrapper> m_garbageSchedulers; // TODO: Cleaning up schedulers needs support in opencmw to return unsubscription confirmation
SchedWrapper m_scheduler;

App();

Expand Down
6 changes: 3 additions & 3 deletions src/ui/assets/sampleDashboards/DemoDashboard.grc
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
blocks:
- name: sum sigs
id: Arithmetic
id: opendigitizer::Arithmetic
- name: FFT
id: FFT
- name: sine source 1
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 1.000000
- name: sine source 3
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 1.300000
- name: sink 1
Expand Down
14 changes: 7 additions & 7 deletions src/ui/assets/sampleDashboards/ExtendedDemoDashboard.grc
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
blocks:
- name: sum sigs1
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sum sigs2
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sum sigs3
id: Arithmetic
id: opendigitizer::Arithmetic
- name: FFT
id: FFT
- name: sine source 3
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.100000
- name: sine source 4
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.200000
- name: sine source 5
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.300000
- name: sine source 6
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.400000
- name: source for sink 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#include "arithmetic_block.h"
#ifndef OPENDIGITIZER_ARITHMETIC_HPP
#define OPENDIGITIZER_ARITHMETIC_HPP

#include <type_traits>
#include <gnuradio-4.0/Block.hpp>

#include "../flowgraph.h"
namespace opendigitizer {

template<typename T>
requires std::is_arithmetic_v<T>
struct MathNode : public gr::Block<MathNode<T>> {
gr::PortIn<T> in1{};
gr::PortIn<T> in2{};
struct Arithmetic : public gr::Block<Arithmetic<T>> {
gr::PortIn<T> in1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally OK for this PR but we should move all of these isolated block implementation to the GR 4.0 repo and also make them conform and check the corresponding items in summary issue EPIC: List of GR 3.10 blocks & Others to be ported to GR 4.0

gr::PortIn<T> in2;

gr::PortOut<T> out{};
gr::PortOut<T> out;

gr::Annotated<std::string, "operation"> operation = std::string("+");

Expand All @@ -33,12 +34,8 @@ struct MathNode : public gr::Block<MathNode<T>> {
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (MathNode<T>), in1, in2, out, operation);
} // namespace opendigitizer

namespace DigitizerUi {
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::Arithmetic, in1, in2, out, operation)

void ArithmeticBlock::registerBlockType() {
BlockType::registry().addBlockType<MathNode>("Arithmetic");
}

} // namespace DigitizerUi
#endif
107 changes: 107 additions & 0 deletions src/ui/blocks/RemoteSource.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#ifndef OPENDIGITIZER_REMOTESOURCE_HPP
#define OPENDIGITIZER_REMOTESOURCE_HPP

#include <gnuradio-4.0/Block.hpp>

#include <daq_api.hpp>

#include <IoSerialiserYaS.hpp>
#include <MdpMessage.hpp>
#include <opencmw.hpp>
#include <RestClient.hpp>
#include <type_traits>

namespace opendigitizer {

template<typename T>
requires std::is_same_v<T, float>
struct RemoteSource : public gr::Block<RemoteSource<T>> {
gr::PortOut<float> out;
std::string remote_uri;
std::string signal_name;
opencmw::client::RestClient _client;

struct Data {
opendigitizer::acq::Acquisition data;
std::size_t read = 0;
};

struct Queue {
std::deque<Data> data;
std::mutex mutex;
};

std::shared_ptr<Queue> _queue = std::make_shared<Queue>();

auto processBulk(gr::PublishableSpan auto &output) noexcept {
std::size_t written = 0;
std::lock_guard lock(_queue->mutex);
while (written < output.size() && !_queue->data.empty()) {
auto &d = _queue->data.front();
auto in = std::span<const float>(d.data.channelValue.begin(), d.data.channelValue.end());
in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read));

std::copy(in.begin(), in.end(), output.begin() + written);
written += in.size();
d.read += in.size();
if (d.read == d.data.channelValue.size()) {
_queue->data.pop_front();
}
}
output.publish(written);
return gr::work::Status::OK;
}

void
settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) {
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
fmt::print("Unsubscribing from {}\n", oldUri);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [oldUri](const opencmw::mdp::Message &) {
// TODO: Add cleanup once openCMW starts calling the callback
// on successful unsubscribe
fmt::print("Unsubscribed from {} successfully\n", oldUri);
};
}
}

opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.topic = opencmw::URI<>(remote_uri);
fmt::print("Subscribing to {}\n", remote_uri);

std::weak_ptr maybeQueue = _queue;

command.callback = [maybeQueue](const opencmw::mdp::Message &rep) {
if (rep.data.empty()) {
return;
}
try {
auto queue = maybeQueue.lock();
if (!queue) {
return;
}
auto buf = rep.data;
opendigitizer::acq::Acquisition acq;
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq);
std::lock_guard lock(queue->mutex);
queue->data.push_back({ std::move(acq), 0 });
} catch (opencmw::ProtocolException &e) {
fmt::print(std::cerr, "{}\n", e.what());
return;
}
};
_client.request(command);
}
};

} // namespace opendigitizer

ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name)

#endif
45 changes: 24 additions & 21 deletions src/ui/flowgraph/datasource.cpp → src/ui/blocks/SineSource.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include "datasource.h"

#include <fmt/format.h>
#include <math.h>
#include <mutex>
#ifndef OPENDIGITIZER_SINESOURCE_HPP
#define OPENDIGITIZER_SINESOURCE_HPP

#include <fmt/format.h>
#include <gnuradio-4.0/Block.hpp>

#include "../flowgraph.h"
#include <mutex>

namespace opendigitizer {

template<typename T>
requires std::is_arithmetic_v<T>
Expand Down Expand Up @@ -40,31 +39,35 @@ struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> {
}

~SineSource() {
std::unique_lock guard(mutex);
quit = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be replaced and synchronised with EPIC: List of GR 3.10 blocks & Others to be ported to GR 4.0. Also there is already a sine and cosine source in GR 4.0.

@drslebedev maybe you could comment/document the usage and check this if this conforms to the functionality in GR 3.X.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the example of SignalGenerator which can generate sine wave (this is the default signal type), note that ClockSource is required sa input to SignalGenerator:

constexpr std::uint32_t n_samples   = 200;
constexpr float         sample_rate = 1000.f;
Graph                   testGraph;
auto                   &clockSrc  = testGraph.emplaceBlock<gr::basic::ClockSource<float>>({ { "sample_rate", sample_rate }, { "n_samples_max", n_samples }, { "name", "ClockSource" } });
auto                   &signalGen = testGraph.emplaceBlock<SignalGenerator<float>>({ { "sample_rate", sample_rate }, { "name", "SignalGenerator" } });

testGraph.connect<"out">(clockSrc).to<"in">(signalGen);
testGraph.connect<"out">(signalGen).to<"in">(sink);

scheduler::Simple sched{ std::move(testGraph) };
sched.runAndWait();

thread.join();
conditionvar.notify_all();
}

T processOne() {
auto processBulk(gr::PublishableSpan auto &output) {
// technically, this wouldn't have to block, but could just publish 0 samples,
// but keep it as test case for BlockingIO<true>.
std::unique_lock guard(mutex);
if (samples.size() == 0) {
while (samples.empty() && !quit) {
conditionvar.wait(guard);
}

T v = samples.front();
samples.pop_front();
out.max_samples = std::max<int>(1, samples.size());
return v;
const auto n = std::min(output.size(), samples.size());
if (n == 0) {
output.publish(0);
return gr::work::Status::OK;
}

std::copy(samples.begin(), samples.begin() + n, output.begin());
samples.erase(samples.begin(), samples.begin() + n);
output.publish(n);
return gr::work::Status::OK;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (SineSource<T>), out, frequency);
static_assert(gr::traits::block::can_processOne<SineSource<float>>);
static_assert(gr::traits::block::can_processOne<SineSource<double>>);

namespace DigitizerUi {
} // namespace opendigitizer

void DataSource::registerBlockType() {
BlockType::registry().addBlockType<SineSource>("sine_source");
}
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::SineSource, out, frequency)

} // namespace DigitizerUi
#endif
Loading
Loading