Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
Signed-off-by: Uri Yagelnik <[email protected]>
  • Loading branch information
uriyage committed May 9, 2024
1 parent dd182c5 commit 0b34557
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 37 deletions.
83 changes: 46 additions & 37 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
__thread sds shared_qb = NULL;
__thread sds thread_shared_qb = NULL;

/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
Expand Down Expand Up @@ -1617,7 +1617,11 @@ void freeClient(client *c) {
}

/* Free the query buffer */
if (c->querybuf != shared_qb) sdsfree(c->querybuf);
if (c->querybuf == thread_shared_qb) {
sdsclear(c->querybuf);
} else {
sdsfree(c->querybuf);
}
c->querybuf = NULL;

/* Deallocate structures used to block on blocking ops. */
Expand Down Expand Up @@ -2119,44 +2123,44 @@ void resetClient(client *c) {

/* Initializes the shared query buffer to a new sds with the default capacity */
void initSharedQueryBuf(void) {
shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(shared_qb);
thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(thread_shared_qb);
}

/* Resets the client's query buffer.
*
* If the client is using the shared query buffer, the remaining data is copied
* to a new private buffer. If the client is using a private buffer, the buffer
* is trimmed to the current position. */
void trimClientQueryBuffer(client *c) {
if (c->querybuf == NULL) {
/* Resets the shared query buffer used by the given client.
* If any data remained in the buffer, the client will take ownership of the buffer
* and a new empty buffer will be allocated for the shared buffer. */
void resetSharedQueryBuf(client *c) {
if (c->querybuf != thread_shared_qb) return;
size_t remaining = sdslen(c->querybuf) - c->qb_pos;

if (remaining > 0) {
/* Let the client take ownership of the shared buffer. */
initSharedQueryBuf();
return;
}

serverAssert(c->qb_pos <= sdslen(c->querybuf));
c->querybuf = NULL;
sdsclear(thread_shared_qb);
c->qb_pos = 0;
}

if (c->querybuf != shared_qb) {
if (c->qb_pos > 0) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
}
/* Trims the client query buffer to the current position. */
void trimClientQueryBuffer(client *c) {
if (c->querybuf == thread_shared_qb) {
resetSharedQueryBuf(c);
}

if (c->querybuf == NULL) {
return;
}

size_t remaining = sdslen(c->querybuf) - c->qb_pos;
serverAssert(c->qb_pos <= sdslen(c->querybuf));

if (remaining > 0) {
/* Allocate large enough buf to avoid re-allocation in the next read */
size_t size = remaining <= PROTO_IOBUF_LEN ? PROTO_IOBUF_LEN : remaining;
c->querybuf = sdsnewlen(NULL, size);
memcpy(c->querybuf, shared_qb + c->qb_pos, remaining);
sdssetlen(c->querybuf, remaining);
} else {
c->querybuf = NULL;
if (c->qb_pos > 0) {
sdsrange(c->querybuf, c->qb_pos, -1);
c->qb_pos = 0;
}

sdsclear(shared_qb);
c->qb_pos = 0;
}

/* This function is used when we want to re-enter the event loop but there
Expand Down Expand Up @@ -2406,7 +2410,7 @@ int processMultibulkBuffer(client *c) {
}

c->qb_pos = newline-c->querybuf+2;
if (c->querybuf != shared_qb && !(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
/* When the client is not a master client (because master
* client's querybuf can only be trimmed after data applied
* and sent to replicas).
Expand All @@ -2421,6 +2425,10 @@ int processMultibulkBuffer(client *c) {
* ll+2, trimming querybuf is just a waste of time, because
* at this time the querybuf contains not only our bulk. */
if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
if (c->querybuf == thread_shared_qb) {
/* Let the client take the ownership of the shared buffer. */
initSharedQueryBuf();
}
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
/* Hint the sds library about the amount of bytes this string is
Expand Down Expand Up @@ -2636,10 +2644,11 @@ int processInputBuffer(client *c) {
break;
}

if (c->querybuf == shared_qb) {
if (c->querybuf == thread_shared_qb) {
/* Before processing the command, reset the shared query buffer to its default state.
* This avoids unintentionally modifying the shared qb during processCommand */
trimClientQueryBuffer(c);
* This avoids unintentionally modifying the shared qb during processCommand as we may use
* the shared qb for other clients during processEventsWhileBlocked */
resetSharedQueryBuf(c);
}

/* We are finally ready to execute the command. */
Expand Down Expand Up @@ -2720,8 +2729,8 @@ void readQueryFromClient(connection *conn) {
}

if (c->querybuf == NULL) {
serverAssert(sdslen(shared_qb) == 0);
c->querybuf = big_arg ? sdsempty() : shared_qb;
serverAssert(sdslen(thread_shared_qb) == 0);
c->querybuf = big_arg ? sdsempty() : thread_shared_qb;
qblen = sdslen(c->querybuf);
}

Expand Down Expand Up @@ -2797,8 +2806,8 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && c->querybuf == shared_qb) {
sdsclear(shared_qb);
if (c && c->querybuf == thread_shared_qb) {
sdsclear(thread_shared_qb);
c->querybuf = NULL;
}
beforeNextClient(c);
Expand Down
4 changes: 4 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
/* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;
Expand All @@ -750,6 +751,9 @@ int clientsCronResizeQueryBuffer(client *c) {
/* If the client is not a master and no data is pending,
* The client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
/* By setting the querybuf to NULL, the client will use the shared query buffer in the next read.
* We don't move the client to the shared query buffer immediately, because if we allocated a private
* query buffer for the client, it's likely that the client will use it again soon. */
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
Expand Down

0 comments on commit 0b34557

Please sign in to comment.