// Patch summary: adds an optional ring-buffer mode to orchagent (-R). Executors
// served by the ring queue their work as tasks; a thread started by
// OrchDaemon::start() pops and runs them.
//
// Review fixes folded into this copy of the patch:
//   * orch.h: ~RingBuffer() no longer deletes `instance` from inside its own destructor.
//   * orch.h: tasksPending() checks the dynamic_cast result before dereferencing it.
//   * orchdaemon.cpp: ~OrchDaemon() wakes and joins the ring thread instead of detaching it,
//     and popRingBuffer() also wakes up when shutdown is requested.
//   * orchdaemon.cpp: the warm-restart path waits for the ring to go idle and then
//     still calls flush(), which the patch had dropped.
//   Hunk headers of orchdaemon.cpp are adjusted for the added lines.
// NOTE(review): text that sat between angle brackets (template arguments, #include
// targets) appears to be missing from this copy; those spots are left as found.
diff --git a/orchagent/main.cpp b/orchagent/main.cpp
index 0a804eb38c..1aedbd5b8d 100644
--- a/orchagent/main.cpp
+++ b/orchagent/main.cpp
@@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
 #define DEFAULT_BATCH_SIZE 128
 extern int gBatchSize;
 
+bool gRingMode = false;
+
 bool gSyncMode = false;
 sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
 string gAsicInstance;
@@ -72,7 +74,7 @@ bool gTraditionalFlexCounter = false;
 
 void usage()
 {
-    cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode]" << endl;
+    cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-R]" << endl;
     cout << " -h: display this message" << endl;
     cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
     cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
@@ -92,6 +94,7 @@ void usage()
     cout << " -k max bulk size in bulk mode (default 1000)" << endl;
     cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
     cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
+    cout << " -R: enable the ring buffer mode" << endl;
 }
 
 void sighup_handler(int signo)
@@ -346,7 +349,7 @@ int main(int argc, char **argv)
     string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
     int record_type = 3; // Only swss and sairedis recordings enabled by default.
 
-    while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:")) != -1)
+    while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:R")) != -1)
     {
         switch (opt)
         {
@@ -437,6 +440,9 @@ int main(int argc, char **argv)
                 enable_zmq = true;
             }
             break;
+        case 'R':
+            gRingMode = true;
+            break;
         default: /* '?' */
             exit(EXIT_FAILURE);
         }
@@ -782,6 +788,11 @@ int main(int argc, char **argv)
         orchDaemon = make_shared(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
     }
 
