Skip to content

Commit

Permalink
use shared query buffer for clients read
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed Apr 8, 2024
1 parent 6411629 commit 6ce2ea4
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 20 deletions.
84 changes: 71 additions & 13 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
__thread sds shared_qb = NULL;

/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
Expand Down Expand Up @@ -152,7 +153,7 @@ client *createClient(connection *conn) {
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
Expand Down Expand Up @@ -2116,6 +2117,48 @@ void resetClient(client *c) {
}
}

/* Initializes the shared query buffer to a new sds with the default capacity */
void initSharedQueryBuf(void) {
shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(shared_qb);
}

/* Resets the client's query buffer.
*
* If the client is using the shared query buffer, the remaining data is copied
* to a new private buffer. If the client is using a private buffer, the buffer
* is trimmed to the current position. */
void resetClientQueryBuffer(client *c) {
if (c->querybuf == NULL) {
return;
}

serverAssert(c->qb_pos <= sdslen(c->querybuf));

if (c->querybuf != shared_qb) {
if (c->qb_pos > 0) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
}
return;
}

size_t remaining = sdslen(c->querybuf) - c->qb_pos;

if (remaining > 0) {
/* Allocate large enough buf to avoid re-allocation in the next read */
size_t size = remaining <= PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN : remaining;
c->querybuf = sdsnewlen(NULL, size);
memcpy(c->querybuf, shared_qb + c->qb_pos, remaining);
sdssetlen(c->querybuf, remaining);
} else {
c->querybuf = NULL;
}

sdsclear(shared_qb);
c->qb_pos = 0;
}

