diff --git a/fpmsyncd/fpmsyncd.h b/fpmsyncd/fpmsyncd.h new file mode 100644 index 0000000000..b22b63fafb --- /dev/null +++ b/fpmsyncd/fpmsyncd.h @@ -0,0 +1,7 @@ +#ifndef __FPMSYNCD__ +#define __FPMSYNCD__ + +// redispipeline has a maximum capacity of 50000 entries +#define ROUTE_SYNC_PPL_SIZE 50000 + +#endif diff --git a/neighsyncd/Makefile.am b/neighsyncd/Makefile.am index 1f34e9e92f..bea80a05e7 100644 --- a/neighsyncd/Makefile.am +++ b/neighsyncd/Makefile.am @@ -12,7 +12,7 @@ neighsyncd_SOURCES = neighsyncd.cpp neighsync.cpp $(top_srcdir)/warmrestart/warm neighsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN) neighsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN) -neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon +neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon -lpthread if GCOV_ENABLED neighsyncd_SOURCES += ../gcovpreload/gcovpreload.cpp diff --git a/orchagent/main.cpp b/orchagent/main.cpp index 556bb70892..1cff19137d 100644 --- a/orchagent/main.cpp +++ b/orchagent/main.cpp @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize; #define DEFAULT_BATCH_SIZE 128 extern int gBatchSize; +bool gRingMode = false; + bool gSyncMode = false; sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC; string gAsicInstance; @@ -73,7 +75,7 @@ uint32_t create_switch_timeout = 0; void usage() { - cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout]" << endl; + cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout] [-R]" << endl; cout << " -h: display this message" << endl; cout << " -r record_type: record orchagent logs with type (default 3)" << endl; cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl; @@ -94,6 +96,7 @@ void usage() cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl; cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl; cout << " -t Override create switch timeout, in sec" << endl; + cout << " -R enable the ring buffer thread" << endl; } void sighup_handler(int signo) @@ -348,7 +351,7 @@ int main(int argc, char **argv) string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME; int record_type = 3; // Only swss and sairedis recordings enabled by default. - while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:")) != -1) + while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:R")) != -1) { switch (opt) { @@ -442,6 +445,9 @@ int main(int argc, char **argv) case 't': create_switch_timeout = atoi(optarg); break; + case 'R': + gRingMode = true; + break; default: /* '?' */ exit(EXIT_FAILURE); } @@ -792,6 +798,11 @@ int main(int argc, char **argv) orchDaemon = make_shared(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get()); } + if (gRingMode) { + /* Initialize the ring before OrchDaemon initializing Orchs */ + orchDaemon->enableRingBuffer(); + } + if (!orchDaemon->init()) { SWSS_LOG_ERROR("Failed to initialize orchestration daemon"); diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index 708a86280a..c9a32f9767 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -12,11 +12,15 @@ #include "zmqserver.h" #include "zmqconsumerstatetable.h" #include "sai_serialize.h" +#include "fpmsyncd/fpmsyncd.h" using namespace swss; int gBatchSize = 0; +OrchRing* Orch::gRingBuffer = nullptr; +OrchRing* Executor::gRingBuffer = nullptr; + Orch::Orch(DBConnector *db, const string tableName, int pri) { addConsumer(db, tableName, pri); @@ -155,6 +159,10 @@ size_t ConsumerBase::addToSync(const std::deque &entries return entries.size(); } +size_t ConsumerBase::addToSync(std::shared_ptr> entries) { + return addToSync(*entries); +} + // TODO: Table should be const size_t ConsumerBase::refillToSync(Table* table) { @@ -239,23 +247,67 @@ void ConsumerBase::dumpPendingTasks(vector &ts) void Consumer::execute() { - // ConsumerBase::execute_impl(); SWSS_LOG_ENTER(); - auto table = static_cast(getSelectable()); - std::deque entries; - table->pops(entries); + size_t total_size = 0; + + while (true) + { + size_t popped_size = 0; // number of entries popped from the redis table + + auto entries = std::make_shared>(); + getConsumerTable()->pops(*entries); - // add to sync - addToSync(entries); + popped_size = entries->size(); + total_size += popped_size; - drain(); + pushRingBuffer([=](){ + addToSync(entries); + }); + + if (!gBatchSize || popped_size * 10 <= (size_t)gBatchSize || total_size >= ROUTE_SYNC_PPL_SIZE) { + // some program doesn't initialize gBatchSize and use TableConsumable::DEFAULT_POP_BATCH_SIZE instead + break; + } + } + + pushRingBuffer([=](){ + drain(); + }); +} + +void Executor::pushRingBuffer(AnyTask&& task) +{ + if (!gRingBuffer || !gRingBuffer->threadCreated) + { + // execute the task right now in this thread if gRingBuffer is not initialized + // or the ring thread is not created, or this executor is not served by gRingBuffer + task(); + } + else if (!gRingBuffer->Serves(getName())) // not served by ring thread + { + while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) { + gRingBuffer->notify(); + std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS)); + } + // if ring thread is enabled, make sure to execute task after the ring finishes its work + task(); + } + else + { + // if this executor is served by gRingBuffer, push the task to gRingBuffer + // and notify the ring thread to flush gRingBuffer + while (!gRingBuffer->push(task)) { + gRingBuffer->notify(); + SWSS_LOG_WARN("ring is full...push again"); + } + gRingBuffer->notify(); + } } void Consumer::drain() { - if (!m_toSync.empty()) - ((Orch *)m_orch)->doTask((Consumer&)*this); + m_orch->doTask(*this); } size_t Orch::addExistingData(const string& tableName) @@ -802,6 +854,10 @@ void Orch::addExecutor(Executor* executor) { SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str()); } + + if (gRingBuffer && executor->getName() == APP_ROUTE_TABLE_NAME) { + gRingBuffer->addExecutor(executor); + } } Executor *Orch::getExecutor(string executorName) diff --git a/orchagent/orch.h b/orchagent/orch.h index bdbecf5f5f..4dc2c2e924 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -7,6 +7,7 @@ #include #include #include +#include extern "C" { #include @@ -24,6 +25,7 @@ extern "C" { #include "macaddress.h" #include "response_publisher.h" #include "recorder.h" +#include "schema.h" const char delimiter = ':'; const char list_item_delimiter = ','; @@ -49,6 +51,9 @@ const char state_db_key_delimiter = '|'; #define DEFAULT_KEY_SEPARATOR ":" #define VLAN_SUB_INTERFACE_SEPARATOR "." +#define ORCH_RING_SIZE 30 +#define SLEEP_MSECONDS 500 + const int default_orch_pri = 0; typedef enum @@ -88,6 +93,11 @@ typedef std::pair table_name_with_pri_t; class Orch; +using AnyTask = std::function; +template +class RingBuffer; +typedef RingBuffer OrchRing; + // Design assumption // 1. one Orch can have one or more Executor // 2. one Executor must belong to one and only one Orch @@ -124,6 +134,10 @@ class Executor : public swss::Selectable return m_name; } + Orch *getOrch() const { return m_orch; } + static OrchRing* gRingBuffer; + void pushRingBuffer(AnyTask&& func); + protected: swss::Selectable *m_selectable; Orch *m_orch; @@ -135,6 +149,8 @@ class Executor : public swss::Selectable swss::Selectable *getSelectable() const { return m_selectable; } }; +typedef std::map> ConsumerMap; + class ConsumerBase : public Executor { public: ConsumerBase(swss::Selectable *selectable, Orch *orch, const std::string &name) @@ -163,11 +179,139 @@ class ConsumerBase : public Executor { // Returns: the number of entries added to m_toSync size_t addToSync(const std::deque &entries); + size_t addToSync(std::shared_ptr> entries); size_t refillToSync(); size_t refillToSync(swss::Table* table); }; +template +class RingBuffer +{ +private: + static RingBuffer* instance; + std::vector buffer; + int head = 0; + int tail = 0; + ConsumerMap m_consumerMap; + + std::condition_variable cv; + std::mutex mtx; + +protected: + RingBuffer(): buffer(RingSize) {} + ~RingBuffer() { + delete instance; + } + +public: + RingBuffer(const RingBuffer&) = delete; + RingBuffer(RingBuffer&&) = delete; + RingBuffer& operator= (const RingBuffer&) = delete; + RingBuffer& operator= (RingBuffer&&) = delete; + + static RingBuffer* Get(); + bool threadCreated = false; + bool Idle = true; + + // pause the ring thread if the buffer is empty + void pause_thread(); + // wake up the ring thread in case it's locked but not empty + void notify(); + + bool IsFull(); + bool IsEmpty(); + + bool push(DataType entry); + bool pop(DataType& entry); + + void addExecutor(Executor* executor); + bool Serves(const std::string& tableName); +}; + +template +void RingBuffer::pause_thread() +{ + std::unique_lock lock(mtx); + cv.wait(lock, [&](){ return !IsEmpty(); }); +} + +template +void RingBuffer::notify() +{ + if (!IsEmpty() && Idle) + cv.notify_all(); +} + +template +RingBuffer* RingBuffer::instance = nullptr; + +template +RingBuffer* RingBuffer::Get() +{ + if (instance == nullptr) { + instance = new RingBuffer(); + SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance); + } + return instance; +} + +template +bool RingBuffer::IsFull() +{ + return (tail + 1) % RingSize == head; +} + +template +bool RingBuffer::IsEmpty() +{ + return tail == head; +} + +template +bool RingBuffer::push(DataType ringEntry) +{ + if (IsFull()) + return false; + buffer[tail] = std::move(ringEntry); + tail = (tail + 1) % RingSize; + return true; +} + +template +bool RingBuffer::pop(DataType& ringEntry) +{ + if (IsEmpty()) + return false; + ringEntry = std::move(buffer[head]); + head = (head + 1) % RingSize; + return true; +} + +template +void RingBuffer::addExecutor(Executor* executor) +{ + auto inserted = m_consumerMap.emplace(std::piecewise_construct, + std::forward_as_tuple(executor->getName()), + std::forward_as_tuple(executor)); + + // If there is duplication of executorName in m_consumerMap, logic error + if (!inserted.second) + { + SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str()); + } +} + +template +bool RingBuffer::Serves(const std::string& tableName) +{ + for (auto &it : m_consumerMap) { + if (it.first == tableName) + return true; + } + return false; +} + class Consumer : public ConsumerBase { public: Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name) @@ -175,7 +319,7 @@ class Consumer : public ConsumerBase { { } - swss::TableBase *getConsumerTable() const override + swss::ConsumerTableBase *getConsumerTable() const override { // ConsumerTableBase is a subclass of TableBase return static_cast(getSelectable()); @@ -201,8 +345,6 @@ class Consumer : public ConsumerBase { void drain() override; }; -typedef std::map> ConsumerMap; - typedef enum { success, @@ -225,6 +367,8 @@ class Orch Orch(const std::vector& tables); virtual ~Orch() = default; + static OrchRing* gRingBuffer; + std::vector getSelectables(); // add the existing table data (left by warm reboot) to the consumer todo task list. diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 0d2ab1c200..e43dbdfdc0 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -87,6 +87,12 @@ OrchDaemon::~OrchDaemon() { SWSS_LOG_ENTER(); + // Stop the ring thread before delete orch pointers + if (gRingBuffer) { + ring_thread_exited = true; + ring_thread.detach(); + } + /* * Some orchagents call other agents in their destructor. * To avoid accessing deleted agent, do deletion in reverse order. @@ -105,6 +111,44 @@ OrchDaemon::~OrchDaemon() events_deinit_publisher(g_events_handle); } +void OrchDaemon::popRingBuffer() +{ + SWSS_LOG_ENTER(); + + // make sure there is only one thread created to run popRingBuffer() + if (!gRingBuffer || gRingBuffer->threadCreated) + return; + + gRingBuffer->threadCreated = true; + SWSS_LOG_NOTICE("OrchDaemon starts the popRingBuffer thread!"); + + while (!ring_thread_exited) + { + gRingBuffer->pause_thread(); + + gRingBuffer->Idle = false; + + AnyTask func; + while (gRingBuffer->pop(func)) { + func(); + } + + gRingBuffer->Idle = true; + } +} + +/** + * This function initializes gRingBuffer for the OrchDaemon instance, + * (otherwise gRingBuffer is nullptr, which indicates ring mode is not enabled) + * then syncs this gRingBuffer with Executor and Orch. + * Hence the whole program shares this single global RingBuffer. + */ +void OrchDaemon::enableRingBuffer() { + gRingBuffer = OrchRing::Get(); + Executor::gRingBuffer = gRingBuffer; + Orch::gRingBuffer = gRingBuffer; +} + bool OrchDaemon::init() { SWSS_LOG_ENTER(); @@ -820,6 +864,8 @@ void OrchDaemon::start() Recorder::Instance().sairedis.setRotate(false); + ring_thread = std::thread(&OrchDaemon::popRingBuffer, this); + for (Orch *o : m_orchList) { m_select->addSelectables(o->getSelectables()); @@ -860,6 +906,17 @@ void OrchDaemon::start() * requests live in it. When the daemon has nothing to do, it * is a good chance to flush the pipeline */ flush(); + + /* Normally the ring thread would not get locked if buffer isn't empty. + * Still it's possible that the previous cv notification was missed. + * Make sure there is a cv notification being sent periodically. */ + if (gRingBuffer) + gRingBuffer->notify(); + + if (!gRingBuffer || (gRingBuffer->IsEmpty() && gRingBuffer->Idle)) + for (Orch *o : m_orchList) + o->doTask(); + continue; } @@ -877,9 +934,9 @@ void OrchDaemon::start() /* After each iteration, periodically check all m_toSync map to * execute all the remaining tasks that need to be retried. */ - /* TODO: Abstract Orch class to have a specific todo list */ - for (Orch *o : m_orchList) - o->doTask(); + if (!gRingBuffer || (gRingBuffer->IsEmpty() && gRingBuffer->Idle)) + for (Orch *o : m_orchList) + o->doTask(); /* * Asked to check warm restart readiness. @@ -908,8 +965,8 @@ void OrchDaemon::start() } } - // Flush sairedis's redis pipeline - flush(); + // Flush sairedis's redis pipeline after the ring becomes empty + while (gRingBuffer && !(gRingBuffer->IsEmpty() && gRingBuffer->Idle)) {std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));} SWSS_LOG_WARN("Orchagent is frozen for warm restart!"); freezeAndHeartBeat(UINT_MAX); diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index 2473848bf5..d23901630b 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -83,7 +83,21 @@ class OrchDaemon m_fabricQueueStatEnabled = enabled; } void logRotate(); + + // Two required API to support ring buffer feature + /** + * This method is used by a ring buffer consumer [Orchdaemon] to initialzie its ring, + * and populate this ring's pointer to the producers [Orch, Consumer], to make sure that + * they are connected to the same ring. + */ + void enableRingBuffer(); + /** + * This method describes how the ring consumer consumes this ring. + */ + void popRingBuffer(); + private: + std::thread ring_thread; DBConnector *m_applDb; DBConnector *m_configDb; DBConnector *m_stateDb; @@ -96,7 +110,7 @@ class OrchDaemon std::vector m_orchList; Select *m_select; - + std::chrono::time_point m_lastHeartBeat; void flush(); @@ -104,6 +118,11 @@ class OrchDaemon void heartBeat(std::chrono::time_point tcurrent); void freezeAndHeartBeat(unsigned int duration); + +protected: + /* Orchdaemon instance points to the same ring buffer during its lifetime */ + OrchRing* gRingBuffer = nullptr; + std::atomic ring_thread_exited{false}; }; class FabricOrchDaemon : public OrchDaemon