Skip to content

Commit

Permalink
Data inspector (#12610)
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wolk authored Feb 1, 2024
1 parent 7f3af54 commit 393608c
Show file tree
Hide file tree
Showing 13 changed files with 1,021 additions and 0 deletions.
1 change: 1 addition & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ endif()
add_subdirectory(TestWorkflows)

add_subdirectory(AODMerger)
add_subdirectory(DataInspector)
21 changes: 21 additions & 0 deletions Framework/DataInspector/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
# All rights not expressly granted are reserved.
#
# This software is distributed under the terms of the GNU General Public
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
#
# In applying this license CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

# Given GCC 7.3 does not provide std::filesystem we use Boost instead
# Drop this once we move to GCC 8.2+
o2_add_library(FrameworkDataInspector
SOURCES src/Plugin.cxx
src/DataInspector.cxx
src/DataInspectorService.cxx
src/DIMessages.cxx
src/DISocket.cxx
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
PUBLIC_LINK_LIBRARIES O2::Framework)
145 changes: 145 additions & 0 deletions Framework/DataInspector/src/DIMessages.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "DIMessages.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include <fairlogger/Logger.h>

using namespace rapidjson;

Document toJson(const DIMessages::RegisterDevice::Specs::Input& input)
{
Document doc;
doc.SetObject();
auto& alloc = doc.GetAllocator();

doc.AddMember("binding", Value(input.binding.c_str(), alloc), alloc);
doc.AddMember("sourceChannel", Value(input.sourceChannel.c_str(), alloc), alloc);
doc.AddMember("timeslice", Value((uint64_t)input.timeslice), alloc);

if (input.origin.has_value()) {
doc.AddMember("origin", Value(input.origin.value().c_str(), alloc), alloc);
}
if (input.description.has_value()) {
doc.AddMember("description", Value(input.description.value().c_str(), alloc), alloc);
}
if (input.subSpec.has_value()) {
doc.AddMember("subSpec", Value(input.subSpec.value()), alloc);
}

return doc;
}

Document toJson(const DIMessages::RegisterDevice::Specs::Output& output)
{
Document doc;
doc.SetObject();
auto& alloc = doc.GetAllocator();

doc.AddMember("binding", Value(output.binding.c_str(), alloc), alloc);
doc.AddMember("channel", Value(output.channel.c_str(), alloc), alloc);
doc.AddMember("timeslice", Value((uint64_t)output.timeslice), alloc);
doc.AddMember("maxTimeslices", Value((uint64_t)output.maxTimeslices), alloc);

doc.AddMember("origin", Value(output.origin.c_str(), alloc), alloc);
doc.AddMember("description", Value(output.description.c_str(), alloc), alloc);
if (output.subSpec.has_value()) {
doc.AddMember("subSpec", Value(output.subSpec.value()), alloc);
}

return doc;
}

Document toJson(const DIMessages::RegisterDevice::Specs::Forward& forward)
{
Document doc;
doc.SetObject();
auto& alloc = doc.GetAllocator();

doc.AddMember("binding", Value(forward.binding.c_str(), alloc), alloc);
doc.AddMember("channel", Value(forward.channel.c_str(), alloc), alloc);
doc.AddMember("timeslice", Value((uint64_t)forward.timeslice), alloc);
doc.AddMember("maxTimeslices", Value((uint64_t)forward.maxTimeslices), alloc);

if (forward.origin.has_value()) {
doc.AddMember("origin", Value(forward.origin.value().c_str(), alloc), alloc);
}
if (forward.description.has_value()) {
doc.AddMember("description", Value(forward.description.value().c_str(), alloc), alloc);
}
if (forward.subSpec.has_value()) {
doc.AddMember("subSpec", Value(forward.subSpec.value()), alloc);
}

return doc;
}

Document specToJson(const DIMessages::RegisterDevice::Specs& specs)
{
Document doc;
doc.SetObject();
auto& alloc = doc.GetAllocator();

doc.AddMember("rank", Value((uint64_t)specs.rank), alloc);
doc.AddMember("nSlots", Value((uint64_t)specs.nSlots), alloc);
doc.AddMember("inputTimesliceId", Value((uint64_t)specs.inputTimesliceId), alloc);
doc.AddMember("maxInputTimeslices", Value((uint64_t)specs.maxInputTimeslices), alloc);

Value inputsArray;
inputsArray.SetArray();
for (auto& input : specs.inputs) {
Value inputValue;
inputValue.CopyFrom(toJson(input), alloc);
inputsArray.PushBack(inputValue, alloc);
}
doc.AddMember("inputs", inputsArray, alloc);

Value outputsArray;
outputsArray.SetArray();
for (auto& output : specs.outputs) {
Value outputValue;
outputValue.CopyFrom(toJson(output), alloc);
outputsArray.PushBack(outputValue, alloc);
}
doc.AddMember("outputs", outputsArray, alloc);

Value forwardsArray;
forwardsArray.SetArray();
for (auto& forward : specs.forwards) {
Value forwardValue;
forwardValue.CopyFrom(toJson(forward), alloc);
forwardsArray.PushBack(forwardValue, alloc);
}
doc.AddMember("forwards", forwardsArray, alloc);

return doc;
}

std::string DIMessages::RegisterDevice::toJson()
{
Document doc;
doc.SetObject();
auto& alloc = doc.GetAllocator();

doc.AddMember("name", Value(name.c_str(), alloc), alloc);
doc.AddMember("runId", Value(runId.c_str(), alloc), alloc);

Value specsValue;
specsValue.CopyFrom(specToJson(specs), alloc);
doc.AddMember("specs", specsValue, alloc);

StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
doc.Accept(writer);

return {buffer.GetString(), buffer.GetSize()};
}
73 changes: 73 additions & 0 deletions Framework/DataInspector/src/DIMessages.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_DIMESSAGES_H
#define O2_DIMESSAGES_H

#include <string>
#include <cstdint>
#include <vector>
#include <boost/optional.hpp>

namespace DIMessages
{
struct RegisterDevice {
struct Specs {
struct Input {
std::string binding;
std::string sourceChannel;
size_t timeslice;

boost::optional<std::string> origin;
boost::optional<std::string> description;
boost::optional<uint32_t> subSpec;
};

struct Output {
std::string binding;
std::string channel;
size_t timeslice;
size_t maxTimeslices;

std::string origin;
std::string description;
boost::optional<uint32_t> subSpec;
};

struct Forward {
std::string binding;
size_t timeslice;
size_t maxTimeslices;
std::string channel;

boost::optional<std::string> origin;
boost::optional<std::string> description;
boost::optional<uint32_t> subSpec;
};

std::vector<Input> inputs;
std::vector<Output> outputs;
std::vector<Forward> forwards;

size_t rank;
size_t nSlots;
size_t inputTimesliceId;
size_t maxInputTimeslices;
};

std::string name;
std::string runId;
Specs specs;

std::string toJson();
};
} // namespace DIMessages

#endif // O2_DIMESSAGES_H
117 changes: 117 additions & 0 deletions Framework/DataInspector/src/DISocket.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "DISocket.h"
#include <fairlogger/Logger.h>

#define ASIO_CATCH(customMessage) \
catch (boost::system::system_error & err) \
{ \
auto msg = std::string{err.what()}; \
auto code = std::to_string(err.code().value()); \
throw std::runtime_error{"Exception in DataInspector (boost_code=" + code + ", boost_msg=" + msg + ") - " + customMessage}; \
}

DIMessage::Header::Type DIMessage::Header::type() const
{
return static_cast<DIMessage::Header::Type>(boost::endian::little_to_native(typeLE));
}

uint64_t DIMessage::Header::payloadSize() const
{
return boost::endian::little_to_native(payloadSizeLE);
}

DIMessage::DIMessage(const DIMessage& other) noexcept : header(other.header)
{
this->payload = new char[other.header.payloadSize()];
std::memcpy(this->payload, other.payload, other.header.payloadSize());
}

DIMessage& DIMessage::operator=(const DIMessage& other) noexcept
{
if (&other == this) {
return *this;
}

this->header = Header{other.header};

delete[] payload;
this->payload = new char[other.header.payloadSize()];
std::memcpy(this->payload, other.payload, other.header.payloadSize());

return *this;
}

DIMessage::DIMessage(DIMessage&& other) noexcept : header(other.header), payload(other.payload)
{
other.payload = nullptr;
}

DIMessage& DIMessage::operator=(DIMessage&& other) noexcept
{
header = Header{other.header};
delete[] payload;
payload = other.payload;

other.payload = nullptr;
return *this;
}

DIMessage::~DIMessage()
{
delete[] payload;
}

DISocket::DISocket(const std::string& address, int port) : ioContext(), socket(ioContext)
{
try {
auto ip_address = boost::asio::ip::address::from_string(address);
socket.connect(boost::asio::ip::tcp::endpoint(ip_address, port));
}
ASIO_CATCH("DISocket::DISocket")
}

DISocket::~DISocket()
{
socket.close();
}

bool DISocket::isMessageAvailable()
{
return socket.available() >= sizeof(DIMessage::Header);
}

void DISocket::send(const DIMessage& message)
{
try {
socket.send(std::array<boost::asio::const_buffer, 2>{
boost::asio::buffer(&message.header, sizeof(DIMessage::Header)),
boost::asio::buffer(message.payload, message.header.payloadSize())});
}
ASIO_CATCH("DISocket::send")
}

DIMessage DISocket::receive()
{
try {
DIMessage message;
socket.receive(boost::asio::buffer(&message.header, sizeof(DIMessage::Header)));

if (message.header.payloadSize() > 0) {
message.payload = new char[message.header.payloadSize()];
socket.receive(boost::asio::buffer(message.payload, message.header.payloadSize()));
}

return message;
}
ASIO_CATCH("DISocket::receive")
return {};
}
Loading

0 comments on commit 393608c

Please sign in to comment.