/* This function is used when we want to re-enter the event loop but there
* is the risk that the client we are dealing with will be freed in some
* way. This happens for instance in:
Expand Down Expand Up @@ -2363,7 +2406,7 @@ int processMultibulkBuffer(client *c) {
}

c->qb_pos = newline-c->querybuf+2;
if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
if (c->querybuf != shared_qb && !(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
/* When the client is not a master client (because master
* client's querybuf can only be trimmed after data applied
* and sent to replicas).
Expand Down Expand Up @@ -2542,7 +2585,7 @@ int processPendingCommandAndInputBuffer(client *c) {
* return C_ERR in case the client was freed during the processing */
int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
while(c->querybuf && c->qb_pos < sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

Expand Down Expand Up @@ -2593,6 +2636,12 @@ int processInputBuffer(client *c) {
break;
}

if (c->querybuf == shared_qb) {
/* Before processing the command, reset the shared query buffer to its default state.
* This avoids unintentionally modifying the shared qb during processCommand */
resetClientQueryBuffer(c);
}

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
Expand Down Expand Up @@ -2621,10 +2670,8 @@ int processInputBuffer(client *c) {
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
} else {
resetClientQueryBuffer(c);
}

/* Update client memory usage after processing the query buffer, this is
Expand All @@ -2649,6 +2696,7 @@ void readQueryFromClient(connection *conn) {
atomicIncr(server.stat_total_reads_processed, 1);

readlen = PROTO_IOBUF_LEN;
qblen = c->querybuf ? sdslen(c->querybuf) : 0;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
Expand All @@ -2658,7 +2706,7 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
ssize_t remaining = (size_t)(c->bulklen+2)-(qblen-c->qb_pos);
big_arg = 1;

/* Note that the 'remaining' variable may be zero in some edge case,
Expand All @@ -2671,7 +2719,12 @@ void readQueryFromClient(connection *conn) {
readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (c->querybuf == NULL) {
serverAssert(sdslen(shared_qb) == 0);
c->querybuf = big_arg ? sdsempty() : shared_qb;
qblen = sdslen(c->querybuf);
}

if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
Expand All @@ -2692,7 +2745,7 @@ void readQueryFromClient(connection *conn) {
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
Expand Down Expand Up @@ -2744,6 +2797,10 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && c->querybuf == shared_qb) {
sdsclear(shared_qb);
c->querybuf = NULL;
}
beforeNextClient(c);
}

Expand Down Expand Up @@ -2859,8 +2916,8 @@ sds catClientInfoString(sds s, client *client) {
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", (unsigned long long) sdslen(client->querybuf),
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
" qbuf=%U", (unsigned long long) client->querybuf ? sdslen(client->querybuf) : 0,
" qbuf-free=%U", (unsigned long long) client->querybuf ? sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" rbs=%U", (unsigned long long) client->buf_usable_size,
Expand Down Expand Up @@ -3831,7 +3888,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
size_t mem = getClientOutputBufferMemoryUsage(c);
if (output_buffer_mem_usage != NULL)
*output_buffer_mem_usage = mem;
mem += sdsZmallocSize(c->querybuf);
mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
mem += zmalloc_size(c);
mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
Expand Down Expand Up @@ -4228,6 +4285,7 @@ void *IOThreadMain(void *myid) {
redis_set_thread_title(thdname);
serverSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
initSharedQueryBuf();

while(1) {
/* Wait for start */
Expand Down
3 changes: 3 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
* connection. */
server.master->flags |= CLIENT_MASTER;

/* Allocate a private query buffer for the master client instead of using the shared query buffer.
* This is done because the master's query buffer data needs to be preserved for its sub-replicas to use. */
server.master->querybuf = sdsempty();
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
Expand Down
18 changes: 14 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;

Expand All @@ -744,7 +745,15 @@ int clientsCronResizeQueryBuffer(client *c) {
/* There are two conditions to resize the query buffer: */
if (idletime > 2) {
/* 1) Query is idle for a long time. */
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_MASTER) && !remaining) {
/* If the client is not a master and no data is pending,
* The client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
}
} else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) {
/* 2) Query buffer is too big for latest peak and is larger than
* resize threshold. Trim excess space but only up to a limit,
Expand All @@ -760,7 +769,7 @@ int clientsCronResizeQueryBuffer(client *c) {

/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = sdslen(c->querybuf);
c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0;
/* We reset to either the current used, or currently processed bulk size,
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2;
Expand Down Expand Up @@ -835,7 +844,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum +
size_t in_usage = (c->querybuf ? sdsZmallocSize(c->querybuf): 0) + c->argv_len_sum +
(c->argv ? zmalloc_size(c->argv) : 0);
size_t out_usage = getClientOutputBufferMemoryUsage(c);

Expand Down Expand Up @@ -2782,6 +2791,7 @@ void initServer(void) {
}
slowlogInit();
latencyMonitorInit();
initSharedQueryBuf();

/* Initialize ACL default password if it exists */
ACLUpdateDefaultUserPassword(server.requirepass);
Expand Down Expand Up @@ -6553,7 +6563,7 @@ void dismissMemory(void* ptr, size_t size_hint) {
void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
if (c->querybuf) dismissSds(c->querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
void initSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} {

test {CLIENT LIST} {
r client list
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}

test {CLIENT LIST with IDs} {
set myid [r client id]
Expand All @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} {

test {CLIENT INFO} {
r client info
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}

test {CLIENT KILL with illegal arguments} {
assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill}
Expand Down
20 changes: 19 additions & 1 deletion tests/unit/querybuf.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,21 @@ start_server {tags {"querybuf slow"}} {
# The test will run at least 2s to check if client query
# buffer will be resized when client idle 2s.
test "query buffer resized correctly" {
set rd [redis_client]
set rd [redis_deferring_client]
$rd client setname test_client
$rd read

# Make sure query buff has size of 0 bytes at start as the client uses the shared qb.
assert {[client_query_buffer test_client] == 0}

# Send partial command to client to make sure it doesn't use the shared qb.
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na"
$rd flush
after 100
# send the rest of the command
$rd write "a\r\n"
$rd flush

set orig_test_client_qbuf [client_query_buffer test_client]
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k
Expand Down Expand Up @@ -78,6 +91,11 @@ start_server {tags {"querybuf slow"}} {
$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n"
$rd flush

after 200
# Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size.
$rd write "a"
$rd flush

after 20
if {[client_query_buffer test_client] < 1000000} {
fail "query buffer should not be resized when client idle time smaller than 2s"
Expand Down

0 comments on commit 6ce2ea4

Please sign in to comment.