+    if (gRingMode) {
+        /* Initialize the ring before OrchDaemon initializes the Orchs */
+        orchDaemon->enableRingBuffer();
+    }
+
     if (!orchDaemon->init())
     {
         SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp
index d1cbdb89c8..eb27f9fd20 100644
--- a/orchagent/orch.cpp
+++ b/orchagent/orch.cpp
@@ -13,9 +13,19 @@
 #include "zmqconsumerstatetable.h"
 #include "sai_serialize.h"
 
+#define PRINT_ALL 1
+#define VERBOSE true
+
 using namespace swss;
 
 int gBatchSize = 0;
+// no need for further pops if entries popped in the iteration fewer than gBatchSize divided by POPS_SCALE
+#define POPS_SCALE 10
+// if #entries popped exceeds LARGE_TRAFFIC, drain them immediately instead of waiting for more pops
+#define LARGE_TRAFFIC 10000
+
+OrchRing* Orch::gRingBuffer = nullptr;
+OrchRing* Executor::gRingBuffer = nullptr;
 
 Orch::Orch(DBConnector *db, const string tableName, int pri)
 {
@@ -155,6 +165,10 @@ size_t ConsumerBase::addToSync(const std::deque &entries
     return entries.size();
 }
 
+size_t ConsumerBase::addToSync(std::shared_ptr> entries) { // shared-pointer overload, forwards to the deque overload
+    return addToSync(*entries);
+}
+
 // TODO: Table should be const
 size_t ConsumerBase::refillToSync(Table* table)
 {
@@ -239,25 +253,88 @@ void ConsumerBase::dumpPendingTasks(vector &ts)
 
 void Consumer::execute()
 {
-    // ConsumerBase::execute_impl();
+    static swss::PerformanceTimer timer("POPS", PRINT_ALL, VERBOSE);
+
     SWSS_LOG_ENTER();
 
-    size_t update_size = 0;
-    auto table = static_cast(getSelectable());
-    do
-    {
-        std::deque entries;
-        table->pops(entries);
-        update_size = addToSync(entries);
-    } while (update_size != 0);
+    size_t popped_size = 0; // number of entries popped per iteration
+    swss::ConsumerTableBase *table = getConsumerTable();
+
+    size_t drain_size = 0; // number of entries popped, batched for a drain
+
+    do {
+
+        if (gRingBuffer && gRingBuffer->Serves(getName())) {
+            timer.start();
+        }
+
+        auto entries = std::make_shared>();
+        table->pops(*entries);
 
-    drain();
+        popped_size = entries->size();
+        drain_size += popped_size;
+
+        pushRingBuffer([=](){ // the task keeps the popped entries alive through the shared pointer
+            addToSync(entries);
+        });
+
+        if (gRingBuffer && gRingBuffer->Serves(getName())) {
+            timer.stop();
+            timer.inc((int)popped_size);
+        }
+
+        if (drain_size >= LARGE_TRAFFIC) {
+            pushRingBuffer([=](){
+                drain();
+            });
+            drain_size = 0;
+        }
+
+    } while (gBatchSize && popped_size * POPS_SCALE > (size_t)gBatchSize);
+
+    pushRingBuffer([=](){
+        drain();
+    });
+}
+
+void Executor::pushRingBuffer(AnyTask&& func) // runs func inline, or queues it on the ring when this executor is served
+{
+    if (!gRingBuffer || !gRingBuffer->Started)
+    {
+        func();
+    }
+    else if (!gRingBuffer->Serves(getName()))
+    {
+        while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) { // wait until the ring thread has nothing left to run
+            std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
+        }
+        func();
+    } else {
+        while (!gRingBuffer->push(func)) {
+            SWSS_LOG_WARN("fail to push..ring is full..."); // NOTE(review): logs on every failed attempt while spinning - consider backing off
+        }
+        gRingBuffer->cv.notify_one(); // NOTE(review): sent without holding mtx; a wakeup could be missed - confirm
+    }
 }
 
 void Consumer::drain()
 {
-    if (!m_toSync.empty())
+    if (m_toSync.empty())
+        return;
+
+    if (getName() == APP_ROUTE_TABLE_NAME) {
+        static swss::PerformanceTimer timer("DRAIN", PRINT_ALL, VERBOSE);
+        size_t before = m_toSync.size();
+        timer.start();
+        ((Orch *)m_orch)->doTask((Consumer&)*this);
+        timer.stop();
+        size_t after = m_toSync.size();
+        timer.inc(before - after);
+    }
+    else
+    {
         ((Orch *)m_orch)->doTask((Consumer&)*this);
+    }
 }
 
 size_t Orch::addExistingData(const string& tableName)
@@ -542,6 +619,16 @@ void Orch::doTask()
     }
 }
 
