Skip to content

Commit

Permalink
Merge pull request #1077 from Barenboim/master
Browse files Browse the repository at this point in the history
Optimize communicator.
  • Loading branch information
Barenboim authored Nov 2, 2022
2 parents 87d95be + cbd5463 commit 83d6346
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 41 deletions.
129 changes: 91 additions & 38 deletions src/kernel/Communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1141,8 +1141,12 @@ void Communicator::handler_thread_routine(void *context)
Communicator *comm = (Communicator *)context;
struct poller_result *res;

while ((res = (struct poller_result *)msgqueue_get(comm->queue)) != NULL)
while (1)
{
res = (struct poller_result *)msgqueue_get(comm->msgqueue);
if (!res)
break;

switch (res->data.operation)
{
case PD_OP_READ:
Expand Down Expand Up @@ -1174,7 +1178,8 @@ void Communicator::handler_thread_routine(void *context)
}
}

int Communicator::append(const void *buf, size_t *size, poller_message_t *msg)
int Communicator::append_request(const void *buf, size_t *size,
poller_message_t *msg)
{
CommMessageIn *in = (CommMessageIn *)msg;
struct CommConnEntry *entry = in->entry;
Expand All @@ -1186,17 +1191,37 @@ int Communicator::append(const void *buf, size_t *size, poller_message_t *msg)
if (ret > 0)
{
entry->state = CONN_STATE_SUCCESS;
if (entry->service)
timeout = -1;
else
timeout = -1;
}
else if (ret == 0 && session->timeout != 0)
timeout = Communicator::next_timeout(session);
else
return ret;

/* This set_timeout() never fails, which is very important. */
mpoller_set_timeout(entry->sockfd, timeout, entry->mpoller);
return ret;
}

int Communicator::append_reply(const void *buf, size_t *size,
poller_message_t *msg)
{
CommMessageIn *in = (CommMessageIn *)msg;
struct CommConnEntry *entry = in->entry;
CommSession *session = entry->session;
int timeout;
int ret;

ret = in->append(buf, size);
if (ret > 0)
{
entry->state = CONN_STATE_SUCCESS;
timeout = session->keep_alive_timeout();
session->timeout = timeout; /* Reuse session's timeout field. */
if (timeout == 0)
{
timeout = session->keep_alive_timeout();
session->timeout = timeout; /* Reuse session's timeout field. */
if (timeout == 0)
{
mpoller_del(entry->sockfd, entry->mpoller);
return ret;
}
mpoller_del(entry->sockfd, entry->mpoller);
return ret;
}
}
else if (ret == 0 && session->timeout != 0)
Expand Down Expand Up @@ -1256,32 +1281,50 @@ int Communicator::create_service_session(struct CommConnEntry *entry)
return -1;
}

poller_message_t *Communicator::create_message(void *context)
poller_message_t *Communicator::create_request(struct CommConnEntry *entry)
{
struct CommConnEntry *entry = (struct CommConnEntry *)context;
CommSession *session;

if (entry->state == CONN_STATE_IDLE)
{
pthread_mutex_t *mutex;
pthread_mutex_lock(&entry->target->mutex);
/* do nothing */
pthread_mutex_unlock(&entry->target->mutex);
}

if (entry->state != CONN_STATE_KEEPALIVE &&
entry->state != CONN_STATE_CONNECTED)
{
errno = EBADMSG;
return NULL;
}

if (entry->service)
mutex = &entry->target->mutex;
else
mutex = &entry->mutex;
if (Communicator::create_service_session(entry) < 0)
return NULL;

pthread_mutex_lock(mutex);
/* do nothing */
pthread_mutex_unlock(mutex);
session = entry->session;
session->in = session->message_in();
if (session->in)
{
session->in->poller_message_t::append = Communicator::append_request;
session->in->entry = entry;
}

if (entry->state == CONN_STATE_CONNECTED ||
entry->state == CONN_STATE_KEEPALIVE)
return session->in;
}

