Skip to content

Commit

Permalink
Offload replication writes to IO threads (#1485)
Browse files Browse the repository at this point in the history
This PR offloads the write to replica clients to IO threads.

## Main Changes

* Replica writes will be offloaded, but only after the replica has entered
online mode.
* Replica reads will still be done in the main thread to reduce
complexity and because read traffic from replicas is negligible.
### Implementation Details

In order to offload the writes, `writeToReplica` has been split into 2
parts:
1. The write itself made by the IO thread or by the main thread
2. The post write where we update the replication buffers refcount will
be done in the main-thread after the write-job is done in the IO thread
(similar to what we do with a regular client)

### Additional Changes

* In `writeToReplica` we now use `writev` in case more than 1 buffer
exists.
* Changed client `nwritten` field to `ssize_t` since with a replica the
`nwritten` can theoretically exceed `int` size (not subject to
`NET_MAX_WRITES_PER_EVENT` limit).
* Changed parsing code to use `memchr` instead of `strchr`:
* While parsing a command, ASAN got stuck for an unknown reason when
`strchr` was called to look for the next `\r`
  * Adding assert for null-terminated querybuf didn't resolve the issue.
  * Switched to `memchr` as it's more secure and resolves the issue

### Testing
* Added integration tests
* Added unit tests

**Related issue:** #761

---------

Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage authored Feb 9, 2025
1 parent 7fa784a commit de3672a
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 47 deletions.
34 changes: 21 additions & 13 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica reads are not offloaded to IO threads. */
/* For simplicity, don't offload replica clients reads as read traffic from replica is negligible */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
Expand Down Expand Up @@ -364,8 +364,8 @@ int trySendWriteToIOThreads(client *c) {
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica writes are not offloaded to IO threads. */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* For simplicity, avoid offloading non-online replicas */
if (getClientType(c) == CLIENT_TYPE_REPLICA && c->repl_data->repl_state != REPLICA_STATE_ONLINE) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;

Expand All @@ -392,21 +392,29 @@ int trySendWriteToIOThreads(client *c) {
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);

/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
replBufBlock *o = listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = o->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
}
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0 || is_replica);

/* The main-thread will update the client state after the I/O thread completes the write. */
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = 0;
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
c->io_write_state = CLIENT_PENDING_IO;

