From aba0f66dfe1c77c6256cd6be6d2edeefa611efa4 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Tue, 13 Aug 2024 06:56:04 -0700 Subject: [PATCH 1/8] Add new DASH APPL_DB table names (#897) --- common/schema.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/schema.h b/common/schema.h index 46f75d07a..4007967ae 100644 --- a/common/schema.h +++ b/common/schema.h @@ -166,6 +166,11 @@ namespace swss { #define APP_DASH_ROUTE_TABLE_NAME "DASH_ROUTE_TABLE" #define APP_DASH_ROUTE_RULE_TABLE_NAME "DASH_ROUTE_RULE_TABLE" #define APP_DASH_VNET_MAPPING_TABLE_NAME "DASH_VNET_MAPPING_TABLE" +#define APP_DASH_ENI_ROUTE_TABLE_NAME "DASH_ENI_ROUTE_TABLE" +#define APP_DASH_ROUTE_GROUP_TABLE_NAME "DASH_ROUTE_GROUP_TABLE" +#define APP_DASH_TUNNEL_TABLE_NAME "DASH_TUNNEL_TABLE" +#define APP_DASH_PA_VALIDATION_TABLE_NAME "DASH_PA_VALIDATION_TABLE" +#define APP_DASH_ROUTING_APPLIANCE_TABLE_NAME "DASH_ROUTING_APPLIANCE_TABLE" /***** TO BE REMOVED *****/ From 636f565025d5b4b235791c71e8c9ca25ce289f47 Mon Sep 17 00:00:00 2001 From: Zain Budhwani <99770260+zbud-msft@users.noreply.github.com> Date: Thu, 29 Aug 2024 09:22:50 -0700 Subject: [PATCH 2/8] Account for control character as part of zmq_read_part (#906) --- common/events_common.h | 15 +++++++++---- tests/events_common_ut.cpp | 46 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/common/events_common.h b/common/events_common.h index 0aec96e9d..ae9bb7d58 100644 --- a/common/events_common.h +++ b/common/events_common.h @@ -310,7 +310,15 @@ struct serialization more = 0; zmq_msg_init(&msg); int rc = zmq_msg_recv(&msg, sock, flag); - if (rc != -1) { + if (rc == 1) { + char control_character = *(char*)zmq_msg_data(&msg); + if (control_character == 0x01 || control_character == 0x00) { + SWSS_LOG_INFO("Received subscription/unsubscription message when XSUB connect to XPUB: %c", control_character); + } else { + SWSS_LOG_DEBUG("Received non subscription based control character: %c", control_character); + } + rc = 0; + } else if (rc != -1) { size_t more_size = sizeof (more); zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &more_size); @@ -318,8 +326,7 @@ struct serialization rc = zmsg_to_map(msg, data); RET_ON_ERR(rc == 0, "Failed to deserialize part rc=%d", rc); /* read more flag if message read fails to de-serialize */ - } - else { + } else { /* override with zmq err */ rc = zmq_errno(); if (rc != 11) { @@ -332,7 +339,7 @@ struct serialization return rc; } - + template int zmq_send_part(void *sock, int flag, const DT &data) diff --git a/tests/events_common_ut.cpp b/tests/events_common_ut.cpp index 7df48588b..524265793 100644 --- a/tests/events_common_ut.cpp +++ b/tests/events_common_ut.cpp @@ -97,9 +97,51 @@ TEST(events_common, send_recv) zmq_ctx_term(zmq_ctx); } +TEST(events_common, send_recv_control_character) +{ +#if 0 + { + /* Direct log messages to stdout */ + string dummy, op("STDOUT"); + swss::Logger::swssOutputNotify(dummy, op); + swss::Logger::setMinPrio(swss::Logger::SWSS_DEBUG); + } +#endif + char *path = "tcp://127.0.0.1:5570"; + void *zmq_ctx = zmq_ctx_new(); + void *sock_p0 = zmq_socket (zmq_ctx, ZMQ_PAIR); + EXPECT_EQ(0, zmq_connect (sock_p0, path)); + void *sock_p1 = zmq_socket (zmq_ctx, ZMQ_PAIR); + EXPECT_EQ(0, zmq_bind (sock_p1, path)); + string source; + map m; + + // Subscription based control character test + zmq_msg_t sub_msg; + zmq_msg_init_size(&sub_msg, 1); + *(char*)zmq_msg_data(&sub_msg) = 0x01; + EXPECT_EQ(1, zmq_msg_send(&sub_msg, sock_p0, 0)); + zmq_msg_close(&sub_msg); + // First part will be read only and will return as 0, but will not be deserialized event + EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m)); + EXPECT_EQ("", source); + EXPECT_EQ(0, m.size()); + + // Non-subscription based control character test + zmq_msg_t ctrl_msg; + zmq_msg_init_size(&ctrl_msg, 1); + *(char*)zmq_msg_data(&ctrl_msg) = 0x07; + EXPECT_EQ(1, zmq_msg_send(&ctrl_msg, sock_p0, 0)); + zmq_msg_close(&ctrl_msg); + // First part will be read only and will return as 0, but will not be deserialized event + EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m)); + EXPECT_EQ("", source); + EXPECT_EQ(0, m.size()); - - + zmq_close(sock_p0); + zmq_close(sock_p1); + zmq_ctx_term(zmq_ctx); +} From a76b983b8c66527c86e6b45f78fb893e9e26883d Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Tue, 10 Sep 2024 15:13:48 -0700 Subject: [PATCH 3/8] run DASH vs tests (#913) sonic-net/sonic-swss#3048 moves DASH-specific VS tests into a separate folder --- .azure-pipelines/test-docker-sonic-vs-template.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.azure-pipelines/test-docker-sonic-vs-template.yml b/.azure-pipelines/test-docker-sonic-vs-template.yml index f26ee4c09..81af9bd82 100644 --- a/.azure-pipelines/test-docker-sonic-vs-template.yml +++ b/.azure-pipelines/test-docker-sonic-vs-template.yml @@ -76,7 +76,7 @@ jobs: # run pytests in sets of 20 all_tests=$(ls test_*.py) - all_tests="${all_tests} p4rt" + all_tests="${all_tests} p4rt dash" test_set=() for test in ${all_tests}; do test_set+=("${test}") From e271c9bef5aea4801b01a88a54ec589d81b47a2b Mon Sep 17 00:00:00 2001 From: Vivek Date: Wed, 11 Sep 2024 15:19:09 -0700 Subject: [PATCH 4/8] [DASH] Add support for ENI Counters (#904) --- common/schema.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/schema.h b/common/schema.h index 4007967ae..6bea1d404 100644 --- a/common/schema.h +++ b/common/schema.h @@ -213,6 +213,7 @@ namespace swss { #define COUNTERS_DEBUG_NAME_SWITCH_STAT_MAP "COUNTERS_DEBUG_NAME_SWITCH_STAT_MAP" #define COUNTERS_TUNNEL_TYPE_MAP "COUNTERS_TUNNEL_TYPE_MAP" #define COUNTERS_TUNNEL_NAME_MAP "COUNTERS_TUNNEL_NAME_MAP" +#define COUNTERS_ENI_NAME_MAP "COUNTERS_ENI_NAME_MAP" #define COUNTERS_ROUTE_NAME_MAP "COUNTERS_ROUTE_NAME_MAP" #define COUNTERS_ROUTE_TO_PATTERN_MAP "COUNTERS_ROUTE_TO_PATTERN_MAP" #define COUNTERS_FABRIC_QUEUE_NAME_MAP "COUNTERS_FABRIC_QUEUE_NAME_MAP" @@ -250,6 +251,7 @@ namespace swss { #define QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST" #define QUEUE_ATTR_ID_LIST "QUEUE_ATTR_ID_LIST" #define BUFFER_POOL_COUNTER_ID_LIST "BUFFER_POOL_COUNTER_ID_LIST" +#define ENI_COUNTER_ID_LIST "ENI_COUNTER_ID_LIST" #define PFC_WD_STATE_TABLE "PFC_WD_STATE_TABLE" #define PFC_WD_PORT_COUNTER_ID_LIST "PORT_COUNTER_ID_LIST" #define PFC_WD_QUEUE_COUNTER_ID_LIST "QUEUE_COUNTER_ID_LIST" From 24979b05ff9c2daa18432abcd0dd8518bd5c7e60 Mon Sep 17 00:00:00 2001 From: Ze Gan Date: Fri, 13 Sep 2024 15:06:40 -0700 Subject: [PATCH 5/8] Add dpu db in schema (#903) According to the HLD: https://github.com/sonic-net/SONiC/blob/8c39d0cf1886fc1e8a169160e162fb40fe200d86/doc/smart-switch/smart-switch-database-architecture/smart-switch-database-design.md?plain=1#L159-L181, Add the DPU DB macros in schema. --- common/schema.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/schema.h b/common/schema.h index 6bea1d404..48d4d5d83 100644 --- a/common/schema.h +++ b/common/schema.h @@ -23,6 +23,10 @@ namespace swss { #define CHASSIS_APP_DB 12 #define CHASSIS_STATE_DB 13 #define APPL_STATE_DB 14 +#define DPU_APPL_DB 15 +#define DPU_APPL_STATE_DB 16 +#define DPU_STATE_DB 17 +#define DPU_COUNTERS_DB 18 #define EVENT_DB 19 #define BMP_STATE_DB 20 From 898aa5dbee22920847dafb0849907d55c8a84816 Mon Sep 17 00:00:00 2001 From: Hua Liu <58683130+liuh-80@users.noreply.github.com> Date: Mon, 30 Sep 2024 08:59:37 +0800 Subject: [PATCH 6/8] Add VRF support to ZMQ server/client (#920) --- common/zmqclient.cpp | 15 +++++++++++++-- common/zmqclient.h | 6 ++++-- common/zmqserver.cpp | 13 ++++++++++++- common/zmqserver.h | 3 +++ 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index e6cb07da9..0225d4374 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -16,8 +16,13 @@ using namespace std; namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) +:ZmqClient(endpoint, "") { - initialize(endpoint); +} + +ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) +{ + initialize(endpoint, vrf); } ZmqClient::~ZmqClient() @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient() } } -void ZmqClient::initialize(const std::string& endpoint) +void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) { m_connected = false; m_endpoint = endpoint; m_context = nullptr; m_socket = nullptr; + m_vrf = vrf; connect(); } @@ -89,6 +95,11 @@ void ZmqClient::connect() int high_watermark = MQ_WATERMARK; zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str()); int rc = zmq_connect(m_socket, m_endpoint.c_str()); if (rc != 0) diff --git a/common/zmqclient.h b/common/zmqclient.h index 3f56cc299..313e65735 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -13,6 +13,7 @@ class ZmqClient { public: ZmqClient(const std::string& endpoint); + ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); bool isConnected(); @@ -24,11 +25,12 @@ class ZmqClient const std::vector& kcos, std::vector& sendbuffer); private: - void initialize(const std::string& endpoint); - + void initialize(const std::string& endpoint, const std::string& vrf); std::string m_endpoint; + std::string m_vrf; + void* m_context; void* m_socket; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index d553c2a26..4800b9ba2 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -12,7 +12,13 @@ using namespace std; namespace swss { ZmqServer::ZmqServer(const std::string& endpoint) - : m_endpoint(endpoint) + : ZmqServer(endpoint, "") +{ +} + +ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) + : m_endpoint(endpoint), + m_vrf(vrf) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread() int high_watermark = MQ_WATERMARK; zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { diff --git a/common/zmqserver.h b/common/zmqserver.h index 002e78b1a..8afe18d7c 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,6 +31,7 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); + ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); void registerMessageHandler( @@ -53,6 +54,8 @@ class ZmqServer std::string m_endpoint; + std::string m_vrf; + std::map> m_HandlerMap; }; From 352234ae773dc6010ea2fd3b3446ced48f33ee0a Mon Sep 17 00:00:00 2001 From: Vijaya Kumar Abbaraju Date: Sat, 12 Oct 2024 02:58:21 +0530 Subject: [PATCH 7/8] Schema.h Changes to support PAC functionality. (#871) #### Why I did it Added below tables to support PAC functionality. #define APP_PAC_PORT_TABLE_NAME "PAC_PORT_TABLE" #define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE" #define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE" #define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE" #define STATE_PAC_GLOBAL_OPER_TABLE "PAC_GLOBAL_OPER_TABLE" #define STATE_PAC_PORT_OPER_TABLE "PAC_PORT_OPER_TABLE" #define STATE_PAC_AUTHENTICATED_CLIENT_OPER_TABLE "PAC_AUTHENTICATED_CLIENT_OPER_TABLE" #define STATE_OPER_VLAN_TABLE_NAME "OPER_VLAN" #define STATE_OPER_VLAN_MEMBER_TABLE_NAME "OPER_VLAN_MEMBER" #define STATE_OPER_FDB_TABLE_NAME "OPER_FDB" #define STATE_OPER_PORT_TABLE_NAME "OPER_PORT" --- common/schema.h | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/common/schema.h b/common/schema.h index 48d4d5d83..27aecdb0c 100644 --- a/common/schema.h +++ b/common/schema.h @@ -176,6 +176,8 @@ namespace swss { #define APP_DASH_PA_VALIDATION_TABLE_NAME "DASH_PA_VALIDATION_TABLE" #define APP_DASH_ROUTING_APPLIANCE_TABLE_NAME "DASH_ROUTING_APPLIANCE_TABLE" +#define APP_PAC_PORT_TABLE_NAME "PAC_PORT_TABLE" + /***** TO BE REMOVED *****/ #define APP_TC_TO_QUEUE_MAP_TABLE_NAME "TC_TO_QUEUE_MAP_TABLE" @@ -465,6 +467,11 @@ namespace swss { #define CFG_SUPPRESS_ASIC_SDK_HEALTH_EVENT_NAME "SUPPRESS_ASIC_SDK_HEALTH_EVENT" +#define CFG_PAC_PORT_CONFIG_TABLE "PAC_PORT_CONFIG_TABLE" +#define CFG_PAC_GLOBAL_CONFIG_TABLE "PAC_GLOBAL_CONFIG_TABLE" +#define CFG_PAC_HOSTAPD_GLOBAL_CONFIG_TABLE "HOSTAPD_GLOBAL_CONFIG_TABLE" + + /***** STATE DATABASE *****/ #define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY" @@ -549,6 +556,16 @@ namespace swss { #define STATE_ACL_TABLE_TABLE_NAME "ACL_TABLE_TABLE" #define STATE_ACL_RULE_TABLE_NAME "ACL_RULE_TABLE" + +/*PAC*/ +#define STATE_PAC_GLOBAL_OPER_TABLE "PAC_GLOBAL_OPER_TABLE" +#define STATE_PAC_PORT_OPER_TABLE "PAC_PORT_OPER_TABLE" +#define STATE_PAC_AUTHENTICATED_CLIENT_OPER_TABLE "PAC_AUTHENTICATED_CLIENT_OPER_TABLE" +#define STATE_OPER_VLAN_TABLE_NAME "OPER_VLAN" +#define STATE_OPER_VLAN_MEMBER_TABLE_NAME "OPER_VLAN_MEMBER" +#define STATE_OPER_FDB_TABLE_NAME "OPER_FDB" +#define STATE_OPER_PORT_TABLE_NAME "OPER_PORT" + /***** Counter capability tables for Queue and Port ****/ #define STATE_QUEUE_COUNTER_CAPABILITIES_NAME "QUEUE_COUNTER_CAPABILITIES" #define STATE_PORT_COUNTER_CAPABILITIES_NAME "PORT_COUNTER_CAPABILITIES" From 45d7cb010e709d340da8a2a21e1efcea239a8499 Mon Sep 17 00:00:00 2001 From: erer1243 <1377477+erer1243@users.noreply.github.com> Date: Tue, 29 Oct 2024 20:23:24 -0400 Subject: [PATCH 8/8] Initial implementation of C api (#915) Implement a C interface to some of libswsscommon in support of sonic-dash-ha. Related: https://github.com/sonic-net/sonic-dash-ha/pull/6 https://github.com/sonic-net/sonic-swss-common/pull/921 Incoming follow up PR: https://github.com/erer1243/sonic-swss-common/pull/1 --------- Co-authored-by: erer1243 --- common/Makefile.am | 11 +- common/binaryserializer.h | 26 +- common/c-api/consumerstatetable.cpp | 34 +++ common/c-api/consumerstatetable.h | 28 +++ common/c-api/dbconnector.cpp | 84 +++++++ common/c-api/dbconnector.h | 101 ++++++++ common/c-api/producerstatetable.cpp | 53 ++++ common/c-api/producerstatetable.h | 44 ++++ common/c-api/subscriberstatetable.cpp | 52 ++++ common/c-api/subscriberstatetable.h | 43 ++++ common/c-api/util.cpp | 3 + common/c-api/util.h | 181 ++++++++++++++ common/c-api/zmqclient.cpp | 35 +++ common/c-api/zmqclient.h | 30 +++ common/c-api/zmqconsumerstatetable.cpp | 58 +++++ common/c-api/zmqconsumerstatetable.h | 48 ++++ common/c-api/zmqproducerstatetable.cpp | 29 +++ common/c-api/zmqproducerstatetable.h | 32 +++ common/c-api/zmqserver.cpp | 14 ++ common/c-api/zmqserver.h | 20 ++ common/dbconnector.cpp | 33 ++- common/dbconnector.h | 10 +- common/zmqserver.cpp | 5 +- debian/libswsscommon-dev.install | 1 + tests/Makefile.am | 1 + tests/c_api_ut.cpp | 325 +++++++++++++++++++++++++ tests/main.cpp | 4 + 27 files changed, 1282 insertions(+), 23 deletions(-) create mode 100644 common/c-api/consumerstatetable.cpp create mode 100644 common/c-api/consumerstatetable.h create mode 100644 common/c-api/dbconnector.cpp create mode 100644 common/c-api/dbconnector.h create mode 100644 common/c-api/producerstatetable.cpp create mode 100644 common/c-api/producerstatetable.h create mode 100644 common/c-api/subscriberstatetable.cpp create mode 100644 common/c-api/subscriberstatetable.h create mode 100644 common/c-api/util.cpp create mode 100644 common/c-api/util.h create mode 100644 common/c-api/zmqclient.cpp create mode 100644 common/c-api/zmqclient.h create mode 100644 common/c-api/zmqconsumerstatetable.cpp create mode 100644 common/c-api/zmqconsumerstatetable.h create mode 100644 common/c-api/zmqproducerstatetable.cpp create mode 100644 common/c-api/zmqproducerstatetable.h create mode 100644 common/c-api/zmqserver.cpp create mode 100644 common/c-api/zmqserver.h create mode 100644 tests/c_api_ut.cpp diff --git a/common/Makefile.am b/common/Makefile.am index 18cfd8035..5d1de753e 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -68,7 +68,16 @@ common_libswsscommon_la_SOURCES = \ common/zmqclient.cpp \ common/zmqserver.cpp \ common/asyncdbupdater.cpp \ - common/redis_table_waiter.cpp + common/redis_table_waiter.cpp \ + common/c-api/util.cpp \ + common/c-api/dbconnector.cpp \ + common/c-api/consumerstatetable.cpp \ + common/c-api/producerstatetable.cpp \ + common/c-api/subscriberstatetable.cpp \ + common/c-api/zmqclient.cpp \ + common/c-api/zmqserver.cpp \ + common/c-api/zmqconsumerstatetable.cpp \ + common/c-api/zmqproducerstatetable.cpp common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS) common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS) diff --git a/common/binaryserializer.h b/common/binaryserializer.h index 413ca5010..6ae4dcd25 100644 --- a/common/binaryserializer.h +++ b/common/binaryserializer.h @@ -2,6 +2,8 @@ #define __BINARY_SERIALIZER__ #include "common/armhelper.h" +#include "common/rediscommand.h" +#include "common/table.h" #include @@ -11,6 +13,26 @@ namespace swss { class BinarySerializer { public: + static size_t serializedSize(const string &dbName, const string &tableName, + const vector &kcos) { + size_t n = 0; + n += dbName.size() + sizeof(size_t); + n += tableName.size() + sizeof(size_t); + + for (const KeyOpFieldsValuesTuple &kco : kcos) { + const vector &fvs = kfvFieldsValues(kco); + n += kfvKey(kco).size() + sizeof(size_t); + n += to_string(fvs.size()).size() + sizeof(size_t); + + for (const FieldValueTuple &fv : fvs) { + n += fvField(fv).size() + sizeof(size_t); + n += fvValue(fv).size() + sizeof(size_t); + } + } + + return n + sizeof(size_t); + } + static size_t serializeBuffer( const char* buffer, const size_t size, @@ -192,8 +214,8 @@ class BinarySerializer { { if ((size_t)(m_current_position - m_buffer + datalen + sizeof(size_t)) > m_buffer_size) { - SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\ - key count: %zu, data length %zu, buffer size: %zu", + SWSS_LOG_THROW("There are not enough buffer for binary serializer to serialize,\n" + " key count: %zu, data length %zu, buffer size: %zu", m_kvp_count, datalen, m_buffer_size); diff --git a/common/c-api/consumerstatetable.cpp b/common/c-api/consumerstatetable.cpp new file mode 100644 index 000000000..c01ed8229 --- /dev/null +++ b/common/c-api/consumerstatetable.cpp @@ -0,0 +1,34 @@ +#include +#include +#include + +#include "../consumerstatetable.h" +#include "../dbconnector.h" +#include "../table.h" +#include "consumerstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSConsumerStateTable) new ConsumerStateTable( + (DBConnector *)db, string(tableName), popBatchSize, pri)); +} + +void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl) { + SWSSTry(delete (ConsumerStateTable *)tbl); +} + +SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl) { + SWSSTry({ + deque vkco; + ((ConsumerStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} diff --git a/common/c-api/consumerstatetable.h b/common/c-api/consumerstatetable.h new file mode 100644 index 000000000..bd2fdaaf0 --- /dev/null +++ b/common/c-api/consumerstatetable.h @@ -0,0 +1,28 @@ +#ifndef SWSS_COMMON_C_API_CONSUMERSTATETABLE_H +#define SWSS_COMMON_C_API_CONSUMERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSConsumerStateTableOpaque *SWSSConsumerStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSConsumerStateTable SWSSConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *popBatchSize, const int32_t *pri); + +void SWSSConsumerStateTable_free(SWSSConsumerStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSConsumerStateTable_pops(SWSSConsumerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/dbconnector.cpp b/common/c-api/dbconnector.cpp new file mode 100644 index 000000000..bb32f42aa --- /dev/null +++ b/common/c-api/dbconnector.cpp @@ -0,0 +1,84 @@ +#include +#include + +#include "../dbconnector.h" +#include "dbconnector.h" +#include "util.h" + +using namespace swss; +using namespace std; + +void SWSSSonicDBConfig_initialize(const char *path) { + SWSSTry(SonicDBConfig::initialize(path)); +} + +void SWSSSonicDBConfig_initializeGlobalConfig(const char *path) { + SWSSTry(SonicDBConfig::initializeGlobalConfig(path)); +} + +SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port, + uint32_t timeout) { + SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(hostname), port, timeout)); +} + +SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout) { + SWSSTry(return (SWSSDBConnector) new DBConnector(dbId, string(sock_path), timeout)); +} + +SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn) { + SWSSTry(return (SWSSDBConnector) new DBConnector(string(dbName), timeout_ms, isTcpConn)); +} + +void SWSSDBConnector_free(SWSSDBConnector db) { + delete (DBConnector *)db; +} + +int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key) { + SWSSTry(return ((DBConnector *)db)->del(string(key)) ? 1 : 0); +} + +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value) { + SWSSTry(((DBConnector *)db)->set(string(key), string(value))); +} + +char *SWSSDBConnector_get(SWSSDBConnector db, const char *key) { + SWSSTry({ + shared_ptr s = ((DBConnector *)db)->get(string(key)); + return s ? strdup(s->c_str()) : nullptr; + }); +} + +int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key) { + SWSSTry(return ((DBConnector *)db)->exists(string(key)) ? 1 : 0); +} + +int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry(return ((DBConnector *)db)->hdel(string(key), string(field)) ? 1 : 0); +} + +void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, + const char *value) { + SWSSTry(((DBConnector *)db)->hset(string(key), string(field), string(value))); +} + +char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry({ + shared_ptr s = ((DBConnector *)db)->hget(string(key), string(field)); + return s ? strdup(s->c_str()) : nullptr; + }); +} + +SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key) { + SWSSTry({ + auto map = ((DBConnector *)db)->hgetall(key); + return makeFieldValueArray(map); + }); +} + +int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field) { + SWSSTry(return ((DBConnector *)db)->hexists(string(key), string(field)) ? 1 : 0); +} + +int8_t SWSSDBConnector_flushdb(SWSSDBConnector db) { + SWSSTry(return ((DBConnector *)db)->flushdb() ? 1 : 0); +} diff --git a/common/c-api/dbconnector.h b/common/c-api/dbconnector.h new file mode 100644 index 000000000..8e6c51e0b --- /dev/null +++ b/common/c-api/dbconnector.h @@ -0,0 +1,101 @@ +#ifndef SWSS_COMMON_C_API_DBCONNECTOR_H +#define SWSS_COMMON_C_API_DBCONNECTOR_H + +#include "util.h" +#ifdef __cplusplus +extern "C" { +#endif + +#include + +void SWSSSonicDBConfig_initialize(const char *path); + +void SWSSSonicDBConfig_initializeGlobalConfig(const char *path); + +typedef struct SWSSDBConnectorOpaque *SWSSDBConnector; + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_tcp(int32_t dbId, const char *hostname, uint16_t port, + uint32_t timeout_ms); + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_unix(int32_t dbId, const char *sock_path, uint32_t timeout_ms); + +// Pass 0 to timeout for infinity +SWSSDBConnector SWSSDBConnector_new_named(const char *dbName, uint32_t timeout_ms, uint8_t isTcpConn); + +void SWSSDBConnector_free(SWSSDBConnector db); + +// Returns 0 when key doesn't exist, 1 when key was deleted +int8_t SWSSDBConnector_del(SWSSDBConnector db, const char *key); + +void SWSSDBConnector_set(SWSSDBConnector db, const char *key, const char *value); + +// Returns NULL if key doesn't exist. +// Result must be freed using free() +char *SWSSDBConnector_get(SWSSDBConnector db, const char *key); + +// Returns 0 for false, 1 for true +int8_t SWSSDBConnector_exists(SWSSDBConnector db, const char *key); + +// Returns 0 when key or field doesn't exist, 1 when field was deleted +int8_t SWSSDBConnector_hdel(SWSSDBConnector db, const char *key, const char *field); + +void SWSSDBConnector_hset(SWSSDBConnector db, const char *key, const char *field, + const char *value); + +// Returns NULL if key or field doesn't exist. +// Result must be freed using free() +char *SWSSDBConnector_hget(SWSSDBConnector db, const char *key, const char *field); + +// Returns an empty map when the key doesn't exist. +// Result array and all of its elements must be freed using free() +SWSSFieldValueArray SWSSDBConnector_hgetall(SWSSDBConnector db, const char *key); + +// Returns 0 when key or field doesn't exist, 1 when field exists +int8_t SWSSDBConnector_hexists(SWSSDBConnector db, const char *key, const char *field); + +// std::vector keys(const std::string &key); + +// std::pair> scan(int cursor = 0, const char +// *match = "", uint32_t count = 10); + +// template +// void hmset(const std::string &key, InputIterator start, InputIterator stop); + +// void hmset(const std::unordered_map>>& multiHash); + +// std::shared_ptr get(const std::string &key); + +// std::shared_ptr hget(const std::string &key, const std::string +// &field); + +// int64_t incr(const std::string &key); + +// int64_t decr(const std::string &key); + +// int64_t rpush(const std::string &list, const std::string &item); + +// std::shared_ptr blpop(const std::string &list, int timeout); + +// void subscribe(const std::string &pattern); + +// void psubscribe(const std::string &pattern); + +// void punsubscribe(const std::string &pattern); + +// int64_t publish(const std::string &channel, const std::string &message); + +// void config_set(const std::string &key, const std::string &value); + +// Returns 1 on success, 0 on failure +int8_t SWSSDBConnector_flushdb(SWSSDBConnector db); + +// std::map>> getall(); +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/producerstatetable.cpp b/common/c-api/producerstatetable.cpp new file mode 100644 index 000000000..083536d7e --- /dev/null +++ b/common/c-api/producerstatetable.cpp @@ -0,0 +1,53 @@ +#include +#include + +#include "../dbconnector.h" +#include "../producerstatetable.h" +#include "dbconnector.h" +#include "producerstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSProducerStateTable SWSSProducerStateTable_new(SWSSDBConnector db, const char *tableName) { + SWSSTry(return (SWSSProducerStateTable) new ProducerStateTable((DBConnector *)db, + string(tableName))); +} + +void SWSSProducerStateTable_free(SWSSProducerStateTable tbl) { + SWSSTry(delete ((ProducerStateTable *)tbl)); +} + +void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buffered) { + SWSSTry(((ProducerStateTable *)tbl)->setBuffered((bool)buffered)) +} + +void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, + SWSSFieldValueArray values) { + SWSSTry(((ProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values))); +} + +void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key) { + SWSSTry(((ProducerStateTable *)tbl)->del(string(key))); +} + +void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->flush()); +} + +int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl) { + SWSSTry(return ((ProducerStateTable *)tbl)->count()); +} + +void SWSSProducerStateTable_clear(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->clear()); +} + +void SWSSProducerStateTable_create_temp_view(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->create_temp_view()); +} + +void SWSSProducerStateTable_apply_temp_view(SWSSProducerStateTable tbl) { + SWSSTry(((ProducerStateTable *)tbl)->apply_temp_view()); +} diff --git a/common/c-api/producerstatetable.h b/common/c-api/producerstatetable.h new file mode 100644 index 000000000..e8db2c65d --- /dev/null +++ b/common/c-api/producerstatetable.h @@ -0,0 +1,44 @@ +#ifndef SWSS_COMMON_C_API_PRODUCERSTATETABLE_H +#define SWSS_COMMON_C_API_PRODUCERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSProducerStateTableOpaque *SWSSProducerStateTable; + +SWSSProducerStateTable SWSSProducerStateTable_new(SWSSDBConnector db, const char *tableName); + +void SWSSProducerStateTable_free(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_setBuffered(SWSSProducerStateTable tbl, uint8_t buffered); + +void SWSSProducerStateTable_set(SWSSProducerStateTable tbl, const char *key, SWSSFieldValueArray values); + +void SWSSProducerStateTable_del(SWSSProducerStateTable tbl, const char *key); + +// Batched version of set() and del(). +// virtual void set(const std::vector& values); + +// virtual void del(const std::vector& keys); + +void SWSSProducerStateTable_flush(SWSSProducerStateTable tbl); + +int64_t SWSSProducerStateTable_count(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_clear(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_create_temp_view(SWSSProducerStateTable tbl); + +void SWSSProducerStateTable_apply_temp_view(SWSSProducerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/subscriberstatetable.cpp b/common/c-api/subscriberstatetable.cpp new file mode 100644 index 000000000..b64829117 --- /dev/null +++ b/common/c-api/subscriberstatetable.cpp @@ -0,0 +1,52 @@ +#include +#include +#include +#include + +#include "../dbconnector.h" +#include "../subscriberstatetable.h" +#include "../table.h" +#include "subscriberstatetable.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSSubscriberStateTable) new SubscriberStateTable( + (DBConnector *)db, string(tableName), popBatchSize, pri)); +} + +void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl) { + delete (SubscriberStateTable *)tbl; +} + +SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl) { + SWSSTry({ + deque vkco; + ((SubscriberStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} + +uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl) { + SWSSTry(return ((SubscriberStateTable *)tbl)->hasData() ? 1 : 0); +} + +uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl) { + SWSSTry(return ((SubscriberStateTable *)tbl)->hasCachedData() ? 1 : 0); +} + +uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl) { + SWSSTry(return ((SubscriberStateTable *)tbl)->initializedWithData() ? 1 : 0); +} + +SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, + uint32_t timeout_ms) { + SWSSTry(return selectOne((SubscriberStateTable *)tbl, timeout_ms)); +} diff --git a/common/c-api/subscriberstatetable.h b/common/c-api/subscriberstatetable.h new file mode 100644 index 000000000..4501a3af4 --- /dev/null +++ b/common/c-api/subscriberstatetable.h @@ -0,0 +1,43 @@ +#ifndef SWSS_COMMON_C_API_SUBSCRIBERSTATETABLE_H +#define SWSS_COMMON_C_API_SUBSCRIBERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSSubscriberStateTableOpaque *SWSSSubscriberStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSSubscriberStateTable SWSSSubscriberStateTable_new(SWSSDBConnector db, const char *tableName, + const int32_t *popBatchSize, + const int32_t *pri); + +void SWSSSubscriberStateTable_free(SWSSSubscriberStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSSubscriberStateTable_pops(SWSSSubscriberStateTable tbl); + +// Returns 0 for false, 1 for true +uint8_t SWSSSubscriberStateTable_hasData(SWSSSubscriberStateTable tbl); + +// Returns 0 for false, 1 for true +uint8_t SWSSSubscriberStateTable_hasCachedData(SWSSSubscriberStateTable tbl); + +// Returns 0 for false, 1 for true +uint8_t SWSSSubscriberStateTable_initializedWithData(SWSSSubscriberStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSSubscriberStateTable_readData(SWSSSubscriberStateTable tbl, + uint32_t timeout_ms); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/util.cpp b/common/c-api/util.cpp new file mode 100644 index 000000000..fb983d5cf --- /dev/null +++ b/common/c-api/util.cpp @@ -0,0 +1,3 @@ +#include "util.h" + +bool swss::cApiTestingDisableAbort = false; diff --git a/common/c-api/util.h b/common/c-api/util.h new file mode 100644 index 000000000..79eb93cfd --- /dev/null +++ b/common/c-api/util.h @@ -0,0 +1,181 @@ +#ifndef SWSS_COMMON_C_API_UTIL_H +#define SWSS_COMMON_C_API_UTIL_H + +// External utilities (c-facing) +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct { + const char *field; + const char *value; +} SWSSFieldValuePair; + +typedef struct { + uint64_t len; + const SWSSFieldValuePair *data; +} SWSSFieldValueArray; + +typedef struct { + const char *key; + const char *operation; + SWSSFieldValueArray fieldValues; +} SWSSKeyOpFieldValues; + +typedef struct { + uint64_t len; + const SWSSKeyOpFieldValues *data; +} SWSSKeyOpFieldValuesArray; + +typedef enum { + SWSSSelectResult_DATA = 0, + SWSSSelectResult_TIMEOUT = 1, + SWSSSelectResult_SIGNAL = 2, +} SWSSSelectResult; + +#ifdef __cplusplus +} +#endif + +// Internal utilities (used to help define c-facing functions) +#ifdef __cplusplus +#include +#include +#include +#include +#include +#include + +#include "../logger.h" +#include "../rediscommand.h" +#include "../select.h" + +using boost::numeric_cast; + +namespace swss { + +extern bool cApiTestingDisableAbort; + +// In the catch block, we must abort because passing an exception across an ffi boundary is +// undefined behavior. It was also decided that no exceptions in swss-common are recoverable, so +// there is no reason to convert exceptions into a returnable type. +#define SWSSTry(...) \ + if (cApiTestingDisableAbort) { \ + { __VA_ARGS__; } \ + } else { \ + try { \ + { __VA_ARGS__; } \ + } catch (std::exception & e) { \ + std::cerr << "Aborting due to exception: " << e.what() << std::endl; \ + SWSS_LOG_ERROR("Aborting due to exception: %s", e.what()); \ + std::abort(); \ + } \ + } + +static inline SWSSSelectResult selectOne(swss::Selectable *s, uint32_t timeout_ms) { + Select select; + Selectable *sOut; + select.addSelectable(s); + int ret = select.select(&sOut, numeric_cast(timeout_ms)); + switch (ret) { + case Select::OBJECT: + return SWSSSelectResult_DATA; + case Select::ERROR: + throw std::system_error(errno, std::generic_category()); + case Select::TIMEOUT: + return SWSSSelectResult_TIMEOUT; + case Select::SIGNALINT: + return SWSSSelectResult_SIGNAL; + default: + SWSS_LOG_THROW("impossible: unhandled Select::select() return value: %d", ret); + } +} + +// malloc() with safe numeric casting of the size parameter +template static inline void *mallocN(N size) { + return malloc(numeric_cast(size)); +} + +// T is anything that has a .size() method and which can be iterated over for pair +// eg unordered_map or vector> +template static inline SWSSFieldValueArray makeFieldValueArray(const T &in) { + SWSSFieldValuePair *data = + (SWSSFieldValuePair *)mallocN(in.size() * sizeof(SWSSFieldValuePair)); + + size_t i = 0; + for (const auto &pair : in) { + SWSSFieldValuePair entry; + entry.field = strdup(pair.first.c_str()); + entry.value = strdup(pair.second.c_str()); + data[i++] = entry; + } + + SWSSFieldValueArray out; + out.len = (uint64_t)in.size(); + out.data = data; + return out; +} + +static inline std::vector +takeFieldValueArray(const SWSSFieldValueArray &in) { + std::vector out; + for (uint64_t i = 0; i < in.len; i++) { + auto field = std::string(in.data[i].field); + auto value = std::string(in.data[i].value); + out.push_back(std::make_pair(field, value)); + } + return out; +} + +static inline SWSSKeyOpFieldValues makeKeyOpFieldValues(const swss::KeyOpFieldsValuesTuple &in) { + SWSSKeyOpFieldValues out; + out.key = strdup(kfvKey(in).c_str()); + out.operation = strdup(kfvOp(in).c_str()); + out.fieldValues = makeFieldValueArray(kfvFieldsValues(in)); + return out; +} + +static inline swss::KeyOpFieldsValuesTuple takeKeyOpFieldValues(const SWSSKeyOpFieldValues &in) { + std::string key(in.key), op(in.operation); + auto fieldValues = takeFieldValueArray(in.fieldValues); + return std::make_tuple(key, op, fieldValues); +} + +template static inline const T &getReference(const T &t) { + return t; +} + +template static inline const T &getReference(const std::shared_ptr &t) { + return *t; +} + +// T is anything that has a .size() method and which can be iterated over for +// swss::KeyOpFieldValuesTuple +template static inline SWSSKeyOpFieldValuesArray makeKeyOpFieldValuesArray(const T &in) { + SWSSKeyOpFieldValues *data = + (SWSSKeyOpFieldValues *)mallocN(in.size() * sizeof(SWSSKeyOpFieldValues)); + + size_t i = 0; + for (const auto &kfv : in) + data[i++] = makeKeyOpFieldValues(getReference(kfv)); + + SWSSKeyOpFieldValuesArray out; + out.len = (uint64_t)in.size(); + out.data = data; + return out; +} + +static inline std::vector +takeKeyOpFieldValuesArray(const SWSSKeyOpFieldValuesArray &in) { + std::vector out; + for (uint64_t i = 0; i < in.len; i++) + out.push_back(takeKeyOpFieldValues(in.data[i])); + return out; +} + +} // namespace swss + +#endif +#endif diff --git a/common/c-api/zmqclient.cpp b/common/c-api/zmqclient.cpp new file mode 100644 index 000000000..7e4a58f87 --- /dev/null +++ b/common/c-api/zmqclient.cpp @@ -0,0 +1,35 @@ +#include "../zmqclient.h" +#include "../binaryserializer.h" +#include "util.h" +#include "zmqclient.h" + +using namespace swss; +using namespace std; + +SWSSZmqClient SWSSZmqClient_new(const char *endpoint) { + SWSSTry(return (SWSSZmqClient) new ZmqClient(endpoint)); +} + +void SWSSZmqClient_free(SWSSZmqClient zmqc) { + SWSSTry(delete (ZmqClient *)zmqc); +} + +// Returns 0 for false, 1 for true +int8_t SWSSZmqClient_isConnected(SWSSZmqClient zmqc) { + SWSSTry(return ((ZmqClient *)zmqc)->isConnected() ? 1 : 0); +} + +void SWSSZmqClient_connect(SWSSZmqClient zmqc) { + SWSSTry(((ZmqClient *)zmqc)->connect()); +} + +void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, + const SWSSKeyOpFieldValuesArray *arr) { + SWSSTry({ + vector kcos = takeKeyOpFieldValuesArray(*arr); + size_t bufSize = BinarySerializer::serializedSize(dbName, tableName, kcos); + vector v(bufSize); + ((ZmqClient *)zmqc) + ->sendMsg(string(dbName), string(tableName), kcos, v); + }); +} diff --git a/common/c-api/zmqclient.h b/common/c-api/zmqclient.h new file mode 100644 index 000000000..47cd1efba --- /dev/null +++ b/common/c-api/zmqclient.h @@ -0,0 +1,30 @@ +#ifndef SWSS_COMMON_C_API_ZMQCLIENT_H +#define SWSS_COMMON_C_API_ZMQCLIENT_H + +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSZmqClientOpaque *SWSSZmqClient; + +SWSSZmqClient SWSSZmqClient_new(const char *endpoint); + +void SWSSZmqClient_free(SWSSZmqClient zmqc); + +// Returns 0 for false, 1 for true +int8_t SWSSZmqClient_isConnected(SWSSZmqClient zmqc); + +void SWSSZmqClient_connect(SWSSZmqClient zmqc); + +void SWSSZmqClient_sendMsg(SWSSZmqClient zmqc, const char *dbName, const char *tableName, + const SWSSKeyOpFieldValuesArray *kcos); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqconsumerstatetable.cpp b/common/c-api/zmqconsumerstatetable.cpp new file mode 100644 index 000000000..38cd87f93 --- /dev/null +++ b/common/c-api/zmqconsumerstatetable.cpp @@ -0,0 +1,58 @@ +#include "../zmqconsumerstatetable.h" +#include "../table.h" +#include "util.h" +#include "zmqconsumerstatetable.h" +#include "zmqserver.h" + +using namespace swss; +using namespace std; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSZmqConsumerStateTable SWSSZmqConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqServer zmqs, + const int32_t *p_popBatchSize, + const int32_t *p_pri) { + + int popBatchSize = p_popBatchSize ? numeric_cast(*p_popBatchSize) + : TableConsumable::DEFAULT_POP_BATCH_SIZE; + int pri = p_pri ? numeric_cast(*p_pri) : 0; + SWSSTry(return (SWSSZmqConsumerStateTable) new ZmqConsumerStateTable( + (DBConnector *)db, string(tableName), *(ZmqServer *)zmqs, popBatchSize, pri)); +} + +void SWSSZmqConsumerStateTable_free(SWSSZmqConsumerStateTable tbl) { + SWSSTry(delete (ZmqConsumerStateTable *)tbl); +} + +SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTable tbl) { + SWSSTry({ + deque vkco; + ((ZmqConsumerStateTable *)tbl)->pops(vkco); + return makeKeyOpFieldValuesArray(vkco); + }); +} + +SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, + uint32_t timeout_ms) { + SWSSTry(return selectOne((ZmqConsumerStateTable *)tbl, timeout_ms)); +} + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_hasData(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return ((ZmqConsumerStateTable *)tbl)->hasData() ? 1 : 0); +} + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_hasCachedData(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return ((ZmqConsumerStateTable *)tbl)->hasCachedData() ? 1 : 0); +} + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_initializedWithData(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return ((ZmqConsumerStateTable *)tbl)->initializedWithData() ? 1 : 0); +} + +const struct SWSSDBConnectorOpaque * +SWSSZmqConsumerStateTable_getDbConnector(SWSSZmqConsumerStateTable tbl) { + SWSSTry(return (const SWSSDBConnectorOpaque *)((ZmqConsumerStateTable *)tbl)->getDbConnector()); +} diff --git a/common/c-api/zmqconsumerstatetable.h b/common/c-api/zmqconsumerstatetable.h new file mode 100644 index 000000000..4810c3ef5 --- /dev/null +++ b/common/c-api/zmqconsumerstatetable.h @@ -0,0 +1,48 @@ +#ifndef SWSS_COMMON_C_API_ZMQCONSUMERSTATETABLE_H +#define SWSS_COMMON_C_API_ZMQCONSUMERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" +#include "zmqserver.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct SWSSZmqConsumerStateTableOpaque *SWSSZmqConsumerStateTable; + +// Pass NULL for popBatchSize and/or pri to use the default values +SWSSZmqConsumerStateTable SWSSZmqConsumerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqServer zmqs, + const int32_t *popBatchSize, + const int32_t *pri); + +void SWSSZmqConsumerStateTable_free(SWSSZmqConsumerStateTable tbl); + +// Result array and all of its members must be freed using free() +SWSSKeyOpFieldValuesArray SWSSZmqConsumerStateTable_pops(SWSSZmqConsumerStateTable tbl); + +// Block until data is available to read or until a timeout elapses. +// A timeout of 0 means the call will return immediately. +SWSSSelectResult SWSSZmqConsumerStateTable_readData(SWSSZmqConsumerStateTable tbl, + uint32_t timeout_ms); + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_hasData(SWSSZmqConsumerStateTable tbl); + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_hasCachedData(SWSSZmqConsumerStateTable tbl); + +// Returns 0 for false, 1 for true +uint8_t SWSSZmqConsumerStateTable_initializedWithData(SWSSZmqConsumerStateTable tbl); + +const struct SWSSDBConnectorOpaque * +SWSSZmqConsumerStateTable_getDbConnector(SWSSZmqConsumerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqproducerstatetable.cpp b/common/c-api/zmqproducerstatetable.cpp new file mode 100644 index 000000000..3e50916e1 --- /dev/null +++ b/common/c-api/zmqproducerstatetable.cpp @@ -0,0 +1,29 @@ +#include "zmqproducerstatetable.h" +#include "../zmqproducerstatetable.h" + +using namespace std; +using namespace swss; + +SWSSZmqProducerStateTable SWSSZmqProducerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqClient zmqc, uint8_t dbPersistence) { + + SWSSTry(return (SWSSZmqProducerStateTable) new ZmqProducerStateTable( + (DBConnector *)db, string(tableName), *(ZmqClient *)zmqc, dbPersistence)); +} + +void SWSSZmqProducerStateTable_free(SWSSZmqProducerStateTable tbl) { + SWSSTry(delete (ZmqProducerStateTable *)tbl); +} + +void SWSSZmqProducerStateTable_set(SWSSZmqProducerStateTable tbl, const char *key, + SWSSFieldValueArray values) { + SWSSTry(((ZmqProducerStateTable *)tbl)->set(string(key), takeFieldValueArray(values))); +} + +void SWSSZmqProducerStateTable_del(SWSSZmqProducerStateTable tbl, const char *key) { + SWSSTry(((ZmqProducerStateTable *)tbl)->del(string(key))); +} + +uint64_t SWSSZmqProducerStateTable_dbUpdaterQueueSize(SWSSZmqProducerStateTable tbl) { + SWSSTry(return numeric_cast(((ZmqProducerStateTable *)tbl)->dbUpdaterQueueSize())); +} diff --git a/common/c-api/zmqproducerstatetable.h b/common/c-api/zmqproducerstatetable.h new file mode 100644 index 000000000..08d059186 --- /dev/null +++ b/common/c-api/zmqproducerstatetable.h @@ -0,0 +1,32 @@ +#ifndef SWSS_COMMON_C_API_ZMQPRODUCERSTATETABLE_H +#define SWSS_COMMON_C_API_ZMQPRODUCERSTATETABLE_H + +#include "dbconnector.h" +#include "util.h" +#include "zmqclient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include "stdint.h" + +typedef struct SWSSZmqProducerStateTableOpaque *SWSSZmqProducerStateTable; + +SWSSZmqProducerStateTable SWSSZmqProducerStateTable_new(SWSSDBConnector db, const char *tableName, + SWSSZmqClient zmqc, uint8_t dbPersistence); + +void SWSSZmqProducerStateTable_free(SWSSZmqProducerStateTable tbl); + +void SWSSZmqProducerStateTable_set(SWSSZmqProducerStateTable tbl, const char *key, + SWSSFieldValueArray values); + +void SWSSZmqProducerStateTable_del(SWSSZmqProducerStateTable tbl, const char *key); + +uint64_t SWSSZmqProducerStateTable_dbUpdaterQueueSize(SWSSZmqProducerStateTable tbl); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/c-api/zmqserver.cpp b/common/c-api/zmqserver.cpp new file mode 100644 index 000000000..50452e22d --- /dev/null +++ b/common/c-api/zmqserver.cpp @@ -0,0 +1,14 @@ +#include "zmqserver.h" +#include "../zmqserver.h" +#include "util.h" + +using namespace swss; +using namespace std; + +SWSSZmqServer SWSSZmqServer_new(const char *endpoint) { + SWSSTry(return (SWSSZmqServer) new ZmqServer(string(endpoint))); +} + +void SWSSZmqServer_free(SWSSZmqServer zmqs) { + SWSSTry(delete (ZmqServer *)zmqs); +} diff --git a/common/c-api/zmqserver.h b/common/c-api/zmqserver.h new file mode 100644 index 000000000..decd0e0dc --- /dev/null +++ b/common/c-api/zmqserver.h @@ -0,0 +1,20 @@ +#ifndef SWSS_COMMON_C_API_ZMQSERVER_H +#define SWSS_COMMON_C_API_ZMQSERVER_H + +#include "util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SWSSZmqServerOpaque *SWSSZmqServer; + +SWSSZmqServer SWSSZmqServer_new(const char *endpoint); + +void SWSSZmqServer_free(SWSSZmqServer zmqs); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/dbconnector.cpp b/common/dbconnector.cpp index 0e044f3ea..96334780f 100755 --- a/common/dbconnector.cpp +++ b/common/dbconnector.cpp @@ -645,39 +645,46 @@ DBConnector::DBConnector(int dbId, const RedisContext& ctx) select(this); } +static struct timeval ms_to_timeval(unsigned int ms) { + return { + .tv_sec = (time_t)ms / 1000, + .tv_usec = ((suseconds_t)ms % 1000) * 1000 + }; +} + DBConnector::DBConnector(int dbId, const string& hostname, int port, - unsigned int timeout) + unsigned int timeout_ms) : m_dbId(dbId) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(hostname.c_str(), port, ptv); select(this); } -DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout) +DBConnector::DBConnector(int dbId, const string& unixPath, unsigned int timeout_ms) : m_dbId(dbId) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; initContext(unixPath.c_str(), ptv); select(this); } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn, const string& netns) - : DBConnector(dbName, timeout, isTcpConn, SonicDBKey(netns)) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const string& netns) + : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey(netns)) { } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn, const SonicDBKey &key) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn, const SonicDBKey &key) : m_dbId(SonicDBConfig::getDbId(dbName, key)) , m_dbName(dbName) , m_key(key) { - struct timeval tv = {0, (suseconds_t)timeout * 1000}; - struct timeval *ptv = timeout ? &tv : NULL; + struct timeval tv = ms_to_timeval(timeout_ms); + struct timeval *ptv = timeout_ms ? &tv : NULL; if (isTcpConn) { initContext(SonicDBConfig::getDbHostname(dbName, m_key).c_str(), SonicDBConfig::getDbPort(dbName, m_key), ptv); @@ -690,8 +697,8 @@ DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpC select(this); } -DBConnector::DBConnector(const string& dbName, unsigned int timeout, bool isTcpConn) - : DBConnector(dbName, timeout, isTcpConn, SonicDBKey()) +DBConnector::DBConnector(const string& dbName, unsigned int timeout_ms, bool isTcpConn) + : DBConnector(dbName, timeout_ms, isTcpConn, SonicDBKey()) { // Empty constructor } diff --git a/common/dbconnector.h b/common/dbconnector.h index c5bd48ad6..832983ed9 100644 --- a/common/dbconnector.h +++ b/common/dbconnector.h @@ -213,11 +213,11 @@ class DBConnector : public RedisContext */ explicit DBConnector(const DBConnector &other); DBConnector(int dbId, const RedisContext &ctx); - DBConnector(int dbId, const std::string &hostname, int port, unsigned int timeout); - DBConnector(int dbId, const std::string &unixPath, unsigned int timeout); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn = false); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn, const std::string &netns); - DBConnector(const std::string &dbName, unsigned int timeout, bool isTcpConn, const SonicDBKey &key); + DBConnector(int dbId, const std::string &hostname, int port, unsigned int timeout_ms); + DBConnector(int dbId, const std::string &unixPath, unsigned int timeout_ms); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn = false); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn, const std::string &netns); + DBConnector(const std::string &dbName, unsigned int timeout_ms, bool isTcpConn, const SonicDBKey &key); DBConnector& operator=(const DBConnector&) = delete; int getDbId() const; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 4800b9ba2..dca107405 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -106,9 +106,10 @@ void ZmqServer::mqPollThread() int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { - SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d", + SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d, message: %s", m_endpoint.c_str(), - zmq_errno()); + zmq_errno(), + strerror(zmq_errno())); } // zmq_poll will use less CPU diff --git a/debian/libswsscommon-dev.install b/debian/libswsscommon-dev.install index 1dd2670e9..85e3c4bca 100644 --- a/debian/libswsscommon-dev.install +++ b/debian/libswsscommon-dev.install @@ -1,2 +1,3 @@ common/*.h usr/include/swss +common/c-api/*.h usr/include/swss/c-api pyext/swsscommon.i usr/share/swss diff --git a/tests/Makefile.am b/tests/Makefile.am index a07fadc2e..39712b233 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -42,6 +42,7 @@ tests_tests_SOURCES = tests/redis_ut.cpp \ tests/binary_serializer_ut.cpp \ tests/zmq_state_ut.cpp \ tests/profileprovider_ut.cpp \ + tests/c_api_ut.cpp \ tests/main.cpp tests_tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS) diff --git a/tests/c_api_ut.cpp b/tests/c_api_ut.cpp new file mode 100644 index 000000000..d16dac7c1 --- /dev/null +++ b/tests/c_api_ut.cpp @@ -0,0 +1,325 @@ +#include +#include +#include + +#include "common/c-api/consumerstatetable.h" +#include "common/c-api/dbconnector.h" +#include "common/c-api/producerstatetable.h" +#include "common/c-api/subscriberstatetable.h" +#include "common/c-api/util.h" +#include "common/c-api/zmqclient.h" +#include "common/c-api/zmqconsumerstatetable.h" +#include "common/c-api/zmqproducerstatetable.h" +#include "common/c-api/zmqserver.h" +#include "common/select.h" +#include "common/subscriberstatetable.h" +#include "gtest/gtest.h" + +using namespace std; +using namespace swss; + +static void clearDB() { + DBConnector db("TEST_DB", 0, true); + RedisReply r(&db, "FLUSHALL", REDIS_REPLY_STATUS); + r.checkStatusOK(); +} + +static void sortKfvs(vector &kfvs) { + sort(kfvs.begin(), kfvs.end(), + [](const KeyOpFieldsValuesTuple &a, const KeyOpFieldsValuesTuple &b) { + return kfvKey(a) < kfvKey(b); + }); + + for (auto &kfv : kfvs) { + auto &fvs = kfvFieldsValues(kfv); + sort(fvs.begin(), fvs.end(), + [](const pair &a, const pair &b) { + return a.first < b.first; + }); + } +} + +template static void free(const T *ptr) { + std::free(const_cast(reinterpret_cast(ptr))); +} + +static void freeKeyOpFieldValuesArray(SWSSKeyOpFieldValuesArray arr) { + for (uint64_t i = 0; i < arr.len; i++) { + free(arr.data[i].key); + free(arr.data[i].operation); + for (uint64_t j = 0; j < arr.data[i].fieldValues.len; j++) { + free(arr.data[i].fieldValues.data[j].field); + free(arr.data[i].fieldValues.data[j].value); + } + free(arr.data[i].fieldValues.data); + } + free(arr.data); +} + +TEST(c_api, DBConnector) { + clearDB(); + + EXPECT_THROW(SWSSDBConnector_new_named("does not exist", 0, true), out_of_range); + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + EXPECT_FALSE(SWSSDBConnector_get(db, "mykey")); + EXPECT_FALSE(SWSSDBConnector_exists(db, "mykey")); + SWSSDBConnector_set(db, "mykey", "myval"); + const char *val = SWSSDBConnector_get(db, "mykey"); + EXPECT_STREQ(val, "myval"); + free(val); + EXPECT_TRUE(SWSSDBConnector_exists(db, "mykey")); + EXPECT_TRUE(SWSSDBConnector_del(db, "mykey")); + EXPECT_FALSE(SWSSDBConnector_del(db, "mykey")); + + EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "myfield")); + SWSSDBConnector_hset(db, "mykey", "myfield", "myval"); + val = SWSSDBConnector_hget(db, "mykey", "myfield"); + EXPECT_STREQ(val, "myval"); + free(val); + EXPECT_TRUE(SWSSDBConnector_hexists(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hget(db, "mykey", "notmyfield")); + EXPECT_FALSE(SWSSDBConnector_hexists(db, "mykey", "notmyfield")); + EXPECT_TRUE(SWSSDBConnector_hdel(db, "mykey", "myfield")); + EXPECT_FALSE(SWSSDBConnector_hdel(db, "mykey", "myfield")); + + EXPECT_TRUE(SWSSDBConnector_flushdb(db)); + SWSSDBConnector_free(db); +} + +TEST(c_api, ConsumerProducerStateTables) { + clearDB(); + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + SWSSProducerStateTable pst = SWSSProducerStateTable_new(db, "mytable"); + SWSSConsumerStateTable cst = SWSSConsumerStateTable_new(db, "mytable", nullptr, nullptr); + + SWSSKeyOpFieldValuesArray arr = SWSSConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSFieldValuePair data[2] = {{.field = "myfield1", .value = "myvalue1"}, + {.field = "myfield2", .value = "myvalue2"}}; + SWSSFieldValueArray values = { + .len = 2, + .data = data, + }; + SWSSProducerStateTable_set(pst, "mykey1", values); + + data[0] = {.field = "myfield3", .value = "myvalue3"}; + values.len = 1; + SWSSProducerStateTable_set(pst, "mykey2", values); + + arr = SWSSConsumerStateTable_pops(cst); + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey1"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + vector> &fieldValues0 = kfvFieldsValues(kfvs[0]); + ASSERT_EQ(fieldValues0.size(), 2); + EXPECT_EQ(fieldValues0[0].first, "myfield1"); + EXPECT_EQ(fieldValues0[0].second, "myvalue1"); + EXPECT_EQ(fieldValues0[1].first, "myfield2"); + EXPECT_EQ(fieldValues0[1].second, "myvalue2"); + + EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); + EXPECT_EQ(kfvOp(kfvs[1]), "SET"); + vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); + ASSERT_EQ(fieldValues1.size(), 1); + EXPECT_EQ(fieldValues1[0].first, "myfield3"); + EXPECT_EQ(fieldValues1[0].second, "myvalue3"); + + arr = SWSSConsumerStateTable_pops(cst); + EXPECT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSProducerStateTable_del(pst, "mykey3"); + SWSSProducerStateTable_del(pst, "mykey4"); + SWSSProducerStateTable_del(pst, "mykey5"); + + arr = SWSSConsumerStateTable_pops(cst); + kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 3); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); + EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[0]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[1]), "mykey4"); + EXPECT_EQ(kfvOp(kfvs[1]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[2]), "mykey5"); + EXPECT_EQ(kfvOp(kfvs[2]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[2]).size(), 0); + + SWSSProducerStateTable_free(pst); + SWSSConsumerStateTable_free(cst); + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} + +TEST(c_api, SubscriberStateTable) { + clearDB(); + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + SWSSSubscriberStateTable sst = SWSSSubscriberStateTable_new(db, "mytable", nullptr, nullptr); + + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_TIMEOUT); + EXPECT_FALSE(SWSSSubscriberStateTable_hasData(sst)); + SWSSKeyOpFieldValuesArray arr = SWSSSubscriberStateTable_pops(sst); + EXPECT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + SWSSDBConnector_hset(db, "mytable:mykey", "myfield", "myvalue"); + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_DATA); + EXPECT_EQ(SWSSSubscriberStateTable_readData(sst, 300), SWSSSelectResult_TIMEOUT); + EXPECT_TRUE(SWSSSubscriberStateTable_hasData(sst)); + + arr = SWSSSubscriberStateTable_pops(sst); + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + EXPECT_FALSE(SWSSSubscriberStateTable_hasData(sst)); + ASSERT_EQ(kfvs.size(), 1); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + ASSERT_EQ(kfvFieldsValues(kfvs[0]).size(), 1); + EXPECT_EQ(kfvFieldsValues(kfvs[0])[0].first, "myfield"); + EXPECT_EQ(kfvFieldsValues(kfvs[0])[0].second, "myvalue"); + + SWSSSubscriberStateTable_free(sst); + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} + +TEST(c_api, ZmqConsumerProducerStateTable) { + clearDB(); + + SWSSDBConnector db = SWSSDBConnector_new_named("TEST_DB", 1000, true); + + SWSSZmqServer srv = SWSSZmqServer_new("tcp://127.0.0.1:42312"); + SWSSZmqClient cli = SWSSZmqClient_new("tcp://127.0.0.1:42312"); + EXPECT_TRUE(SWSSZmqClient_isConnected(cli)); + SWSSZmqClient_connect(cli); // This should be idempotent/not throw + + SWSSZmqProducerStateTable pst = SWSSZmqProducerStateTable_new(db, "mytable", cli, false); + SWSSZmqConsumerStateTable cst = + SWSSZmqConsumerStateTable_new(db, "mytable", srv, nullptr, nullptr); + + ASSERT_EQ(SWSSZmqConsumerStateTable_getDbConnector(cst), db); + + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + EXPECT_FALSE(SWSSZmqConsumerStateTable_initializedWithData(cst)); + SWSSKeyOpFieldValuesArray arr = SWSSZmqConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + // On flag = 0, we use the ZmqProducerStateTable + // On flag = 1, we use the ZmqClient directly + for (int flag = 0; flag < 2; flag++) { + SWSSFieldValuePair values_key1_data[2] = {{.field = "myfield1", .value = "myvalue1"}, + {.field = "myfield2", .value = "myvalue2"}}; + SWSSFieldValueArray values_key1 = { + .len = 2, + .data = values_key1_data, + }; + + SWSSFieldValuePair values_key2_data[1] = {{.field = "myfield3", .value = "myvalue3"}}; + SWSSFieldValueArray values_key2 = { + .len = 1, + .data = values_key2_data, + }; + + SWSSKeyOpFieldValues arr_data[2] = { + {.key = "mykey1", .operation = "SET", .fieldValues = values_key1}, + {.key = "mykey2", .operation = "SET", .fieldValues = values_key2}}; + arr = {.len = 2, .data = arr_data}; + + if (flag == 0) + for (uint64_t i = 0; i < arr.len; i++) + SWSSZmqProducerStateTable_set(pst, arr.data[i].key, arr.data[i].fieldValues); + else + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", &arr); + + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_DATA); + EXPECT_TRUE(SWSSZmqConsumerStateTable_hasData(cst)); + EXPECT_TRUE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + arr = SWSSZmqConsumerStateTable_pops(cst); + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + EXPECT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_TIMEOUT); + + vector kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey1"); + EXPECT_EQ(kfvOp(kfvs[0]), "SET"); + vector> &fieldValues0 = kfvFieldsValues(kfvs[0]); + ASSERT_EQ(fieldValues0.size(), 2); + EXPECT_EQ(fieldValues0[0].first, "myfield1"); + EXPECT_EQ(fieldValues0[0].second, "myvalue1"); + EXPECT_EQ(fieldValues0[1].first, "myfield2"); + EXPECT_EQ(fieldValues0[1].second, "myvalue2"); + + EXPECT_EQ(kfvKey(kfvs[1]), "mykey2"); + EXPECT_EQ(kfvOp(kfvs[1]), "SET"); + vector> &fieldValues1 = kfvFieldsValues(kfvs[1]); + ASSERT_EQ(fieldValues1.size(), 1); + EXPECT_EQ(fieldValues1[0].first, "myfield3"); + EXPECT_EQ(fieldValues1[0].second, "myvalue3"); + + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); + arr = SWSSZmqConsumerStateTable_pops(cst); + ASSERT_EQ(arr.len, 0); + freeKeyOpFieldValuesArray(arr); + + arr_data[0] = {.key = "mykey3", .operation = "DEL", .fieldValues = {}}; + arr_data[1] = {.key = "mykey4", .operation = "DEL", .fieldValues = {}}; + arr = { .len = 2, .data = arr_data }; + + if (flag == 0) + for (uint64_t i = 0; i < arr.len; i++) + SWSSZmqProducerStateTable_del(pst, arr.data[i].key); + else + SWSSZmqClient_sendMsg(cli, "TEST_DB", "mytable", &arr); + + ASSERT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_DATA); + EXPECT_TRUE(SWSSZmqConsumerStateTable_hasData(cst)); + EXPECT_TRUE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + arr = SWSSZmqConsumerStateTable_pops(cst); + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasData(cst)); + EXPECT_FALSE(SWSSZmqConsumerStateTable_hasCachedData(cst)); + EXPECT_EQ(SWSSZmqConsumerStateTable_readData(cst, 500), SWSSSelectResult_TIMEOUT); + + kfvs = takeKeyOpFieldValuesArray(arr); + sortKfvs(kfvs); + freeKeyOpFieldValuesArray(arr); + + ASSERT_EQ(kfvs.size(), 2); + EXPECT_EQ(kfvKey(kfvs[0]), "mykey3"); + EXPECT_EQ(kfvOp(kfvs[0]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[0]).size(), 0); + EXPECT_EQ(kfvKey(kfvs[1]), "mykey4"); + EXPECT_EQ(kfvOp(kfvs[1]), "DEL"); + EXPECT_EQ(kfvFieldsValues(kfvs[1]).size(), 0); + } + + // Server must be freed first to safely release message handlers (ZmqConsumerStateTable) + SWSSZmqServer_free(srv); + + SWSSZmqProducerStateTable_free(pst); + SWSSZmqConsumerStateTable_free(cst); + + SWSSZmqClient_free(cli); + + SWSSDBConnector_flushdb(db); + SWSSDBConnector_free(db); +} diff --git a/tests/main.cpp b/tests/main.cpp index 6cbaf251d..440978a46 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -1,5 +1,6 @@ #include "gtest/gtest.h" #include "common/dbconnector.h" +#include "common/c-api/util.h" #include using namespace std; @@ -84,6 +85,9 @@ class SwsscommonEnvironment : public ::testing::Environment { SonicDBConfig::initializeGlobalConfig(global_existing_file); cout<<"INIT: load global db config file, isInit = "<