Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[orchagent] implement ring buffer feature with a flag #3242

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fpmsyncd/fpmsyncd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#ifndef __FPMSYNCD__
#define __FPMSYNCD__

// redispipeline has a maximum capacity of 50000 entries
#define ROUTE_SYNC_PPL_SIZE 50000

#endif
2 changes: 1 addition & 1 deletion neighsyncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ neighsyncd_SOURCES = neighsyncd.cpp neighsync.cpp $(top_srcdir)/warmrestart/warm

neighsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN)
neighsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_ASAN)
neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon
neighsyncd_LDADD = $(LDFLAGS_ASAN) -lnl-3 -lnl-route-3 -lswsscommon -lpthread

if GCOV_ENABLED
neighsyncd_SOURCES += ../gcovpreload/gcovpreload.cpp
Expand Down
15 changes: 13 additions & 2 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
#define DEFAULT_BATCH_SIZE 128
extern int gBatchSize;

bool gRingMode = false;

bool gSyncMode = false;
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
string gAsicInstance;
Expand All @@ -73,7 +75,7 @@ uint32_t create_switch_timeout = 0;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout]" << endl;
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-t create_switch_timeout] [-R]" << endl;
cout << " -h: display this message" << endl;
cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
Expand All @@ -94,6 +96,7 @@ void usage()
cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
cout << " -t Override create switch timeout, in sec" << endl;
cout << " -R enable the ring buffer thread" << endl;
}

void sighup_handler(int signo)
Expand Down Expand Up @@ -348,7 +351,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:t:R")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -442,6 +445,9 @@ int main(int argc, char **argv)
case 't':
create_switch_timeout = atoi(optarg);
break;
case 'R':
gRingMode = true;
break;
default: /* '?' */
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -792,6 +798,11 @@ int main(int argc, char **argv)
orchDaemon = make_shared<FabricOrchDaemon>(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
}

if (gRingMode) {
/* Initialize the ring before OrchDaemon initializing Orchs */
orchDaemon->enableRingBuffer();
}

if (!orchDaemon->init())
{
SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
Expand Down
74 changes: 65 additions & 9 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
#include "zmqserver.h"
#include "zmqconsumerstatetable.h"
#include "sai_serialize.h"
#include "fpmsyncd/fpmsyncd.h"

using namespace swss;

int gBatchSize = 0;

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down Expand Up @@ -155,6 +159,10 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
return entries.size();
}

size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
return addToSync(*entries);
}

// TODO: Table should be const
size_t ConsumerBase::refillToSync(Table* table)
{
Expand Down Expand Up @@ -239,23 +247,67 @@ void ConsumerBase::dumpPendingTasks(vector<string> &ts)

void Consumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
SWSS_LOG_ENTER();

auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
size_t total_size = 0;

while (true)
{
size_t popped_size = 0; // number of entries popped from the redis table

auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
getConsumerTable()->pops(*entries);

// add to sync
addToSync(entries);
popped_size = entries->size();
total_size += popped_size;

drain();
pushRingBuffer([=](){
addToSync(entries);
});

if (!gBatchSize || popped_size * 10 <= (size_t)gBatchSize || total_size >= ROUTE_SYNC_PPL_SIZE) {
// some program doesn't initialize gBatchSize and use TableConsumable::DEFAULT_POP_BATCH_SIZE instead
break;
}
}

pushRingBuffer([=](){
drain();
});
}

void Executor::pushRingBuffer(AnyTask&& task)
{
if (!gRingBuffer || !gRingBuffer->threadCreated)
{
// execute the task right now in this thread if gRingBuffer is not initialized
// or the ring thread is not created, or this executor is not served by gRingBuffer
task();
}
else if (!gRingBuffer->Serves(getName())) // not served by ring thread
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
gRingBuffer->notify();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
// if ring thread is enabled, make sure to execute task after the ring finishes its work
task();
}
else
{
// if this executor is served by gRingBuffer, push the task to gRingBuffer
// and notify the ring thread to flush gRingBuffer
while (!gRingBuffer->push(task)) {
gRingBuffer->notify();
SWSS_LOG_WARN("ring is full...push again");
}
gRingBuffer->notify();
}
}

void Consumer::drain()
{
if (!m_toSync.empty())
((Orch *)m_orch)->doTask((Consumer&)*this);
m_orch->doTask(*this);
}

size_t Orch::addExistingData(const string& tableName)
Expand Down Expand Up @@ -802,6 +854,10 @@ void Orch::addExecutor(Executor* executor)
{
SWSS_LOG_THROW("Duplicated executorName in m_consumerMap: %s", executor->getName().c_str());
}

if (gRingBuffer && executor->getName() == APP_ROUTE_TABLE_NAME) {
gRingBuffer->addExecutor(executor);
}
}

Executor *Orch::getExecutor(string executorName)
Expand Down
Loading
Loading