Skip to content

Commit

Permalink
[core] Move eCAL::protobuf::CDynamicJSONSubscriber based on CDynamicM…
Browse files Browse the repository at this point in the history
…essageSubscriber (#1605)
  • Loading branch information
KerstinKeller authored May 21, 2024
1 parent 526ef8a commit 0bdb60c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 204 deletions.
288 changes: 90 additions & 198 deletions ecal/core/include/ecal/msg/protobuf/dynamic_json_subscriber.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,13 +26,15 @@

#include <ecal/ecal.h>
#include <ecal/ecal_os.h>
#include <ecal/msg/dynamic.h>
#include <ecal/msg/protobuf/ecal_proto_dyn.h>

#include <iostream>
#include <sstream>
#include <fstream>
#include <memory>
#include <cstdio>
#include <string>

#ifdef _MSC_VER
#pragma warning(push, 0) // disable proto warnings
Expand All @@ -54,216 +56,106 @@

namespace eCAL
{
namespace protobuf
{
/**
* @brief eCAL dynamic protobuf to json subscriber.
**/
class CDynamicJSONSubscriber
namespace internal
{
public:
/**
* @brief Constructor.
**/
CDynamicJSONSubscriber();

/**
* @brief Constructor.
*
* @param topic_name_ Unique topic name.
**/
explicit CDynamicJSONSubscriber(const std::string& topic_name_);

/**
* @brief Destructor.
**/
~CDynamicJSONSubscriber();

CDynamicJSONSubscriber(const CDynamicJSONSubscriber&) = delete;
CDynamicJSONSubscriber& operator=(const CDynamicJSONSubscriber&) = delete;
CDynamicJSONSubscriber(CDynamicJSONSubscriber&& rhs) = delete;
CDynamicJSONSubscriber& operator=(CDynamicJSONSubscriber&& rhs) = delete;

/**
* @brief Creates this object.
*
* @param topic_name_ Unique topic name.
*
* @return true if it succeeds, false if it fails.
**/
void Create(const std::string& topic_name_);

/**
* @brief Destroys this object.
*
* @return true if it succeeds, false if it fails.
**/
void Destroy();

/**
* @brief Query if this object is created.
*
* @return true if created, false if not.
**/
bool IsCreated() const { return(m_created); }

/**
* @brief Add callback function for incoming receives.
*
* @param callback_ The callback function to add.
*
* @return True if succeeded, false if not.
**/
bool AddReceiveCallback(ReceiveCallbackT callback_);

/**
* @brief Remove callback function for incoming receives.
*
* @return True if succeeded, false if not.
**/
bool RemReceiveCallback();

protected:
void OnReceive(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_);

bool m_created;
std::unique_ptr<eCAL::protobuf::CProtoDynDecoder> m_msg_decoder;
std::string m_msg_string;
eCAL::CSubscriber m_msg_sub;
ReceiveCallbackT m_msg_callback;

std::string m_topic_type;
std::string m_topic_type_full;

std::string m_topic_desc;
std::shared_ptr<google::protobuf::util::TypeResolver> m_resolver;
google::protobuf::DescriptorPool m_descriptor_pool;
};
/** @example proto_dyn_json.cpp
* This is an example how to use CDynamicJSONSubscriber to receive dynamic google::protobuf data as a JSON string with eCAL.
*/

inline CDynamicJSONSubscriber::CDynamicJSONSubscriber() :
m_created(false),
m_msg_decoder(nullptr)
{}

inline CDynamicJSONSubscriber::CDynamicJSONSubscriber(const std::string& topic_name_) :
m_created(false),
m_msg_decoder(nullptr)
{
Create(topic_name_);
}

inline CDynamicJSONSubscriber::~CDynamicJSONSubscriber()
{
Destroy();
}

inline void CDynamicJSONSubscriber::Create(const std::string& topic_name_)
{
if (m_created) return;

// create message decoder
m_msg_decoder = std::make_unique<eCAL::protobuf::CProtoDynDecoder>();

// create subscriber
m_msg_sub.Create(topic_name_);

// add callback
m_msg_sub.AddReceiveCallback(std::bind(&CDynamicJSONSubscriber::OnReceive, this, std::placeholders::_1, std::placeholders::_2));

m_created = true;
}

inline void CDynamicJSONSubscriber::Destroy()
{
if (!m_created) return;

// remove callback
m_msg_sub.RemReceiveCallback();

// destroy subscriber
m_msg_sub.Destroy();

// delete message decoder
m_msg_decoder.reset();

m_created = false;
}
class ProtobufDynamicJSONDeserializer
{
public:
std::string Deserialize(const void* buffer_, size_t size_, const SDataTypeInformation& datatype_info_)
{
google::protobuf::util::JsonOptions options;
#if GOOGLE_PROTOBUF_VERSION >= 5026000
options.always_print_fields_with_no_presence = true;
#else
options.always_print_primitive_fields = true;
#endif

inline bool CDynamicJSONSubscriber::AddReceiveCallback(ReceiveCallbackT callback_)
{
if (!m_created) return false;
m_msg_callback = callback_;
return true;
}
std::string binary_input;
binary_input.assign(static_cast<const char*>(buffer_), static_cast<size_t>(size_));
std::string json_output;
auto status = google::protobuf::util::BinaryToJsonString(GetTypeResolver(datatype_info_).get(), GetQualifiedTopicType(datatype_info_), binary_input, &json_output, options);
if (status.ok())
{
return json_output;
}
else
{
throw new DynamicReflectionException("Error deserializing Protobuf data to json object.");
}
}

inline bool CDynamicJSONSubscriber::RemReceiveCallback()
{
if (!m_created) return false;
m_msg_callback = nullptr;
return true;
}
private:
std::shared_ptr<google::protobuf::util::TypeResolver> GetTypeResolver(const SDataTypeInformation& datatype_info_)
{
auto schema = m_type_resolver_map.find(datatype_info_);
if (schema == m_type_resolver_map.end())
{
m_type_resolver_map[datatype_info_] = CreateTypeResolver(datatype_info_);
}
return m_type_resolver_map[datatype_info_];
}

inline void CDynamicJSONSubscriber::OnReceive(const char* topic_name_, const struct eCAL::SReceiveCallbackData* data_)
{
if (m_msg_string.empty())
{
// get topic type
SDataTypeInformation topic_info;
//nodiscard???
eCAL::Util::GetTopicDataTypeInformation(topic_name_, topic_info);
m_topic_type_full = topic_info.name;
m_topic_type = m_topic_type_full.substr(m_topic_type_full.find_last_of('.') + 1, m_topic_type_full.size());
m_topic_type_full = "/" + m_topic_type_full;
std::shared_ptr<google::protobuf::util::TypeResolver> CreateTypeResolver(const SDataTypeInformation& datatype_info_)
{
std::string unqualified_topic_type = GetUnqualifiedTopicType(datatype_info_);

if (StrEmptyOrNull(unqualified_topic_type))
{
throw DynamicReflectionException("ProtobufDynamicJSONDeserializer: Could not get type");
}

std::string topic_desc = datatype_info_.descriptor;
if (StrEmptyOrNull(topic_desc))
{
throw DynamicReflectionException("ProtobufDynamicJSONDeserializer: Could not get description for type" + std::string(unqualified_topic_type));
}

google::protobuf::FileDescriptorSet proto_desc;
proto_desc.ParseFromString(topic_desc);
std::string error_s;
const std::shared_ptr<google::protobuf::Message> msg(m_dynamic_decoder.GetProtoMessageFromDescriptorSet(proto_desc, unqualified_topic_type, error_s));
std::shared_ptr<google::protobuf::util::TypeResolver> resolver{ google::protobuf::util::NewTypeResolverForDescriptorPool("", m_dynamic_decoder.GetDescriptorPool()) };

if (resolver == nullptr)
{
std::stringstream s;
s << "ProtobufDynamicJSONDeserializer: Message of type " + unqualified_topic_type << " could not be decoded" << std::endl;
s << error_s;
throw DynamicReflectionException(s.str());
}

return resolver;
}

if (m_topic_type.empty())
std::string GetQualifiedTopicType(const SDataTypeInformation& data_type_info_)
{
std::cout << "could not get type for topic " << topic_name_ << std::endl;
return;
return "/" + data_type_info_.name;
}

// get topic description
m_topic_desc = topic_info.descriptor;
if (m_topic_desc.empty())
std::string GetUnqualifiedTopicType(const SDataTypeInformation& data_type_info_)
{
std::cout << "could not get description for topic " << topic_name_ << std::endl;
return;
const auto& type_name = data_type_info_.name;
return type_name.substr(type_name.find_last_of('.') + 1, type_name.size());
}

std::string error_s;
google::protobuf::FileDescriptorSet proto_desc;
proto_desc.ParseFromString(m_topic_desc);
const std::shared_ptr<google::protobuf::Message> msg(m_msg_decoder->GetProtoMessageFromDescriptorSet(proto_desc, m_topic_type, error_s));
m_resolver.reset(google::protobuf::util::NewTypeResolverForDescriptorPool("", m_msg_decoder->GetDescriptorPool()));
}

// decode message and execute callback
//if(msg_callback && msg_ptr && msg_ptr->ParseFromArray(data_->buf, data_->size))
if (m_msg_callback)
{
eCAL::protobuf::CProtoDynDecoder m_dynamic_decoder;
std::map<SDataTypeInformation, std::shared_ptr<google::protobuf::util::TypeResolver>> m_type_resolver_map;
};
}

google::protobuf::util::JsonOptions options;
#if GOOGLE_PROTOBUF_VERSION >= 5026000
options.always_print_fields_with_no_presence = true;
#else
options.always_print_primitive_fields = true;
#endif
namespace protobuf
{
/**
* @brief eCAL protobuf dynamic subscriber class.
*
* Dynamic subscriber class for protobuf messages. For details see documentation of CDynamicMessageSubscriber class.
*
**/
using CDynamicJSONSubscriber = CDynamicMessageSubscriber<std::string, internal::ProtobufDynamicJSONDeserializer>;

std::string binary_input;
binary_input.assign(static_cast<char*>(data_->buf), static_cast<size_t>(data_->size));
m_msg_string.clear();
auto status = google::protobuf::util::BinaryToJsonString(m_resolver.get(), m_topic_type_full, binary_input, &m_msg_string, options);
if (status.ok())
{
SReceiveCallbackData cb_data;
cb_data.buf = (void*)m_msg_string.c_str();
cb_data.size = (long)m_msg_string.size();
cb_data.time = data_->time;
m_msg_callback(topic_name_, &cb_data);
}
}
/** @example proto_dyn_rec.cpp
* This is an example how to use eCAL::protobuf::CDynamicSubscriber to receive dynamic protobuf data with eCAL. To receive the data, see @ref proto_dyn_rec.cpp .
*/
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# ========================= eCAL LICENSE =================================
#
# Copyright (C) 2016 - 2019 Continental Corporation
# Copyright (C) 2016 - 2024 Continental Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,7 @@ target_link_libraries(${PROJECT_NAME}
eCAL::core_protobuf
)

target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14)
target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_17)

ecal_install_sample(${PROJECT_NAME})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,10 +25,9 @@

const std::string MESSAGE_NAME("person");

void ProtoMsgCallback(const char* topic_name_, const eCAL::SReceiveCallbackData* msg_)
void ProtoMsgCallback(const char* topic_name_, const std::string& msg_, long long /*time_*/, long long /*clock_*/, long long /*id_*/)
{
std::string content((char*)msg_->buf, msg_->size);
std::cout << topic_name_ << " : " << content << std::endl;
std::cout << topic_name_ << " : " << msg_ << std::endl;
std::cout << std::endl;
}

Expand Down

0 comments on commit 0bdb60c

Please sign in to comment.