+void Orch::doTask(std::string excluded_table) // drains every consumer; in ring mode the named table is skipped
+{
+    for (auto &it : m_consumerMap) {
+        if (gRingBuffer && it.second->getName() == excluded_table) {
+            continue;
+        }
+        it.second->drain();
+    }
+}
+
 void Orch::dumpPendingTasks(vector &ts)
 {
     for (auto &it : m_consumerMap)
diff --git a/orchagent/orch.h b/orchagent/orch.h
index bdbecf5f5f..73186e8bf4 100644
--- a/orchagent/orch.h
+++ b/orchagent/orch.h
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 
 extern "C" {
 #include
@@ -24,6 +25,8 @@ extern "C" {
 #include "macaddress.h"
 #include "response_publisher.h"
 #include "recorder.h"
+#include "schema.h"
+#include "performancetimer.h"
 
 const char delimiter = ':';
 const char list_item_delimiter = ',';
@@ -49,6 +52,9 @@ const char state_db_key_delimiter = '|';
 #define DEFAULT_KEY_SEPARATOR ":"
 #define VLAN_SUB_INTERFACE_SEPARATOR "."
 
+#define ORCH_RING_SIZE 30
+#define SLEEP_MSECONDS 500
+
 const int default_orch_pri = 0;
 
 typedef enum
@@ -88,6 +94,11 @@ typedef std::pair table_name_with_pri_t;
 
 class Orch;
 
+using AnyTask = std::function;
+template
+class RingBuffer;
+typedef RingBuffer OrchRing;
+
 // Design assumption
 // 1. one Orch can have one or more Executor
 // 2. one Executor must belong to one and only one Orch
@@ -124,6 +135,10 @@ class Executor : public swss::Selectable
         return m_name;
     }
 
+    Orch *getOrch() const { return m_orch; }
+    static OrchRing* gRingBuffer;
+    void pushRingBuffer(AnyTask&& func);
+
 protected:
     swss::Selectable *m_selectable;
     Orch *m_orch;
@@ -135,6 +150,8 @@ class Executor : public swss::Selectable
     swss::Selectable *getSelectable() const { return m_selectable; }
 };
 
+typedef std::map> ConsumerMap;
+
 class ConsumerBase : public Executor {
 public:
     ConsumerBase(swss::Selectable *selectable, Orch *orch, const std::string &name)
@@ -164,10 +181,127 @@ class ConsumerBase : public Executor {
     // Returns: the number of entries added to m_toSync
     size_t addToSync(const std::deque &entries);
+    size_t addToSync(std::shared_ptr> ptr);
 
     size_t refillToSync();
     size_t refillToSync(swss::Table* table);
 };
 
+template
+class RingBuffer // fixed-capacity circular queue plus the set of executors it serves
+{
+private:
+    static RingBuffer* instance;
+    std::vector buffer;
+    int head = 0; // NOTE(review): head/tail are touched by push() and pop() without a common lock - confirm the threading model
+    int tail = 0;
+    ConsumerMap m_consumerMap;
+protected:
+    RingBuffer(): buffer(RingSize) {}
+    ~RingBuffer() {
+        // Nothing to release: "delete instance" here would re-enter this destructor
+    }
+public:
+    RingBuffer(const RingBuffer&) = delete;
+    RingBuffer(RingBuffer&&) = delete;
+    RingBuffer& operator= (const RingBuffer&) = delete;
+    RingBuffer& operator= (RingBuffer&&) = delete;
+    static RingBuffer* Get();
+    bool Started = false; // set by OrchDaemon::popRingBuffer() when the ring thread starts
+    bool Idle = true; // false while the ring thread is running tasks
+    std::mutex mtx;
+    std::condition_variable cv;
+    bool IsFull();
+    bool IsEmpty();
+    bool push(DataType entry);
+    bool pop(DataType& entry);
+    DataType& HeadEntry();
+    void addExecutor(Executor* executor);
+    void doTask();
+    bool tasksPending();
+    bool Serves(const std::string& tableName);
+};
+template
+RingBuffer* RingBuffer::instance = nullptr;
+template
+RingBuffer* RingBuffer::Get() // returns the singleton, creating it on first use
+{
+    if (instance == nullptr) {
+        instance = new RingBuffer();
+        SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance);
+    }
+    return instance;
+}
+template
+bool RingBuffer::IsFull()
+{
+    return (tail + 1) % RingSize == head; // one slot stays unused to tell full from empty
+}
+template
+bool RingBuffer::IsEmpty()
+{
+    return tail == head;
+}
+template
+bool RingBuffer::push(DataType ringEntry) // returns false, storing nothing, when the ring is full
+{
+    if (IsFull())
+        return false;
+    buffer[tail] = std::move(ringEntry);
+    tail = (tail + 1) % RingSize;
+    return true;
+}
+template
+DataType& RingBuffer::HeadEntry() {
+    return buffer[head];
+}
+template
+bool RingBuffer::pop(DataType& ringEntry) // returns false, leaving ringEntry untouched, when the ring is empty
+{
+    if (IsEmpty())
+        return false;
+    ringEntry = std::move(buffer[head]);
+    head = (head + 1) % RingSize;
+    return true;
+}
+template
+void RingBuffer::addExecutor(Executor* executor)
+{
+    auto inserted = m_consumerMap.emplace(std::piecewise_construct,
+            std::forward_as_tuple(executor->getName()),
+            std::forward_as_tuple(executor));
+    // If there is duplication of executorName in m_consumerMap, logic error
+    if (!inserted.second)
+    {
+        SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str());
+    }
+}
+template
+void RingBuffer::doTask() // drains every executor registered through addExecutor()
+{
+    for (auto &it : m_consumerMap) {
+        it.second->drain();
+    }
+}
+template
+bool RingBuffer::tasksPending()
+{
+    for (auto &it : m_consumerMap) {
+        auto consumer = dynamic_cast(it.second.get());
+        if (consumer && consumer->m_toSync.size()) // dynamic_cast yields nullptr when the executor is not of the target type
+            return true;
+    }
+    return false;
+}
+template
+bool RingBuffer::Serves(const std::string& tableName) // true when an executor with this name was registered
+{
+    for (auto &it : m_consumerMap) {
+        if (it.first == tableName)
+            return true;
+    }
+    return false;
+}
+
 class Consumer : public ConsumerBase {
 public:
     Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name)
