From 6ce2ea4d1f8f92e32f166ab8d36a230ed589fc15 Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Mon, 8 Apr 2024 08:39:35 +0000 Subject: [PATCH] use shared query buffer for clients read Signed-off-by: Uri Yagelnik --- src/networking.c | 84 ++++++++++++++++++++++++++++++------ src/replication.c | 3 ++ src/server.c | 18 ++++++-- src/server.h | 1 + tests/unit/introspection.tcl | 4 +- tests/unit/querybuf.tcl | 20 ++++++++- 6 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index fdfb9b3fda..e014fc1a44 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,6 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ +__thread sds shared_qb = NULL; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -152,7 +153,7 @@ client *createClient(connection *conn) { c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; - c->querybuf = sdsempty(); + c->querybuf = NULL; c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; @@ -2116,6 +2117,48 @@ void resetClient(client *c) { } } +/* Initializes the shared query buffer to a new sds with the default capacity */ +void initSharedQueryBuf(void) { + shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(shared_qb); +} + +/* Resets the client's query buffer. + * + * If the client is using the shared query buffer, the remaining data is copied + * to a new private buffer. If the client is using a private buffer, the buffer + * is trimmed to the current position. */ +void resetClientQueryBuffer(client *c) { + if (c->querybuf == NULL) { + return; + } + + serverAssert(c->qb_pos <= sdslen(c->querybuf)); + + if (c->querybuf != shared_qb) { + if (c->qb_pos > 0) { + sdsrange(c->querybuf, c->qb_pos, -1); + c->qb_pos = 0; + } + return; + } + + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + + if (remaining > 0) { + /* Allocate large enough buf to avoid re-allocation in the next read */ + size_t size = remaining <= PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN : remaining; + c->querybuf = sdsnewlen(NULL, size); + memcpy(c->querybuf, shared_qb + c->qb_pos, remaining); + sdssetlen(c->querybuf, remaining); + } else { + c->querybuf = NULL; + } + + sdsclear(shared_qb); + c->qb_pos = 0; +} + /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2363,7 +2406,7 @@ int processMultibulkBuffer(client *c) { } c->qb_pos = newline-c->querybuf+2; - if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { + if (c->querybuf != shared_qb && !(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { /* When the client is not a master client (because master * client's querybuf can only be trimmed after data applied * and sent to replicas). @@ -2542,7 +2585,7 @@ int processPendingCommandAndInputBuffer(client *c) { * return C_ERR in case the client was freed during the processing */ int processInputBuffer(client *c) { /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while(c->querybuf && c->qb_pos < sdslen(c->querybuf)) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2593,6 +2636,12 @@ int processInputBuffer(client *c) { break; } + if (c->querybuf == shared_qb) { + /* Before processing the command, reset the shared query buffer to its default state. + * This avoids unintentionally modifying the shared qb during processCommand */ + resetClientQueryBuffer(c); + } + /* We are finally ready to execute the command. */ if (processCommandAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid exiting this @@ -2621,10 +2670,8 @@ int processInputBuffer(client *c) { c->qb_pos -= c->repl_applied; c->repl_applied = 0; } - } else if (c->qb_pos) { - /* Trim to pos */ - sdsrange(c->querybuf,c->qb_pos,-1); - c->qb_pos = 0; + } else { + resetClientQueryBuffer(c); } /* Update client memory usage after processing the query buffer, this is @@ -2649,6 +2696,7 @@ void readQueryFromClient(connection *conn) { atomicIncr(server.stat_total_reads_processed, 1); readlen = PROTO_IOBUF_LEN; + qblen = c->querybuf ? sdslen(c->querybuf) : 0; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even @@ -2658,7 +2706,7 @@ void readQueryFromClient(connection *conn) { if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { - ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); + ssize_t remaining = (size_t)(c->bulklen+2)-(qblen-c->qb_pos); big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, @@ -2671,7 +2719,12 @@ void readQueryFromClient(connection *conn) { readlen = PROTO_IOBUF_LEN; } - qblen = sdslen(c->querybuf); + if (c->querybuf == NULL) { + serverAssert(sdslen(shared_qb) == 0); + c->querybuf = big_arg ? sdsempty() : shared_qb; + qblen = sdslen(c->querybuf); + } + if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg @@ -2692,7 +2745,7 @@ void readQueryFromClient(connection *conn) { nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { - return; + goto done; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); @@ -2744,6 +2797,10 @@ void readQueryFromClient(connection *conn) { c = NULL; done: + if (c && c->querybuf == shared_qb) { + sdsclear(shared_qb); + c->querybuf = NULL; + } beforeNextClient(c); } @@ -2859,8 +2916,8 @@ sds catClientInfoString(sds s, client *client) { " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, " watch=%i", (int) listLength(client->watched_keys), - " qbuf=%U", (unsigned long long) sdslen(client->querybuf), - " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), + " qbuf=%U", (unsigned long long) client->querybuf ? sdslen(client->querybuf) : 0, + " qbuf-free=%U", (unsigned long long) client->querybuf ? sdsavail(client->querybuf) : 0, " argv-mem=%U", (unsigned long long) client->argv_len_sum, " multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, " rbs=%U", (unsigned long long) client->buf_usable_size, @@ -3831,7 +3888,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { size_t mem = getClientOutputBufferMemoryUsage(c); if (output_buffer_mem_usage != NULL) *output_buffer_mem_usage = mem; - mem += sdsZmallocSize(c->querybuf); + mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; mem += zmalloc_size(c); mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory @@ -4228,6 +4285,7 @@ void *IOThreadMain(void *myid) { redis_set_thread_title(thdname); serverSetCpuAffinity(server.server_cpulist); makeThreadKillable(); + initSharedQueryBuf(); while(1) { /* Wait for start */ diff --git a/src/replication.c b/src/replication.c index 5fb477cd8b..c06b417b78 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1781,6 +1781,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) { * connection. */ server.master->flags |= CLIENT_MASTER; + /* Allocate a private query buffer for the master client instead of using the shared query buffer. + * This is done because the master's query buffer data needs to be preserved for its sub-replicas to use. */ + server.master->querybuf = sdsempty(); server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/server.c b/src/server.c index 756039a071..c870b5b13e 100644 --- a/src/server.c +++ b/src/server.c @@ -735,6 +735,7 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -744,7 +745,15 @@ int clientsCronResizeQueryBuffer(client *c) { /* There are two conditions to resize the query buffer: */ if (idletime > 2) { /* 1) Query is idle for a long time. */ - c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + if (!(c->flags & CLIENT_MASTER) && !remaining) { + /* If the client is not a master and no data is pending, + * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ + sdsfree(c->querybuf); + c->querybuf = NULL; + } else { + c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1); + } } else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) { /* 2) Query buffer is too big for latest peak and is larger than * resize threshold. Trim excess space but only up to a limit, @@ -760,7 +769,7 @@ int clientsCronResizeQueryBuffer(client *c) { /* Reset the peak again to capture the peak memory usage in the next * cycle. */ - c->querybuf_peak = sdslen(c->querybuf); + c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0; /* We reset to either the current used, or currently processed bulk size, * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2; @@ -835,7 +844,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0}; int clientsCronTrackExpansiveClients(client *c, int time_idx) { - size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum + + size_t in_usage = (c->querybuf ? sdsZmallocSize(c->querybuf): 0) + c->argv_len_sum + (c->argv ? zmalloc_size(c->argv) : 0); size_t out_usage = getClientOutputBufferMemoryUsage(c); @@ -2782,6 +2791,7 @@ void initServer(void) { } slowlogInit(); latencyMonitorInit(); + initSharedQueryBuf(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); @@ -6553,7 +6563,7 @@ void dismissMemory(void* ptr, size_t size_hint) { void dismissClientMemory(client *c) { /* Dismiss client query buffer and static reply buffer. */ dismissMemory(c->buf, c->buf_usable_size); - dismissSds(c->querybuf); + if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { diff --git a/src/server.h b/src/server.h index 0ca24e3c92..673cc17fcd 100644 --- a/src/server.h +++ b/src/server.h @@ -2671,6 +2671,7 @@ void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); void initThreadedIO(void); +void initSharedQueryBuf(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 1e6e38625c..7820a84a8d 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl index f4859dd278..6a2f6834b7 100644 --- a/tests/unit/querybuf.tcl +++ b/tests/unit/querybuf.tcl @@ -24,8 +24,21 @@ start_server {tags {"querybuf slow"}} { # The test will run at least 2s to check if client query # buffer will be resized when client idle 2s. test "query buffer resized correctly" { - set rd [redis_client] + set rd [redis_deferring_client] $rd client setname test_client + $rd read + + # Make sure query buff has size of 0 bytes at start as the client uses the shared qb. + assert {[client_query_buffer test_client] == 0} + + # Send partial command to client to make sure it doesn't use the shared qb. + $rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" + $rd flush + after 100 + # send the rest of the command + $rd write "a\r\n" + $rd flush + set orig_test_client_qbuf [client_query_buffer test_client] # Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k # but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k @@ -78,6 +91,11 @@ start_server {tags {"querybuf slow"}} { $rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" $rd flush + after 200 + # Send the start of the arg and make sure the client is not using shared qb for it rather a private buf of > 1000000 size. + $rd write "a" + $rd flush + after 20 if {[client_query_buffer test_client] < 1000000} { fail "query buffer should not be resized when client idle time smaller than 2s"