-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor UI blocks and use multithreaded scheduler #149
Changes from all commits
0d4a14f
72ca3e0
25be4c4
545d0c3
c5d0c8b
70b86ff
4468bcf
90f3c2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
#ifndef OPENDIGITIZER_REMOTESOURCE_HPP | ||
#define OPENDIGITIZER_REMOTESOURCE_HPP | ||
|
||
#include <gnuradio-4.0/Block.hpp> | ||
|
||
#include <daq_api.hpp> | ||
|
||
#include <IoSerialiserYaS.hpp> | ||
#include <MdpMessage.hpp> | ||
#include <opencmw.hpp> | ||
#include <RestClient.hpp> | ||
#include <type_traits> | ||
|
||
namespace opendigitizer { | ||
|
||
template<typename T> | ||
requires std::is_same_v<T, float> | ||
struct RemoteSource : public gr::Block<RemoteSource<T>> { | ||
gr::PortOut<float> out; | ||
std::string remote_uri; | ||
std::string signal_name; | ||
opencmw::client::RestClient _client; | ||
|
||
struct Data { | ||
opendigitizer::acq::Acquisition data; | ||
std::size_t read = 0; | ||
}; | ||
|
||
struct Queue { | ||
std::deque<Data> data; | ||
std::mutex mutex; | ||
}; | ||
|
||
std::shared_ptr<Queue> _queue = std::make_shared<Queue>(); | ||
|
||
auto processBulk(gr::PublishableSpan auto &output) noexcept { | ||
std::size_t written = 0; | ||
std::lock_guard lock(_queue->mutex); | ||
while (written < output.size() && !_queue->data.empty()) { | ||
auto &d = _queue->data.front(); | ||
auto in = std::span<const float>(d.data.channelValue.begin(), d.data.channelValue.end()); | ||
in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read)); | ||
|
||
std::copy(in.begin(), in.end(), output.begin() + written); | ||
written += in.size(); | ||
d.read += in.size(); | ||
if (d.read == d.data.channelValue.size()) { | ||
_queue->data.pop_front(); | ||
} | ||
} | ||
output.publish(written); | ||
return gr::work::Status::OK; | ||
} | ||
|
||
void | ||
settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) { | ||
const auto oldValue = old_settings.find("remote_uri"); | ||
if (oldValue != old_settings.end()) { | ||
const auto oldUri = std::get<std::string>(oldValue->second); | ||
if (!oldUri.empty()) { | ||
fmt::print("Unsubscribing from {}\n", oldUri); | ||
opencmw::client::Command command; | ||
command.command = opencmw::mdp::Command::Unsubscribe; | ||
command.topic = opencmw::URI<>(remote_uri); | ||
command.callback = [oldUri](const opencmw::mdp::Message &) { | ||
// TODO: Add cleanup once openCMW starts calling the callback | ||
// on successful unsubscribe | ||
fmt::print("Unsubscribed from {} successfully\n", oldUri); | ||
}; | ||
} | ||
} | ||
|
||
opencmw::client::Command command; | ||
command.command = opencmw::mdp::Command::Subscribe; | ||
command.topic = opencmw::URI<>(remote_uri); | ||
fmt::print("Subscribing to {}\n", remote_uri); | ||
|
||
std::weak_ptr maybeQueue = _queue; | ||
|
||
command.callback = [maybeQueue](const opencmw::mdp::Message &rep) { | ||
if (rep.data.empty()) { | ||
return; | ||
} | ||
try { | ||
auto queue = maybeQueue.lock(); | ||
if (!queue) { | ||
return; | ||
} | ||
auto buf = rep.data; | ||
opendigitizer::acq::Acquisition acq; | ||
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq); | ||
std::lock_guard lock(queue->mutex); | ||
queue->data.push_back({ std::move(acq), 0 }); | ||
} catch (opencmw::ProtocolException &e) { | ||
fmt::print(std::cerr, "{}\n", e.what()); | ||
return; | ||
} | ||
}; | ||
_client.request(command); | ||
} | ||
}; | ||
|
||
} // namespace opendigitizer | ||
|
||
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name) | ||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,12 @@ | ||
#include "datasource.h" | ||
|
||
#include <fmt/format.h> | ||
#include <math.h> | ||
#include <mutex> | ||
#ifndef OPENDIGITIZER_SINESOURCE_HPP | ||
#define OPENDIGITIZER_SINESOURCE_HPP | ||
|
||
#include <fmt/format.h> | ||
#include <gnuradio-4.0/Block.hpp> | ||
|
||
#include "../flowgraph.h" | ||
#include <mutex> | ||
|
||
namespace opendigitizer { | ||
|
||
template<typename T> | ||
requires std::is_arithmetic_v<T> | ||
|
@@ -40,31 +39,35 @@ struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> { | |
} | ||
|
||
~SineSource() { | ||
std::unique_lock guard(mutex); | ||
quit = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be replaced and synchronised with EPIC: List of GR 3.10 blocks & Others to be ported to GR 4.0. Also there is already a sine and cosine source in GR 4.0. @drslebedev maybe you could comment/document the usage and check this if this conforms to the functionality in GR 3.X. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the example of constexpr std::uint32_t n_samples = 200;
constexpr float sample_rate = 1000.f;
Graph testGraph;
auto &clockSrc = testGraph.emplaceBlock<gr::basic::ClockSource<float>>({ { "sample_rate", sample_rate }, { "n_samples_max", n_samples }, { "name", "ClockSource" } });
auto &signalGen = testGraph.emplaceBlock<SignalGenerator<float>>({ { "sample_rate", sample_rate }, { "name", "SignalGenerator" } });
testGraph.connect<"out">(clockSrc).to<"in">(signalGen);
testGraph.connect<"out">(signalGen).to<"in">(sink);
scheduler::Simple sched{ std::move(testGraph) };
sched.runAndWait(); |
||
thread.join(); | ||
conditionvar.notify_all(); | ||
} | ||
|
||
T processOne() { | ||
auto processBulk(gr::PublishableSpan auto &output) { | ||
// technically, this wouldn't have to block, but could just publish 0 samples, | ||
// but keep it as test case for BlockingIO<true>. | ||
std::unique_lock guard(mutex); | ||
if (samples.size() == 0) { | ||
while (samples.empty() && !quit) { | ||
conditionvar.wait(guard); | ||
} | ||
|
||
T v = samples.front(); | ||
samples.pop_front(); | ||
out.max_samples = std::max<int>(1, samples.size()); | ||
return v; | ||
const auto n = std::min(output.size(), samples.size()); | ||
if (n == 0) { | ||
output.publish(0); | ||
return gr::work::Status::OK; | ||
} | ||
|
||
std::copy(samples.begin(), samples.begin() + n, output.begin()); | ||
samples.erase(samples.begin(), samples.begin() + n); | ||
output.publish(n); | ||
return gr::work::Status::OK; | ||
} | ||
}; | ||
|
||
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (SineSource<T>), out, frequency); | ||
static_assert(gr::traits::block::can_processOne<SineSource<float>>); | ||
static_assert(gr::traits::block::can_processOne<SineSource<double>>); | ||
|
||
namespace DigitizerUi { | ||
} // namespace opendigitizer | ||
|
||
void DataSource::registerBlockType() { | ||
BlockType::registry().addBlockType<SineSource>("sine_source"); | ||
} | ||
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::SineSource, out, frequency) | ||
|
||
} // namespace DigitizerUi | ||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally OK for this PR but we should move all of these isolated block implementation to the GR 4.0 repo and also make them conform and check the corresponding items in summary issue EPIC: List of GR 3.10 blocks & Others to be ported to GR 4.0