From 0b3455772b68932bdce6a64b459d77109b09bf7e Mon Sep 17 00:00:00 2001 From: Uri Yagelnik Date: Thu, 9 May 2024 05:25:58 +0000 Subject: [PATCH] PR changes Signed-off-by: Uri Yagelnik --- src/networking.c | 83 +++++++++++++++++++++++++++--------------------- src/server.c | 4 +++ 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/networking.c b/src/networking.c index 76cb4995ff..87bf0b54fe 100644 --- a/src/networking.c +++ b/src/networking.c @@ -43,7 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); char *getClientSockname(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ -__thread sds shared_qb = NULL; +__thread sds thread_shared_qb = NULL; /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -1617,7 +1617,11 @@ void freeClient(client *c) { } /* Free the query buffer */ - if (c->querybuf != shared_qb) sdsfree(c->querybuf); + if (c->querybuf == thread_shared_qb) { + sdsclear(c->querybuf); + } else { + sdsfree(c->querybuf); + } c->querybuf = NULL; /* Deallocate structures used to block on blocking ops. */ @@ -2119,44 +2123,44 @@ void resetClient(client *c) { /* Initializes the shared query buffer to a new sds with the default capacity */ void initSharedQueryBuf(void) { - shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); - sdsclear(shared_qb); + thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(thread_shared_qb); } -/* Resets the client's query buffer. - * - * If the client is using the shared query buffer, the remaining data is copied - * to a new private buffer. If the client is using a private buffer, the buffer - * is trimmed to the current position. */ -void trimClientQueryBuffer(client *c) { - if (c->querybuf == NULL) { +/* Resets the shared query buffer used by the given client. + * If any data remained in the buffer, the client will take ownership of the buffer + * and a new empty buffer will be allocated for the shared buffer. */ +void resetSharedQueryBuf(client *c) { + if (c->querybuf != thread_shared_qb) return; + size_t remaining = sdslen(c->querybuf) - c->qb_pos; + + if (remaining > 0) { + /* Let the client take ownership of the shared buffer. */ + initSharedQueryBuf(); return; } - serverAssert(c->qb_pos <= sdslen(c->querybuf)); + c->querybuf = NULL; + sdsclear(thread_shared_qb); + c->qb_pos = 0; +} - if (c->querybuf != shared_qb) { - if (c->qb_pos > 0) { - sdsrange(c->querybuf, c->qb_pos, -1); - c->qb_pos = 0; - } +/* Trims the client query buffer to the current position. */ +void trimClientQueryBuffer(client *c) { + if (c->querybuf == thread_shared_qb) { + resetSharedQueryBuf(c); + } + + if (c->querybuf == NULL) { return; } - size_t remaining = sdslen(c->querybuf) - c->qb_pos; + serverAssert(c->qb_pos <= sdslen(c->querybuf)); - if (remaining > 0) { - /* Allocate large enough buf to avoid re-allocation in the next read */ - size_t size = remaining <= PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN : remaining; - c->querybuf = sdsnewlen(NULL, size); - memcpy(c->querybuf, shared_qb + c->qb_pos, remaining); - sdssetlen(c->querybuf, remaining); - } else { - c->querybuf = NULL; + if (c->qb_pos > 0) { + sdsrange(c->querybuf, c->qb_pos, -1); + c->qb_pos = 0; } - - sdsclear(shared_qb); - c->qb_pos = 0; } /* This function is used when we want to re-enter the event loop but there @@ -2406,7 +2410,7 @@ int processMultibulkBuffer(client *c) { } c->qb_pos = newline-c->querybuf+2; - if (c->querybuf != shared_qb && !(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { + if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) { /* When the client is not a master client (because master * client's querybuf can only be trimmed after data applied * and sent to replicas). @@ -2421,6 +2425,10 @@ int processMultibulkBuffer(client *c) { * ll+2, trimming querybuf is just a waste of time, because * at this time the querybuf contains not only our bulk. */ if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { + if (c->querybuf == thread_shared_qb) { + /* Let the client take the ownership of the shared buffer. */ + initSharedQueryBuf(); + } sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is @@ -2636,10 +2644,11 @@ int processInputBuffer(client *c) { break; } - if (c->querybuf == shared_qb) { + if (c->querybuf == thread_shared_qb) { /* Before processing the command, reset the shared query buffer to its default state. - * This avoids unintentionally modifying the shared qb during processCommand */ - trimClientQueryBuffer(c); + * This avoids unintentionally modifying the shared qb during processCommand as we may use + * the shared qb for other clients during processEventsWhileBlocked */ + resetSharedQueryBuf(c); } /* We are finally ready to execute the command. */ @@ -2720,8 +2729,8 @@ void readQueryFromClient(connection *conn) { } if (c->querybuf == NULL) { - serverAssert(sdslen(shared_qb) == 0); - c->querybuf = big_arg ? sdsempty() : shared_qb; + serverAssert(sdslen(thread_shared_qb) == 0); + c->querybuf = big_arg ? sdsempty() : thread_shared_qb; qblen = sdslen(c->querybuf); } @@ -2797,8 +2806,8 @@ void readQueryFromClient(connection *conn) { c = NULL; done: - if (c && c->querybuf == shared_qb) { - sdsclear(shared_qb); + if (c && c->querybuf == thread_shared_qb) { + sdsclear(thread_shared_qb); c->querybuf = NULL; } beforeNextClient(c); diff --git a/src/server.c b/src/server.c index c870b5b13e..d4acc06a53 100644 --- a/src/server.c +++ b/src/server.c @@ -735,6 +735,7 @@ long long getInstantaneousMetric(int metric) { * * The function always returns 0 as it never terminates the client. */ int clientsCronResizeQueryBuffer(client *c) { + /* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */ if (c->querybuf == NULL) return 0; size_t querybuf_size = sdsalloc(c->querybuf); time_t idletime = server.unixtime - c->lastinteraction; @@ -750,6 +751,9 @@ int clientsCronResizeQueryBuffer(client *c) { /* If the client is not a master and no data is pending, * The client can safely use the shared query buffer in the next read - free the client's querybuf. */ sdsfree(c->querybuf); + /* By setting the querybuf to NULL, the client will use the shared query buffer in the next read. + * We don't move the client to the shared query buffer immediately, because if we allocated a private + * query buffer for the client, it's likely that the client will use it again soon. */ c->querybuf = NULL; } else { c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);