poller_message_t *Communicator::create_reply(struct CommConnEntry *entry)
{
CommSession *session;

if (entry->state == CONN_STATE_IDLE)
{
if (Communicator::create_service_session(entry) < 0)
return NULL;
pthread_mutex_lock(&entry->mutex);
/* do nothing */
pthread_mutex_unlock(&entry->mutex);
}
else if (entry->state != CONN_STATE_RECEIVING)

if (entry->state != CONN_STATE_RECEIVING)
{
errno = EBADMSG;
return NULL;
Expand All @@ -1291,13 +1334,23 @@ poller_message_t *Communicator::create_message(void *context)
session->in = session->message_in();
if (session->in)
{
session->in->poller_message_t::append = Communicator::append;
session->in->poller_message_t::append = Communicator::append_reply;
session->in->entry = entry;
}

return session->in;
}

poller_message_t *Communicator::create_message(void *context)
{
struct CommConnEntry *entry = (struct CommConnEntry *)context;

if (entry->service)
return Communicator::create_request(entry);
else
return Communicator::create_reply(entry);
}

int Communicator::partial_written(size_t n, void *context)
{
struct CommConnEntry *entry = (struct CommConnEntry *)context;
Expand All @@ -1311,8 +1364,8 @@ int Communicator::partial_written(size_t n, void *context)

void Communicator::callback(struct poller_result *res, void *context)
{
Communicator *comm = (Communicator *)context;
msgqueue_put(res, comm->queue);
msgqueue_t *msgqueue = (msgqueue_t *)context;
msgqueue_put(res, msgqueue);
}

void *Communicator::accept(const struct sockaddr *addr, socklen_t addrlen,
Expand Down Expand Up @@ -1359,7 +1412,7 @@ int Communicator::create_handler_threads(size_t handler_threads)
if (i == handler_threads)
return 0;

msgqueue_set_nonblock(this->queue);
msgqueue_set_nonblock(this->msgqueue);
thrdpool_destroy(NULL, this->thrdpool);
}

Expand All @@ -1373,15 +1426,15 @@ int Communicator::create_poller(size_t poller_threads)
.create_message = Communicator::create_message,
.partial_written = Communicator::partial_written,
.callback = Communicator::callback,
.context = this
};

if ((ssize_t)params.max_open_files < 0)
return -1;

this->queue = msgqueue_create(4096, sizeof (struct poller_result));
if (this->queue)
this->msgqueue = msgqueue_create(4096, sizeof (struct poller_result));
if (this->msgqueue)
{
params.context = this->msgqueue;
this->mpoller = mpoller_create(&params, poller_threads);
if (this->mpoller)
{
Expand All @@ -1391,7 +1444,7 @@ int Communicator::create_poller(size_t poller_threads)
mpoller_destroy(this->mpoller);
}

msgqueue_destroy(this->queue);
msgqueue_destroy(this->msgqueue);
}

return -1;
Expand All @@ -1415,7 +1468,7 @@ int Communicator::init(size_t poller_threads, size_t handler_threads)

mpoller_stop(this->mpoller);
mpoller_destroy(this->mpoller);
msgqueue_destroy(this->queue);
msgqueue_destroy(this->msgqueue);
}

return -1;
Expand All @@ -1425,10 +1478,10 @@ void Communicator::deinit()
{
this->stop_flag = 1;
mpoller_stop(this->mpoller);
msgqueue_set_nonblock(this->queue);
msgqueue_set_nonblock(this->msgqueue);
thrdpool_destroy(NULL, this->thrdpool);
mpoller_destroy(this->mpoller);
msgqueue_destroy(this->queue);
msgqueue_destroy(this->msgqueue);
}

int Communicator::nonblock_connect(CommTarget *target)
Expand Down
10 changes: 8 additions & 2 deletions src/kernel/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class Communicator

private:
struct __mpoller *mpoller;
struct __msgqueue *queue;
struct __msgqueue *msgqueue;
struct __thrdpool *thrdpool;
int stop_flag;

Expand Down Expand Up @@ -341,10 +341,16 @@ class Communicator
static int first_timeout_send(CommSession *session);
static int first_timeout_recv(CommSession *session);

static int append(const void *buf, size_t *size, poller_message_t *msg);
static int append_request(const void *buf, size_t *size,
poller_message_t *msg);
static int append_reply(const void *buf, size_t *size,
poller_message_t *msg);

static int create_service_session(struct CommConnEntry *entry);

static poller_message_t *create_request(struct CommConnEntry *entry);
static poller_message_t *create_reply(struct CommConnEntry *entry);

static poller_message_t *create_message(void *context);

static int partial_written(size_t n, void *context);
Expand Down
2 changes: 1 addition & 1 deletion src/util/json_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ struct __json_array

struct __json_value
{
int type;
union
{
char *string;
double number;
json_object_t object;
json_array_t array;
} value;
int type;
};

struct __json_member
Expand Down

0 comments on commit 83d6346

Please sign in to comment.