diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 0225d4374..3ed5bcf77 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -197,4 +197,66 @@ void ZmqClient::sendMsg( throw system_error(make_error_code(errc::io_error), message); } +bool ZmqClient::wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos, + + std::vector& buffer) + +{ + + SWSS_LOG_ENTER(); + + int rc; + + for (int i = 0; true ; ++i) + + { + + rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0); + + if (rc < 0) + + { + + if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY) + + { + + continue; + + } + + SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); + + } + + if (rc >= (int)buffer.size()) + + { + + SWSS_LOG_THROW( + + "zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", + + (int)buffer.size(), rc); + + } + + break; + + } + + buffer.at(rc) = 0; // make sure that we end string with zero before parse + + kcos.clear(); + + BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos); + + return true; + +} + } diff --git a/common/zmqclient.h b/common/zmqclient.h index 313e65735..79b4d766a 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -12,6 +12,7 @@ namespace swss { class ZmqClient { public: + ZmqClient(const std::string& endpoint); ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); @@ -24,6 +25,15 @@ class ZmqClient const std::string& tableName, const std::vector& kcos, std::vector& sendbuffer); + + bool wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos, + + std::vector& buffer); + private: void initialize(const std::string& endpoint, const std::string& vrf); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index ec9396b39..c171163f7 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -171,6 +171,18 @@ void ZmqProducerStateTable::send(const std::vector &kcos } } +bool ZmqProducerStateTable::wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos) + +{ + + return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer); + +} + size_t ZmqProducerStateTable::dbUpdaterQueueSize() { if (m_asyncDBUpdater == nullptr) diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 749107825..3c794237b 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -37,6 +37,14 @@ class ZmqProducerStateTable : public ProducerStateTable // Batched send that can include both SET and DEL requests. virtual void send(const std::vector &kcos); + // This method should only be used if the ZmqClient enables one-to-one sync. + + virtual bool wait(std::string& dbName, + + std::string& tableName, + + std::vector>& kcos); + size_t dbUpdaterQueueSize(); private: void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 4800b9ba2..0418bc50b 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -12,13 +12,13 @@ using namespace std; namespace swss { ZmqServer::ZmqServer(const std::string& endpoint) - : ZmqServer(endpoint, "") + : m_endpoint(endpoint) { } ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) : m_endpoint(endpoint), - m_vrf(vrf) + m_vrf(vrf), m_allowZmqPoll(true) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -29,8 +29,14 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) ZmqServer::~ZmqServer() { + m_allowZmqPoll = true; m_runThread = false; m_mqPollThread->join(); + + zmq_close(m_socket); + + zmq_ctx_destroy(m_context); + } void ZmqServer::registerMessageHandler( @@ -90,39 +96,20 @@ void ZmqServer::mqPollThread() SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("mqPollThread begin"); - // Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket - void* context = zmq_ctx_new();; - void* socket = zmq_socket(context, ZMQ_PULL); - - // Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt - int high_watermark = MQ_WATERMARK; - zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); - - if (!m_vrf.empty()) - { - zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); - } - - int rc = zmq_bind(socket, m_endpoint.c_str()); - if (rc != 0) - { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", - m_endpoint.c_str(), - zmq_errno()); - } - // zmq_poll will use less CPU zmq_pollitem_t poll_item; poll_item.fd = 0; - poll_item.socket = socket; + poll_item.socket = m_socket; poll_item.events = ZMQ_POLLIN; poll_item.revents = 0; SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str()); while (m_runThread) { + m_allowZmqPoll = false; + // receive message - rc = zmq_poll(&poll_item, 1, 1000); + auto rc = zmq_poll(&poll_item, 1, 1000); if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN)) { // timeout or other event @@ -131,7 +118,7 @@ void ZmqServer::mqPollThread() } // receive message - rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); + rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT); if (rc < 0) { int zmq_err = zmq_errno(); @@ -160,8 +147,8 @@ void ZmqServer::mqPollThread() handleReceivedData(m_buffer.data(), rc); } - zmq_close(socket); - zmq_ctx_destroy(context); + zmq_close(m_socket); + zmq_ctx_destroy(m_context); SWSS_LOG_NOTICE("mqPollThread end"); } diff --git a/common/zmqserver.h b/common/zmqserver.h index 8afe18d7c..d5b36d39a 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -40,6 +40,9 @@ class ZmqServer ZmqMessageHandler* handler); private: + + void connect(); + void handleReceivedData(const char* buffer, const size_t size); void mqPollThread(); @@ -56,6 +59,12 @@ class ZmqServer std::string m_vrf; + void* m_context; + + void* m_socket; + + bool m_allowZmqPoll; + std::map> m_HandlerMap; };