diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 2af338db..2a639d26 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -51,8 +51,8 @@ set(XEUS_PYTHON_TESTS main.cpp ../src/xutils.cpp #test_debugger.cpp - #xeus_client.hpp - #xeus_client.cpp + xeus_client.hpp + xeus_client.cpp ) add_executable(test_xeus_python ${XEUS_PYTHON_TESTS}) diff --git a/test/xeus_client.cpp b/test/xeus_client.cpp index e5597deb..3eb43f3b 100644 --- a/test/xeus_client.cpp +++ b/test/xeus_client.cpp @@ -16,153 +16,9 @@ #include "xeus/xguid.hpp" #include "xeus/xmessage.hpp" #include "xeus-zmq/xmiddleware.hpp" -#include "xeus-zmq/xzmq_serializer.hpp" using namespace std::chrono_literals; -/*********************************** - * xeus-client_base implementation * - ***********************************/ - -xeus_client_base::xeus_client_base(xeus::xcontext& context, - const std::string& user_name, - const xeus::xconfiguration& config) - : p_shell_authentication(xeus::make_xauthentication(config.m_signature_scheme, config.m_key)) - , p_control_authentication(xeus::make_xauthentication(config.m_signature_scheme, config.m_key)) - , p_iopub_authentication(xeus::make_xauthentication(config.m_signature_scheme, config.m_key)) - , m_shell(context, zmq::socket_type::dealer) - , m_control(context, zmq::socket_type::dealer) - , m_iopub(context, zmq::socket_type::sub) - , m_shell_end_point("") - , m_control_end_point("") - , m_iopub_end_point("") - , m_user_name(user_name) - , m_session_id(xeus::new_xguid()) -{ - m_shell_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_shell_port); - m_control_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_control_port); - m_iopub_end_point = xeus::get_end_point(config.m_transport, config.m_ip, config.m_iopub_port); - - m_shell.connect(m_shell_end_point); - m_control.connect(m_control_end_point); - m_iopub.connect(m_iopub_end_point); -} - -xeus_client_base::~xeus_client_base() -{ - m_shell.disconnect(m_shell_end_point); - m_control.disconnect(m_control_end_point); - m_iopub.disconnect(m_iopub_end_point); -} - -void xeus_client_base::send_on_shell(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content) -{ - send_message(std::move(header), - std::move(parent_header), - std::move(metadata), - std::move(content), - m_shell, - *p_shell_authentication); -} - -nl::json xeus_client_base::receive_on_shell() -{ - return receive_message(m_shell, *p_shell_authentication); -} - -void xeus_client_base::send_on_control(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content) -{ - send_message(std::move(header), - std::move(parent_header), - std::move(metadata), - std::move(content), - m_control, - *p_control_authentication); -} - -nl::json xeus_client_base::receive_on_control() -{ - return receive_message(m_control, *p_control_authentication); -} - -void xeus_client_base::subscribe_iopub(const std::string& filter) -{ - m_iopub.set(zmq::sockopt::subscribe, filter); -} - -void xeus_client_base::unsubscribe_iopub(const std::string& filter) -{ - m_iopub.set(zmq::sockopt::unsubscribe, filter); -} - -nl::json xeus_client_base::receive_on_iopub() -{ - zmq::multipart_t wire_msg; - wire_msg.recv(m_iopub); - xeus::xpub_message msg = xeus::xzmq_serializer::deserialize_iopub(wire_msg, *p_iopub_authentication); - - nl::json res = aggregate(msg.header(), - msg.parent_header(), - msg.metadata(), - msg.content()); - res["topic"] = msg.topic(); - return res; -} - -nl::json xeus_client_base::make_header(const std::string& msg_type) const -{ - return xeus::make_header(msg_type, m_user_name, m_session_id); -} - -void xeus_client_base::send_message(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content, - zmq::socket_t& socket, - const xeus::xauthentication& auth) -{ - xeus::xmessage msg(xeus::xmessage::guid_list(), - std::move(header), - std::move(parent_header), - std::move(metadata), - std::move(content), - xeus::buffer_sequence()); - zmq::multipart_t wire_msg = xeus::xzmq_serializer::serialize(std::move(msg), auth); - wire_msg.send(socket); -} - -nl::json xeus_client_base::receive_message(zmq::socket_t& socket, - const xeus::xauthentication& auth) -{ - zmq::multipart_t wire_msg; - wire_msg.recv(socket); - xeus::xmessage msg = xeus::xzmq_serializer::deserialize(wire_msg, auth); - - return aggregate(msg.header(), - msg.parent_header(), - msg.metadata(), - msg.content()); -} - -nl::json xeus_client_base::aggregate(const nl::json& header, - const nl::json& parent_header, - const nl::json& metadata, - const nl::json& content) const -{ - nl::json result; - result["header"] = header; - result["parent_header"] = parent_header; - result["metadata"] = metadata; - result["content"] = content; - return result; -} - /************************************* * xeus_logger_client implementation * *************************************/ @@ -171,73 +27,120 @@ xeus_logger_client::xeus_logger_client(xeus::xcontext& context, const std::string& user_name, const xeus::xconfiguration& config, const std::string& file_name) - : xeus_client_base(context, user_name, config) + : m_user_name(user_name) , m_file_name(file_name) - , m_iopub_thread() + , p_client(xeus::make_xclient_zmq(context, config)) { std::ofstream out(m_file_name); out << "STARTING CLIENT" << std::endl; - base_type::subscribe_iopub(""); - m_iopub_thread = std::move(std::thread(&xeus_logger_client::poll_iopub, this)); + + // Register listeners for shell and kernel status + register_shell_listener(); + register_kernel_status_listener(); } xeus_logger_client::~xeus_logger_client() { - m_iopub_thread.join(); } void xeus_logger_client::send_on_shell(const std::string& msg_type, nl::json content) { - nl::json header = base_type::make_header(msg_type); - log_message(base_type::aggregate(header, - nl::json::object(), - nl::json::object(), - content)); - base_type::send_on_shell(std::move(header), - nl::json::object(), - nl::json::object(), - std::move(content)); + nl::json header = xeus::make_header(msg_type, m_user_name, xeus::new_xguid()); + + nl::json result; + result["header"] = header; + result["parent_header"] = nl::json::object(); + result["metadata"] = nl::json::object(); + result["content"] = content; + log_message(result); + + xeus::xmessage msg(xeus::xmessage::guid_list(), + std::move(header), + nl::json::object(), + nl::json::object(), + std::move(content), + xeus::buffer_sequence()); + + p_client->send_on_shell(std::move(msg)); } void xeus_logger_client::send_on_control(const std::string& msg_type, nl::json content) { - nl::json header = base_type::make_header(msg_type); - log_message(base_type::aggregate(header, - nl::json::object(), - nl::json::object(), - content)); - base_type::send_on_control(std::move(header), - nl::json::object(), - nl::json::object(), - std::move(content)); + nl::json header = xeus::make_header(msg_type, m_user_name, xeus::new_xguid()); + + nl::json result; + result["header"] = header; + result["parent_header"] = nl::json::object(); + result["metadata"] = nl::json::object(); + result["content"] = content; + log_message(result); + + xeus::xmessage msg(xeus::xmessage::guid_list(), + std::move(header), + nl::json::object(), + nl::json::object(), + std::move(content), + xeus::buffer_sequence()); + + p_client->send_on_control(std::move(msg)); } nl::json xeus_logger_client::receive_on_shell() { - nl::json msg = base_type::receive_on_shell(); - log_message(msg); - return msg; + std::optional msg_opt = p_client->check_shell_answer(); + nl::json result = nl::json::object(); + + if (msg_opt.has_value()) + { + xeus::xmessage msg = std::move(*msg_opt); + result["header"] = msg.header(); + result["parent_header"] = msg.parent_header(); + result["metadata"] = msg.metadata(); + result["content"] = msg.content(); + } + log_message(result); + + return result; } nl::json xeus_logger_client::receive_on_control() { - nl::json msg = base_type::receive_on_control(); - log_message(msg); - return msg; + std::optional msg_opt = p_client->check_control_answer(); + nl::json result = nl::json::object(); + + if (msg_opt.has_value()) + { + xeus::xmessage msg = std::move(*msg_opt); + result["header"] = msg.header(); + result["parent_header"] = msg.parent_header(); + result["metadata"] = msg.metadata(); + result["content"] = msg.content(); + } + log_message(result); + + return result; } std::size_t xeus_logger_client::iopub_queue_size() const { - std::lock_guard guard(m_queue_mutex); - return m_message_queue.size(); + return p_client->iopub_queue_size(); } nl::json xeus_logger_client::pop_iopub_message() { - std::lock_guard guard(m_queue_mutex); - nl::json res = m_message_queue.back(); - m_message_queue.pop(); - return res; + std::optional msg_opt = p_client->pop_iopub_message(); + nl::json result = nl::json::object(); + + if (msg_opt.has_value()) + { + xeus::xmessage msg = std::move(*msg_opt); + result["header"] = msg.header(); + result["parent_header"] = msg.parent_header(); + result["metadata"] = msg.metadata(); + result["content"] = msg.content(); + } + + return result; } nl::json xeus_logger_client::wait_for_debug_event(const std::string& event) @@ -268,28 +171,58 @@ nl::json xeus_logger_client::wait_for_debug_event(const std::string& event) return msg; } -void xeus_logger_client::poll_iopub() +void xeus_logger_client::start() +{ + p_client->start(); +} + +void xeus_logger_client::register_shell_listener() +{ + p_client->register_shell_listener([this](xeus::xmessage msg) { + handle_shell_message(std::move(msg)); + }); +} + +void xeus_logger_client::register_kernel_status_listener() +{ + p_client->register_kernel_status_listener([this](bool status) { + handle_kernel_status_message(status); + }); +} + +void xeus_logger_client::handle_shell_message(xeus::xmessage msg) { - while(true) + nl::json json_msg; + json_msg["header"] = msg.header(); + json_msg["parent_header"] = msg.parent_header(); + json_msg["metadata"] = msg.metadata(); + json_msg["content"] = msg.content(); + + log_message(json_msg); + + std::string topic = json_msg["topic"]; + std::size_t topic_size = topic.size(); + if (topic.substr(topic_size - 8, topic_size) == "shutdown") { - nl::json msg = base_type::receive_on_iopub(); - { - std::unique_lock lk(m_notify_mutex); - std::unique_lock guard(m_queue_mutex); - m_message_queue.push(msg); - guard.unlock(); - lk.unlock(); - m_notify_cond.notify_one(); - } - std::string topic = msg["topic"]; - std::size_t topic_size = topic.size(); - log_message(std::move(msg)); - if(topic.substr(topic_size - 8, topic_size) == "shutdown") - { - std::cout << "Received shutdown, exiting" << std::endl; - break; - } + std::cout << "Received shutdown, exiting" << std::endl; + // TODO + } +} + +void xeus_logger_client::handle_kernel_status_message(bool status) +{ + nl::json msg; + if (status) + { + // TODO + // msg["topic"] = "Kernel is dead"; } + else + { + // TODO + // msg["topic"] = "Kernel is alive"; + } + log_message(msg); } void xeus_logger_client::log_message(nl::json msg) @@ -297,4 +230,4 @@ void xeus_logger_client::log_message(nl::json msg) std::lock_guard guard(m_file_mutex); std::ofstream out(m_file_name, std::ios_base::app); out << msg.dump(4) << std::endl; -} +} \ No newline at end of file diff --git a/test/xeus_client.hpp b/test/xeus_client.hpp index 2690f068..99f1eab5 100644 --- a/test/xeus_client.hpp +++ b/test/xeus_client.hpp @@ -16,85 +16,18 @@ #include "nlohmann/json.hpp" #include "xeus/xkernel_configuration.hpp" -#include "xeus-zmq/xauthentication.hpp" - -// Base class for clients, provides an API to -// send and receive messages, but nothing more ;) +#include "xeus/xmessage.hpp" +#include "xeus-zmq/xclient_zmq.hpp" namespace nl = nlohmann; -// TODO: rewrite this class with the new client framework from xeus-zmq -/* -class xeus_client_base -{ -public: - - xeus_client_base(xeus::xcontext& context, - const std::string& user_name, - const xeus::xconfiguration& config); - - virtual ~xeus_client_base(); - - void subscribe_iopub(const std::string& filter); - void unsubscribe_iopub(const std::string& filter); - -protected: - - void send_on_shell(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content); - nl::json receive_on_shell(); - - void send_on_control(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content); - nl::json receive_on_control(); - - nl::json receive_on_iopub(); - - nl::json make_header(const std::string& msg_type) const; - nl::json aggregate(const nl::json& header, - const nl::json& parent_header, - const nl::json& metadata, - const nl::json& content) const; - -private: - - void send_message(nl::json header, - nl::json parent_header, - nl::json metadata, - nl::json content, - zmq::socket_t& socket, - const xeus::xauthentication& auth); - - nl::json receive_message(zmq::socket_t& socket, - const xeus::xauthentication& auth); - - using authentication_ptr = std::unique_ptr; - authentication_ptr p_shell_authentication; - authentication_ptr p_control_authentication; - authentication_ptr p_iopub_authentication; - - zmq::socket_t m_shell; - zmq::socket_t m_control; - zmq::socket_t m_iopub; - - std::string m_shell_end_point; - std::string m_control_end_point; - std::string m_iopub_end_point; - - std::string m_user_name; - std::string m_session_id; -}; - // Client that logs sent and received messages. -// Runs the iopub poller in a dedicated thread and -// push messages in a queue for future usage. -class xeus_logger_client : public xeus_client_base +// Based on xclient_zmq from xeus-zmq. + +class xeus_logger_client { public: + using client_ptr = std::unique_ptr; xeus_logger_client(xeus::xcontext& context, const std::string& user_name, @@ -113,20 +46,22 @@ class xeus_logger_client : public xeus_client_base nl::json pop_iopub_message(); nl::json wait_for_debug_event(const std::string& event); + void start(); + void log_message(nl::json msg); private: - using base_type = xeus_client_base; + void register_shell_listener(); + void register_kernel_status_listener(); - void poll_iopub(); - void log_message(nl::json msg); + void handle_shell_message(xeus::xmessage msg); + void handle_kernel_status_message(bool status); + std::string m_user_name; std::string m_file_name; - std::thread m_iopub_thread; - std::queue m_message_queue; std::mutex m_file_mutex; - mutable std::mutex m_queue_mutex; std::mutex m_notify_mutex; std::condition_variable m_notify_cond; -}; -*/ + + client_ptr p_client; +}; \ No newline at end of file