diff --git a/orchagent/main.cpp b/orchagent/main.cpp index 0a804eb38c..1aedbd5b8d 100644 --- a/orchagent/main.cpp +++ b/orchagent/main.cpp @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize; #define DEFAULT_BATCH_SIZE 128 extern int gBatchSize; +bool gRingMode = false; + bool gSyncMode = false; sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC; string gAsicInstance; @@ -72,7 +74,7 @@ bool gTraditionalFlexCounter = false; void usage() { - cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode]" << endl; + cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-R]" << endl; cout << " -h: display this message" << endl; cout << " -r record_type: record orchagent logs with type (default 3)" << endl; cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl; @@ -92,6 +94,7 @@ void usage() cout << " -k max bulk size in bulk mode (default 1000)" << endl; cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl; cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl; + cout << " -R: enable the ring buffer mode" << endl; } void sighup_handler(int signo) @@ -346,7 +349,7 @@ int main(int argc, char **argv) string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME; int record_type = 3; // Only swss and sairedis recordings enabled by default. 
- while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:")) != -1) + while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:R")) != -1) { switch (opt) { @@ -437,6 +440,9 @@ int main(int argc, char **argv) enable_zmq = true; } break; + case 'R': + gRingMode = true; + break; default: /* '?' */ exit(EXIT_FAILURE); } @@ -782,6 +788,11 @@ int main(int argc, char **argv) orchDaemon = make_shared(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get()); } + if (gRingMode) { + /* Initialize the ring before OrchDaemon initializing Orchs */ + orchDaemon->enableRingBuffer(); + } + if (!orchDaemon->init()) { SWSS_LOG_ERROR("Failed to initialize orchestration daemon"); diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index d1cbdb89c8..079ddfaa10 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -13,10 +13,17 @@ #include "zmqconsumerstatetable.h" #include "sai_serialize.h" +#define PRINT_ALL 1 +#define VERBOSE true +#define LARGE_TRAFFIC 10000 + using namespace swss; int gBatchSize = 0; +OrchRing* Orch::gRingBuffer = nullptr; +OrchRing* Executor::gRingBuffer = nullptr; + Orch::Orch(DBConnector *db, const string tableName, int pri) { addConsumer(db, tableName, pri); @@ -155,6 +162,10 @@ size_t ConsumerBase::addToSync(const std::deque &entries return entries.size(); } +size_t ConsumerBase::addToSync(std::shared_ptr> entries) { + return addToSync(*entries); +} + // TODO: Table should be const size_t ConsumerBase::refillToSync(Table* table) { @@ -241,17 +252,86 @@ void Consumer::execute() { // ConsumerBase::execute_impl(); SWSS_LOG_ENTER(); + if (!gRingBuffer) { + size_t update_size = 0; + auto table = static_cast(getSelectable()); + do + { + std::deque entries; + table->pops(entries); + update_size = addToSync(entries); + } while (update_size != 0); - size_t update_size = 0; - auto table = static_cast(getSelectable()); - do - { - std::deque entries; - table->pops(entries); - update_size = addToSync(entries); - } while (update_size != 0); + 
drain(); + } else if (gRingBuffer->Serves(getName())) { + ring_execute(); + } else { + pushRingBuffer( + [=]() { + std::deque entries; + + getConsumerTable()->pops(entries); + addToSync(entries); + drain(); + } + ); + } +} + +void Consumer::ring_execute() +{ + SWSS_LOG_ENTER(); + static swss::PerformanceTimer timer("POPS", PRINT_ALL, VERBOSE); + size_t total = 0; + while (true) { + timer.start(); + auto entries = std::make_shared>(); + getConsumerTable()->pops(*entries); + // number of entries popped + size_t count = entries->size(); + total += count; + pushRingBuffer([=](){ + addToSync(entries); + }, this); + timer.stop(); + timer.inc((int)count); + if (total >= LARGE_TRAFFIC) { + pushRingBuffer([=](){ + drain(); + }, this); + total = 0; + } + if (!gBatchSize || count * 10 <= (size_t)gBatchSize) { + // some program doesn't initialize gBatchSize + // but use TableConsumable::DEFAULT_POP_BATCH_SIZE + break; + } + } + pushRingBuffer([=](){ + drain(); + }, this); +} - drain(); +void Executor::pushRingBuffer(AnyTask&& func, Executor* e) +{ + if (!gRingBuffer || !gRingBuffer->Started) { + func(); + } else { + if (e && gRingBuffer->Serves(e->getName())) { + while (!gRingBuffer->push(func)) { + SWSS_LOG_WARN("fail to push..ring is full..."); + } + gRingBuffer->cv.notify_one(); + + } + else + { + while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) { + std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS)); + } + func(); + } + } } void Consumer::drain() @@ -542,6 +622,16 @@ void Orch::doTask() } } +void Orch::doTask(std::string excluded_table) +{ + for (auto &it : m_consumerMap) { + if (gRingBuffer && it.second->getName() == excluded_table) { + continue; + } + it.second->drain(); + } +} + void Orch::dumpPendingTasks(vector &ts) { for (auto &it : m_consumerMap) diff --git a/orchagent/orch.h b/orchagent/orch.h index bdbecf5f5f..b2943fc02b 100644 --- a/orchagent/orch.h +++ b/orchagent/orch.h @@ -7,6 +7,7 @@ #include #include #include +#include extern "C" { 
#include @@ -24,6 +25,8 @@ extern "C" { #include "macaddress.h" #include "response_publisher.h" #include "recorder.h" +#include "schema.h" +#include "performancetimer.h" const char delimiter = ':'; const char list_item_delimiter = ','; @@ -49,6 +52,9 @@ const char state_db_key_delimiter = '|'; #define DEFAULT_KEY_SEPARATOR ":" #define VLAN_SUB_INTERFACE_SEPARATOR "." +#define ORCH_RING_SIZE 30 +#define SLEEP_MSECONDS 500 + const int default_orch_pri = 0; typedef enum @@ -88,6 +94,11 @@ typedef std::pair table_name_with_pri_t; class Orch; +using AnyTask = std::function; +template +class RingBuffer; +typedef RingBuffer OrchRing; + // Design assumption // 1. one Orch can have one or more Executor // 2. one Executor must belong to one and only one Orch @@ -117,6 +128,7 @@ class Executor : public swss::Selectable // Execute on event happening virtual void execute() { } + virtual void ring_execute() {} virtual void drain() { } virtual std::string getName() const @@ -124,6 +136,10 @@ class Executor : public swss::Selectable return m_name; } + Orch *getOrch() const { return m_orch; } + static OrchRing* gRingBuffer; + static void pushRingBuffer(AnyTask&& func, Executor* e=nullptr); + protected: swss::Selectable *m_selectable; Orch *m_orch; @@ -135,6 +151,8 @@ class Executor : public swss::Selectable swss::Selectable *getSelectable() const { return m_selectable; } }; +typedef std::map> ConsumerMap; + class ConsumerBase : public Executor { public: ConsumerBase(swss::Selectable *selectable, Orch *orch, const std::string &name) @@ -164,10 +182,127 @@ class ConsumerBase : public Executor { // Returns: the number of entries added to m_toSync size_t addToSync(const std::deque &entries); + size_t addToSync(std::shared_ptr> ptr); size_t refillToSync(); size_t refillToSync(swss::Table* table); }; +template +class RingBuffer +{ +private: + static RingBuffer* instance; + std::vector buffer; + int head = 0; + int tail = 0; + ConsumerMap m_consumerMap; +protected: + RingBuffer(): 
buffer(RingSize) {} + ~RingBuffer() { + delete instance; + } +public: + RingBuffer(const RingBuffer&) = delete; + RingBuffer(RingBuffer&&) = delete; + RingBuffer& operator= (const RingBuffer&) = delete; + RingBuffer& operator= (RingBuffer&&) = delete; + static RingBuffer* Get(); + bool Started = false; + bool Idle = true; + std::mutex mtx; + std::condition_variable cv; + bool IsFull(); + bool IsEmpty(); + bool push(DataType entry); + bool pop(DataType& entry); + DataType& HeadEntry(); + void addExecutor(Executor* executor); + void doTask(); + bool tasksPending(); + bool Serves(const std::string& tableName); +}; +template +RingBuffer* RingBuffer::instance = nullptr; +template +RingBuffer* RingBuffer::Get() +{ + if (instance == nullptr) { + instance = new RingBuffer(); + SWSS_LOG_NOTICE("Orchagent RingBuffer created at %p!", (void *)instance); + } + return instance; +} +template +bool RingBuffer::IsFull() +{ + return (tail + 1) % RingSize == head; +} +template +bool RingBuffer::IsEmpty() +{ + return tail == head; +} +template +bool RingBuffer::push(DataType ringEntry) +{ + if (IsFull()) + return false; + buffer[tail] = std::move(ringEntry); + tail = (tail + 1) % RingSize; + return true; +} +template +DataType& RingBuffer::HeadEntry() { + return buffer[head]; +} +template +bool RingBuffer::pop(DataType& ringEntry) +{ + if (IsEmpty()) + return false; + ringEntry = std::move(buffer[head]); + head = (head + 1) % RingSize; + return true; +} +template +void RingBuffer::addExecutor(Executor* executor) +{ + auto inserted = m_consumerMap.emplace(std::piecewise_construct, + std::forward_as_tuple(executor->getName()), + std::forward_as_tuple(executor)); + // If there is duplication of executorName in m_consumerMap, logic error + if (!inserted.second) + { + SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str()); + } +} +template +void RingBuffer::doTask() +{ + for (auto &it : m_consumerMap) { + it.second->drain(); + } +} +template +bool 
RingBuffer::tasksPending() +{ + for (auto &it : m_consumerMap) { + auto consumer = dynamic_cast(it.second.get()); + if (consumer->m_toSync.size()) + return true; + } + return false; +} +template +bool RingBuffer::Serves(const std::string& tableName) +{ + for (auto &it : m_consumerMap) { + if (it.first == tableName) + return true; + } + return false; +} + class Consumer : public ConsumerBase { public: Consumer(swss::ConsumerTableBase *select, Orch *orch, const std::string &name) @@ -198,11 +333,10 @@ class Consumer : public ConsumerBase { } void execute() override; + void ring_execute() override; void drain() override; }; -typedef std::map> ConsumerMap; - typedef enum { success, @@ -225,6 +359,8 @@ class Orch Orch(const std::vector& tables); virtual ~Orch() = default; + static OrchRing* gRingBuffer; + std::vector getSelectables(); // add the existing table data (left by warm reboot) to the consumer todo task list. @@ -242,6 +378,7 @@ class Orch virtual void doTask(Consumer &consumer) { }; virtual void doTask(swss::NotificationConsumer &consumer) { } virtual void doTask(swss::SelectableTimer &timer) { } + virtual void doTask(std::string excluded_table); void dumpPendingTasks(std::vector &ts); diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 047263c93a..152a945871 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -87,6 +87,12 @@ OrchDaemon::~OrchDaemon() { SWSS_LOG_ENTER(); + // Stop the ring thread before delete orch pointers + if (gRingBuffer) { + ring_thread_exited = true; + ring_thread.detach(); + } + /* * Some orchagents call other agents in their destructor. * To avoid accessing deleted agent, do deletion in reverse order. 
@@ -105,6 +111,38 @@ OrchDaemon::~OrchDaemon() events_deinit_publisher(g_events_handle); } +void OrchDaemon::popRingBuffer() +{ + if (!gRingBuffer || gRingBuffer->Started) + return; + SWSS_LOG_ENTER(); + gRingBuffer->Started = true; + SWSS_LOG_NOTICE("OrchDaemon starts the popRingBuffer thread!"); + while (!ring_thread_exited) + { + std::unique_lock lock(gRingBuffer->mtx); + gRingBuffer->cv.wait(lock, [&](){ return !gRingBuffer->IsEmpty(); }); + gRingBuffer->Idle = false; + AnyTask func; + while (gRingBuffer->pop(func)) { + func(); + } + gRingBuffer->doTask(); + gRingBuffer->Idle = true; + } +} +/** + * This function initializes gRingBuffer for the OrchDaemon instance, + * (otherwise gRingBuffer is nullptr, which indicates ring mode is not enabled) + * then syncs this gRingBuffer with Executor and Orch. + * Hence the whole program shares this single global RingBuffer. + */ +void OrchDaemon::enableRingBuffer() { + gRingBuffer = OrchRing::Get(); + Executor::gRingBuffer = gRingBuffer; + Orch::gRingBuffer = gRingBuffer; +} + bool OrchDaemon::init() { SWSS_LOG_ENTER(); @@ -818,6 +856,8 @@ void OrchDaemon::start() Recorder::Instance().sairedis.setRotate(false); + ring_thread = std::thread(&OrchDaemon::popRingBuffer, this); + for (Orch *o : m_orchList) { m_select->addSelectables(o->getSelectables()); @@ -876,9 +916,13 @@ void OrchDaemon::start() * execute all the remaining tasks that need to be retried. */ /* TODO: Abstract Orch class to have a specific todo list */ - for (Orch *o : m_orchList) - o->doTask(); - + for (Orch *o : m_orchList) { + if (c->getName() == APP_ROUTE_TABLE_NAME) { + o->doTask(APP_ROUTE_TABLE_NAME); + } else { + o->doTask(); + } + } /* * Asked to check warm restart readiness. 
* Not doing this under Select::TIMEOUT condition because of @@ -906,8 +950,8 @@ void OrchDaemon::start() } } - // Flush sairedis's redis pipeline - flush(); + // Wait for the ring to become empty before freezing. NOTE(review): this hunk removes the flush() call - confirm sairedis's redis pipeline is still flushed + while (gRingBuffer && !gRingBuffer->IsEmpty()) {std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));} SWSS_LOG_WARN("Orchagent is frozen for warm restart!"); freezeAndHeartBeat(UINT_MAX); diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index 2473848bf5..d23901630b 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -83,7 +83,21 @@ class OrchDaemon m_fabricQueueStatEnabled = enabled; } void logRotate(); + + // Two required APIs to support the ring buffer feature + /** + * This method is used by a ring buffer consumer [Orchdaemon] to initialize its ring, + * and populate this ring's pointer to the producers [Orch, Consumer], to make sure that + * they are connected to the same ring. + */ + void enableRingBuffer(); + /** + * This method describes how the ring consumer consumes this ring. + */ + void popRingBuffer(); + private: + std::thread ring_thread; DBConnector *m_applDb; DBConnector *m_configDb; DBConnector *m_stateDb; @@ -96,7 +110,7 @@ class OrchDaemon std::vector m_orchList; Select *m_select; - + std::chrono::time_point m_lastHeartBeat; void flush(); @@ -104,6 +118,11 @@ class OrchDaemon void heartBeat(std::chrono::time_point tcurrent); void freezeAndHeartBeat(unsigned int duration); + +protected: + /* Orchdaemon instance points to the same ring buffer during its lifetime */ + OrchRing* gRingBuffer = nullptr; + std::atomic ring_thread_exited{false}; }; class FabricOrchDaemon : public OrchDaemon