Skip to content

Commit

Permalink
[orchagent] implement ring buffer feature with a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Oct 22, 2024
1 parent 90fcead commit 2ccb3dc
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 21 deletions.
7 changes: 7 additions & 0 deletions fpmsyncd/fpmsyncd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#ifndef __FPMSYNCD__
#define __FPMSYNCD__

// redispipeline has a maximum capacity of 50000 entries
#define ROUTE_SYNC_PPL_SIZE 50000

#endif
2 changes: 1 addition & 1 deletion neighsyncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ neighsyncd_SOURCES = neighsyncd.cpp neighsync.cpp $(top_srcdir)/warmrestart/warm

neighsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN)
neighsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN)
neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon
neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon -lpthread

if GCOV_ENABLED
neighsyncd_SOURCES += ../gcovpreload/gcovpreload.cpp
Expand Down
15 changes: 13 additions & 2 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
#define DEFAULT_BATCH_SIZE 128
extern int gBatchSize;

bool gRingMode = false;

bool gSyncMode = false;
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
string gAsicInstance;
Expand All @@ -73,7 +75,7 @@ uint32_t create_switch_timeout = 0;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout]" << endl;
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout] [-R]" << endl;
cout << " -h: display this message" << endl;
cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
Expand All @@ -94,6 +96,7 @@ void usage()
cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
cout << " -t Override create switch timeout, in sec" << endl;
cout << " -R enable the ring buffer thread" << endl;
}

void sighup_handler(int signo)
Expand Down Expand Up @@ -348,7 +351,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:R")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -442,6 +445,9 @@ int main(int argc, char **argv)
case 't':
create_switch_timeout = atoi(optarg);
break;
case 'R':
gRingMode = true;
break;
default: /* '?' */
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -792,6 +798,11 @@ int main(int argc, char **argv)
orchDaemon = make_shared<FabricOrchDaemon>(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
}

if (gRingMode) {
/* Initialize the ring before OrchDaemon initializing Orchs */
orchDaemon->enableRingBuffer();
}

if (!orchDaemon->init())
{
SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
Expand Down
74 changes: 65 additions & 9 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
#include "zmqserver.h"
#include "zmqconsumerstatetable.h"
#include "sai_serialize.h"
#include "fpmsyncd/fpmsyncd.h"

using namespace swss;

int gBatchSize = 0;

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down Expand Up @@ -155,6 +159,10 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
return entries.size();
}

size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
return addToSync(*entries);
}

// TODO: Table should be const
size_t ConsumerBase::refillToSync(Table* table)
{
Expand Down Expand Up @@ -239,23 +247,67 @@ void ConsumerBase::dumpPendingTasks(vector<string> &ts)

void Consumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
SWSS_LOG_ENTER();

auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
size_t total_size = 0;

while (true)
{
size_t popped_size = 0; // number of entries popped from the redis table

auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
getConsumerTable()->pops(*entries);

// add to sync
addToSync(entries);
popped_size = entries->size();
total_size += popped_size;

drain();
pushRingBuffer([=](){
addToSync(entries);
});

if (!gBatchSize || popped_size * 10 <= (size_t)gBatchSize || total_size >= ROUTE_SYNC_PPL_SIZE) {
// some program doesn't initialize gBatchSize and use TableConsumable::DEFAULT_POP_BATCH_SIZE instead
break;
}
}

pushRingBuffer([=](){
drain();
});
}

void Executor::pushRingBuffer(AnyTask&& task)
{
if (!gRingBuffer || !gRingBuffer->threadCreated)
{
// execute the task right now in this thread if gRingBuffer is not initialized
// or the ring thread is not created, or this executor is not served by gRingBuffer
task();
}
else if (!gRingBuffer->Serves(getName())) // not served by ring thread
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
gRingBuffer->notify();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
// if ring thread is enabled, make sure to execute task after the ring finishes its work
task();
}
else
{
// if this executor is served by gRingBuffer, push the task to gRingBuffer
// and notify the ring thread to flush gRingBuffer
while (!gRingBuffer->push(task)) {
gRingBuffer->notify();
SWSS_LOG_WARN("ring is full...push again");
}
gRingBuffer->notify();
}
}

void Consumer::drain()
{
if (!m_toSync.empty())
((Orch *)m_orch)->doTask((Consumer&)*this);
m_orch->doTask(*this);
}

size_t Orch::addExistingData(const string& tableName)
Expand Down Expand Up @@ -802,6 +854,10 @@ void Orch::addExecutor(Executor* executor)
{
SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str());
}

if (gRingBuffer && executor->getName() == APP_ROUTE_TABLE_NAME) {
gRingBuffer->addExecutor(executor);
}
}

Executor *Orch::getExecutor(string executorName)
Expand Down
Loading

0 comments on commit 2ccb3dc

Please sign in to comment.