IOJobQueue_push(jq, ioThreadWriteToClient, c);
Expand Down
152 changes: 123 additions & 29 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,9 @@ void disconnectReplicas(void) {
void unlinkClient(client *c) {
listNode *ln;

/* Wait for IO operations to be done before unlinking the client. */
waitForClientIO(c);

/* If this is marked as current client unset it. */
if (c->conn && server.current_client == c) server.current_client = NULL;

Expand Down Expand Up @@ -1934,36 +1937,122 @@ client *lookupClientByID(uint64_t id) {
return c;
}

void writeToReplica(client *c) {
/* Can be called from main-thread only as replica write offload is not supported yet */
serverAssert(inMainThread());
int nwritten = 0;
/* Post-write bookkeeping for a replica client.
 * Runs on the main thread after the actual write has completed (the write
 * itself may have been performed by an IO thread — see ioThreadWriteToClient).
 * Consumes c->nwritten bytes from the shared replication buffer chain:
 * advances the client's reference node/position, moves block refcounts
 * forward, and gives the backlog a chance to be trimmed. */
static void postWriteToReplica(client *c) {
    /* Nothing written (0) or write error (<0): leave the reference
     * node/position untouched. */
    if (c->nwritten <= 0) return;

    server.stat_net_repl_output_bytes += c->nwritten;

    /* Locate the last node which has leftover data and
     * decrement reference counts of all nodes in front of it.
     * Set c->repl_data->ref_repl_buf_node to point to the last node and
     * c->repl_data->ref_block_pos to the offset within that node. */
    listNode *curr = c->repl_data->ref_repl_buf_node;
    listNode *next = NULL;
    /* Absolute consumed offset measured from the start of the current
     * reference block (bytes written this round + previous position). */
    size_t nwritten = c->nwritten + c->repl_data->ref_block_pos;
    replBufBlock *o = listNodeValue(curr);

    while (nwritten >= o->used) {
        next = listNextNode(curr);
        if (!next) break; /* End of list */

        nwritten -= o->used;
        /* This block is fully consumed by this client: release our hold on
         * it and take a reference on the next block before moving forward. */
        o->refcount--;

        curr = next;
        o = listNodeValue(curr);
        o->refcount++;
    }

    serverAssert(nwritten <= o->used);
    c->repl_data->ref_repl_buf_node = curr;
    c->repl_data->ref_block_pos = nwritten;

    /* Dropped refcounts may have freed the oldest blocks for trimming. */
    incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}

/* Write the replication stream to a replica client.
 * May run on the main thread or on an IO thread (dispatched from
 * ioThreadWriteToClient). When running off the main thread it writes only up
 * to the snapshot taken at offload time (c->io_last_reply_block /
 * c->io_last_bufpos), so it never touches blocks the main thread may still
 * be appending to.
 *
 * On success c->nwritten holds the number of bytes written; on error
 * WRITE_FLAGS_WRITE_ERROR is set and c->nwritten holds the bytes written so
 * far (or the negative result of the failed write when nothing was written).
 * Replication-buffer refcount bookkeeping is deliberately NOT done here; it
 * is performed later on the main thread by postWriteToReplica(). */
static void writeToReplica(client *c) {
    listNode *last_node;
    size_t bufpos;

    /* Replicas are fed exclusively from the shared replication buffer;
     * the private reply buffer/list must be empty. */
    serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);

    /* Determine the last block and buffer position based on thread context. */
    if (inMainThread()) {
        last_node = listLast(server.repl_buffer_blocks);
        if (!last_node) return;
        bufpos = ((replBufBlock *)listNodeValue(last_node))->used;
    } else {
        last_node = c->io_last_reply_block;
        serverAssert(last_node != NULL);
        bufpos = c->io_last_bufpos;
    }

    listNode *first_node = c->repl_data->ref_repl_buf_node;

    /* Handle the single block case with a plain write. */
    if (first_node == last_node) {
        replBufBlock *b = listNodeValue(first_node);
        c->nwritten = connWrite(c->conn, b->buf + c->repl_data->ref_block_pos, bufpos - c->repl_data->ref_block_pos);
        if (c->nwritten <= 0) {
            c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
        }
        return;
    }

    /* Multiple blocks case: gather them into an iovec and use writev. */
    ssize_t total_bytes = 0;
    int iovcnt = 0;
    struct iovec iov_arr[IOV_MAX];
    struct iovec *iov = iov_arr;
    int iovmax = min(IOV_MAX, c->conn->iovcnt);

    for (listNode *cur_node = first_node; cur_node != NULL && iovcnt < iovmax; cur_node = listNextNode(cur_node)) {
        replBufBlock *cur_block = listNodeValue(cur_node);
        /* First block starts at the client's current offset; last block ends
         * at the snapshotted bufpos; middle blocks are sent whole. */
        size_t start = (cur_node == first_node) ? c->repl_data->ref_block_pos : 0;
        size_t len = (cur_node == last_node) ? bufpos : cur_block->used;
        len -= start;

        iov[iovcnt].iov_base = cur_block->buf + start;
        iov[iovcnt].iov_len = len;
        total_bytes += len;
        iovcnt++;
        if (cur_node == last_node) break;
    }

    if (total_bytes == 0) return;

    ssize_t totwritten = 0;
    while (iovcnt > 0) {
        int nwritten = connWritev(c->conn, iov, iovcnt);

        if (nwritten <= 0) {
            c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
            /* Report partial progress if any; otherwise propagate the error
             * result of the failed writev. */
            c->nwritten = (totwritten > 0) ? totwritten : nwritten;
            return;
        }

        totwritten += nwritten;

        if (totwritten == total_bytes) {
            break;
        }

        /* Short write: advance the iov array past the bytes already sent. */
        while (nwritten > 0) {
            if ((size_t)nwritten < iov[0].iov_len) {
                /* partial block written */
                iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
                iov[0].iov_len -= nwritten;
                break;
            }

            /* full block written */
            nwritten -= iov[0].iov_len;
            iov++;
            iovcnt--;
        }
    }

    c->nwritten = totwritten;
}

/* This function should be called from _writeToClient when the reply list is not empty,
Expand Down Expand Up @@ -2158,7 +2247,7 @@ int postWriteToClient(client *c) {
if (getClientType(c) != CLIENT_TYPE_REPLICA) {
_postWriteToClient(c);
} else {
server.stat_net_repl_output_bytes += c->nwritten > 0 ? c->nwritten : 0;
postWriteToReplica(c);
}

if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
Expand Down Expand Up @@ -2718,7 +2807,7 @@ void processMultibulkBuffer(client *c) {
serverAssertWithInfo(c, NULL, c->argc == 0);

/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_MULTIBULK;
Expand Down Expand Up @@ -2795,7 +2884,7 @@ void processMultibulkBuffer(client *c) {
while (c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf + c->qb_pos, '\r');
newline = memchr(c->querybuf + c->qb_pos, '\r', sdslen(c->querybuf) - c->qb_pos);
if (newline == NULL) {
if (sdslen(c->querybuf) - c->qb_pos > PROTO_INLINE_MAX_SIZE) {
c->read_flags |= READ_FLAGS_ERROR_BIG_BULK_COUNT;
Expand Down Expand Up @@ -5042,7 +5131,12 @@ void ioThreadWriteToClient(void *data) {
client *c = data;
serverAssert(c->io_write_state == CLIENT_PENDING_IO);
c->nwritten = 0;
_writeToClient(c);
if (c->write_flags & WRITE_FLAGS_IS_REPLICA) {
writeToReplica(c);
} else {
_writeToClient(c);
}

atomic_thread_fence(memory_order_release);
c->io_write_state = CLIENT_COMPLETED_IO;
}
2 changes: 0 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4238,8 +4238,6 @@ void replicationCachePrimary(client *c) {
serverAssert(server.primary != NULL && server.cached_primary == NULL);
serverLog(LL_NOTICE, "Caching the disconnected primary state.");

/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* Unlink the client from the server structures. */
unlinkClient(c);

Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ void dictVanillaFree(void *val);

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)

#define WRITE_FLAGS_IS_REPLICA (1 << 1)

client *createClient(connection *conn);
void freeClient(client *c);
Expand Down
Loading

0 comments on commit de3672a

Please sign in to comment.