@@ -175,7 +309,7 @@ class Consumer : public ConsumerBase {
     {
     }
 
-    swss::TableBase *getConsumerTable() const override
+    swss::ConsumerTableBase *getConsumerTable() const override
     {
         // ConsumerTableBase is a subclass of TableBase
         return static_cast(getSelectable());
@@ -201,8 +335,6 @@ class Consumer : public ConsumerBase {
     void drain() override;
 };
 
-typedef std::map> ConsumerMap;
-
 typedef enum
 {
     success,
@@ -225,6 +357,8 @@ class Orch
     Orch(const std::vector& tables);
     virtual ~Orch() = default;
 
+    static OrchRing* gRingBuffer;
+
     std::vector getSelectables();
 
     // add the existing table data (left by warm reboot) to the consumer todo task list.
@@ -242,6 +376,7 @@ class Orch
     virtual void doTask(Consumer &consumer) { };
     virtual void doTask(swss::NotificationConsumer &consumer) { }
     virtual void doTask(swss::SelectableTimer &timer) { }
+    virtual void doTask(std::string excluded_table);
 
     void dumpPendingTasks(std::vector &ts);
 
diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp
index 047263c93a..152a945871 100644
--- a/orchagent/orchdaemon.cpp
+++ b/orchagent/orchdaemon.cpp
@@ -87,6 +87,20 @@ OrchDaemon::~OrchDaemon()
 {
     SWSS_LOG_ENTER();
 
+    // Stop the ring thread before deleting orch pointers: wake it up, then
+    // wait for it to finish so it cannot touch orchs that are being deleted.
+    if (ring_thread.joinable()) {
+        if (gRingBuffer) {
+            {
+                // Set the flag under the ring mutex so the wakeup cannot be missed
+                std::unique_lock<std::mutex> lock(gRingBuffer->mtx);
+                ring_thread_exited = true;
+            }
+            gRingBuffer->cv.notify_all();
+        }
+        ring_thread.join();
+    }
+
     /*
      * Some orchagents call other agents in their destructor.
      * To avoid accessing deleted agent, do deletion in reverse order.
@@ -105,6 +119,44 @@ OrchDaemon::~OrchDaemon()
     events_deinit_publisher(g_events_handle);
 }
 
+/**
+ * Body of the ring thread. Sleeps until tasks are queued or shutdown is requested,
+ * runs every queued task, then drains the consumers served by the ring.
+ * Returns at once when ring mode is off or the thread has already been started.
+ */
+void OrchDaemon::popRingBuffer()
+{
+    if (!gRingBuffer || gRingBuffer->Started)
+        return;
+    SWSS_LOG_ENTER();
+    gRingBuffer->Started = true;
+    SWSS_LOG_NOTICE("OrchDaemon starts the popRingBuffer thread!");
+    while (!ring_thread_exited)
+    {
+        std::unique_lock lock(gRingBuffer->mtx);
+        // Also wake up on shutdown, otherwise the destructor's join() never returns
+        gRingBuffer->cv.wait(lock, [&](){ return ring_thread_exited || !gRingBuffer->IsEmpty(); });
+        gRingBuffer->Idle = false;
+        AnyTask func;
+        while (gRingBuffer->pop(func)) {
+            func();
+        }
+        gRingBuffer->doTask();
+        gRingBuffer->Idle = true;
+    }
+}
+/**
+ * This function initializes gRingBuffer for the OrchDaemon instance,
+ * (otherwise gRingBuffer is nullptr, which indicates ring mode is not enabled)
+ * then syncs this gRingBuffer with Executor and Orch.
+ * Hence the whole program shares this single global RingBuffer.
+ */
+void OrchDaemon::enableRingBuffer() {
+    gRingBuffer = OrchRing::Get();
+    Executor::gRingBuffer = gRingBuffer;
+    Orch::gRingBuffer = gRingBuffer;
+}
+
 bool OrchDaemon::init()
 {
     SWSS_LOG_ENTER();
@@ -818,6 +870,8 @@ void OrchDaemon::start()
 
     Recorder::Instance().sairedis.setRotate(false);
 
+    ring_thread = std::thread(&OrchDaemon::popRingBuffer, this);
+
     for (Orch *o : m_orchList)
     {
         m_select->addSelectables(o->getSelectables());
@@ -876,9 +930,13 @@ void OrchDaemon::start()
          * execute all the remaining tasks that need to be retried. */
 
         /* TODO: Abstract Orch class to have a specific todo list */
-        for (Orch *o : m_orchList)
-            o->doTask();
-
+        for (Orch *o : m_orchList) {
+            if (c->getName() == APP_ROUTE_TABLE_NAME) {
+                o->doTask(APP_ROUTE_TABLE_NAME);
+            } else {
+                o->doTask();
+            }
+        }
         /*
          * Asked to check warm restart readiness.
          * Not doing this under Select::TIMEOUT condition because of
@@ -906,8 +964,13 @@ void OrchDaemon::start()
                 }
             }
 
+            // Let the ring thread finish everything already queued before flushing
+            while (gRingBuffer && (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle)) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
+            }
+
             // Flush sairedis's redis pipeline
             flush();
 
             SWSS_LOG_WARN("Orchagent is frozen for warm restart!");
             freezeAndHeartBeat(UINT_MAX);
diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h
index 2473848bf5..d23901630b 100644
--- a/orchagent/orchdaemon.h
+++ b/orchagent/orchdaemon.h
@@ -83,7 +83,21 @@ class OrchDaemon
         m_fabricQueueStatEnabled = enabled;
     }
     void logRotate();
+
+    // Two APIs required to support the ring buffer feature
+    /**
+     * This method is used by a ring buffer consumer [Orchdaemon] to initialize its ring,
+     * and propagate this ring's pointer to the producers [Orch, Consumer], to make sure that
+     * they are connected to the same ring.
+     */
+    void enableRingBuffer();
+    /**
+     * This method describes how the ring consumer consumes this ring.
+     */
+    void popRingBuffer();
+
 private:
+    std::thread ring_thread;
     DBConnector *m_applDb;
     DBConnector *m_configDb;
     DBConnector *m_stateDb;
@@ -96,7 +110,7 @@ class OrchDaemon
 
     std::vector m_orchList;
     Select *m_select;
-
+
     std::chrono::time_point m_lastHeartBeat;
 
     void flush();
@@ -104,6 +118,11 @@ class OrchDaemon
     void heartBeat(std::chrono::time_point tcurrent);
 
     void freezeAndHeartBeat(unsigned int duration);
+
+protected:
+    /* Orchdaemon instance points to the same ring buffer during its lifetime */
+    OrchRing* gRingBuffer = nullptr;
+    std::atomic ring_thread_exited{false};
 };
 
 class FabricOrchDaemon : public OrchDaemon