diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index b52692bd15..8ef6c14348 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -132,22 +132,23 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } -static int canAddNetworkBytesOut(client *c) { - return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1; +/* Accumulates egress bytes for the slot. */ +void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) { + if (!clusterSlotStatsEnabled(slot)) return; + + serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); + server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out; } /* Accumulates egress bytes upon sending RESP responses back to user clients. */ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { - if (!canAddNetworkBytesOut(c)) return; - - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; + clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd); } /* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) { client *c = server.current_client; - if (c == NULL || !canAddNetworkBytesOut(c)) return; + if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return; serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); serverAssert(nodeIsPrimary(server.cluster->myself)); @@ -174,24 +175,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) { * This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation. * This function covers the internal propagation component. */ void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { - /* For a blocked client, c->slot could be pre-filled. - * Thus c->slot is backed-up for restoration after aggregation is completed. */ - int _slot = c->slot; - c->slot = slot; - if (!canAddNetworkBytesOut(c)) { - /* c->slot should not change as a side effect of this function, - * regardless of the function's early return condition. */ - c->slot = _slot; - return; - } + if (!clusterSlotStatsEnabled(slot)) return; - serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); - server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; + serverAssert(slot >= 0 && slot < CLUSTER_SLOTS); + server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd; /* For sharded pubsub, the client's network bytes metrics must be reset here, * as resetClient() is not called until subscription ends. */ c->net_output_bytes_curr_cmd = 0; - c->slot = _slot; } /* Adds reply for the ORDERBY variant. @@ -219,9 +210,7 @@ void clusterSlotStatResetAll(void) { * would equate to repeating the same calculation twice. */ static int canAddCpuDuration(client *c) { - return server.cluster_slot_stats_enabled && /* Config should be enabled. */ - server.cluster_enabled && /* Cluster mode should be enabled. */ - c->slot != -1 && /* Command should be slot specific. */ + return clusterSlotStatsEnabled(c->slot) && (!server.execution_nesting || /* Either; */ (server.execution_nesting && /* 1) Command should not be nested, or */ c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. 
*/ @@ -248,8 +237,7 @@ static int canAddNetworkBytesIn(client *c) { * Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. * Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */ - return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) && - !server.in_exec; + return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec; } /* Adds network ingress bytes of the current command in execution, @@ -343,3 +331,7 @@ void clusterSlotStatsCommand(client *c) { addReplySubcommandSyntaxError(c); } } + +int clusterSlotStatsEnabled(int slot) { + return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1; +} diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index 2e9da70aae..f5c103e9ed 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -6,6 +6,7 @@ /* General use-cases. */ void clusterSlotStatReset(int slot); void clusterSlotStatResetAll(void); +int clusterSlotStatsEnabled(int slot); /* cpu-usec metric. */ void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration); @@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len); void clusterSlotStatsResetClusterMsgLength(void); /* network-bytes-out metric. */ +void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out); void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c); void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len); void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len); diff --git a/src/config.c b/src/config.c index 12c999e747..1bb09a5137 100644 --- a/src/config.c +++ b/src/config.c @@ -3256,6 +3256,10 @@ standardConfig static_configs[] = { createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. 
*/ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-io-threads-value-prefetch-off", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_value_prefetch_off, 10, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL), + createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */ diff --git a/src/io_threads.c b/src/io_threads.c index 66ef4948b6..3b14e7a177 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -398,9 +398,13 @@ int trySendWriteToIOThreads(client *c) { * threads from reading data that might be invalid in their local CPU cache. 
*/
     c->io_last_reply_block = listLast(c->reply);
     if (c->io_last_reply_block) {
-        c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
+        clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
+        c->io_last_bufpos = block->used;
+        /* If reply offload is enabled, force a new header */
+        block->last_header = NULL;
     } else {
         c->io_last_bufpos = (size_t)c->bufpos;
+        c->last_header = NULL;
     }

     serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);
diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c
index ef6a6c6d02..68722a9c44 100644
--- a/src/memory_prefetch.c
+++ b/src/memory_prefetch.c
@@ -9,6 +9,7 @@
 #include "memory_prefetch.h"
 #include "server.h"
+#include "io_threads.h"

 typedef enum {
     PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
@@ -119,6 +120,10 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
     if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
         /* Not done yet */
         moveToNextKey();
+    } else if (server.io_threads_num >= server.min_io_threads_value_prefetch_off) {
+        /* Copy avoidance should be more efficient without value prefetch
+         * starting from a certain number of I/O threads */
+        markKeyAsdone(info);
     } else {
         info->state = PREFETCH_VALUE;
     }
diff --git a/src/networking.c b/src/networking.c
index 093d579ef4..7993a9acef 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -66,6 +66,31 @@ typedef struct {
     int skipme;
 } clientFilter;

+/* Types of payloads in reply buffers (c->buf and c->reply).
+ * Unencoded buffers contain plain replies only.
+ * Encoded buffers contain headers followed by either plain replies or
+ * by bulk string references */
+typedef enum {
+    PLAIN_REPLY = 0, /* plain reply */
+    BULK_STR_REF     /* bulk string references */
+} payloadType;
+
+/* Encoded reply buffers consist of chunks.
+ * Each chunk contains a header followed by its payload */
+typedef struct __attribute__((__packed__)) payloadHeader {
+    size_t len;        /* payload length in a reply buffer */
+    size_t actual_len; /* actual reply length for non-plain payloads */
+    uint8_t type;      /* one of payloadType */
+    int16_t slot;      /* to report network-bytes-out for BULK_STR_REF chunks */
+} payloadHeader;
+
+/* To avoid copying the whole string into the reply buffer
+ * we store pointers to the object and to the string itself */
+typedef struct __attribute__((__packed__)) bulkStrRef {
+    robj *obj; /* pointer to object used for reference count management */
+    sds str;   /* pointer to string to optimize memory access by I/O thread */
+} bulkStrRef;
+
 static void setProtocolError(const char *errstr, client *c);
 static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
 int postponeClientRead(client *c);
@@ -149,6 +174,36 @@ static inline int isReplicaReadyForReplData(client *replica) {
            !(replica->flag.close_asap);
 }

+/* Decides if copy avoidance is preferred according to client type, number of I/O threads and object size.
+ * May be called with a NULL obj for evaluation with no regard to object size */
+static int isCopyAvoidPreferred(client *c, robj *obj) {
+    if (c->flag.fake) return 0;
+
+    /* Copy avoidance can be allowed only for regular Valkey clients
+     * that use the _writeToClient handler to write replies to the client connection */
+    int type = getClientType(c);
+    if (type != CLIENT_TYPE_NORMAL && type != CLIENT_TYPE_PUBSUB) return 0;
+
+    if (obj) {
+        if (obj->encoding != OBJ_ENCODING_RAW) return 0;
+        if (obj->refcount == OBJ_STATIC_REFCOUNT) return 0;
+    }
+
+    /* Copy avoidance is preferred for any string size starting from a certain number 
of I/O threads */ + if (server.min_io_threads_copy_avoid && server.io_threads_num >= server.min_io_threads_copy_avoid) return 1; + + if (!obj) return 0; + size_t len = sdslen(obj->ptr); + + /* Main thread only. No I/O threads */ + if (server.io_threads_num == 1) { + /* Copy avoidance is preferred starting certain string size */ + return server.min_string_size_copy_avoid && len >= (size_t)server.min_string_size_copy_avoid; + } + /* Main thread + I/O threads */ + return server.min_string_size_copy_avoid_threaded && len >= (size_t)server.min_string_size_copy_avoid_threaded; +} + client *createClient(connection *conn) { client *c = zmalloc(sizeof(client)); @@ -178,6 +233,7 @@ client *createClient(connection *conn) { c->lib_name = NULL; c->lib_ver = NULL; c->bufpos = 0; + c->last_header = NULL; c->buf_peak = c->buf_usable_size; c->buf_peak_last_reset_time = server.unixtime; c->qb_pos = 0; @@ -197,7 +253,6 @@ client *createClient(connection *conn) { c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; - c->sentlen = 0; c->raw_flag = 0; c->capa = 0; c->slot = -1; @@ -236,6 +291,9 @@ client *createClient(connection *conn) { c->commands_processed = 0; c->io_last_reply_block = NULL; c->io_last_bufpos = 0; + c->io_last_written_buf = NULL; + c->io_last_written_bufpos = 0; + c->io_last_written_data_len = 0; return c; } @@ -379,6 +437,41 @@ void deleteCachedResponseClient(client *recording_client) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +/* Updates an existing header, if possible; otherwise inserts a new one + * Returns the length of data that can be added to the reply buffer (i.e. min(available, requested)) */ +static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) { + /* Enforce min len for BULK_STR_REF chunks as whole pointers must be written to the buffer */ + size_t min_len = (type == BULK_STR_REF ? len : 1); + if (min_len > available) return 0; + size_t reply_len = min(available, len); + + // If cluster slots stats disabled set slot to -1 to prevent excessive per slot headers + if (!clusterSlotStatsEnabled(slot)) slot = -1; + + /* Try to add payload to last chunk if possible */ + if (*last_header != NULL && (*last_header)->type == type && (*last_header)->slot == slot) { + (*last_header)->len += reply_len; + return reply_len; + } + + /* Recheck min len condition and recalculate allowed len with a new header to be added */ + if (sizeof(payloadHeader) + min_len > available) return 0; + available -= sizeof(payloadHeader); + if (len > available) reply_len = available; + + /* Start a new payload chunk */ + *last_header = (payloadHeader *)(buf + *bufpos); + + (*last_header)->type = type; + (*last_header)->len = reply_len; + (*last_header)->slot = slot; + (*last_header)->actual_len = 0; + + *bufpos += sizeof(payloadHeader); + + return reply_len; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns the length of data that is added to the reply buffer. * @@ -386,26 +479,51 @@ void deleteCachedResponseClient(client *recording_client) { * zmalloc_usable_size() call. 
Writing beyond client->buf boundaries confuses * sanitizer and generates a false positive out-of-bounds error */ VALKEY_NO_SANITIZE("bounds") -size_t _addReplyToBuffer(client *c, const char *s, size_t len) { - size_t available = c->buf_usable_size - c->bufpos; - +static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t len, uint8_t payload_type) { /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return 0; - size_t reply_len = len > available ? available : len; - memcpy(c->buf + c->bufpos, s, reply_len); + size_t available = c->buf_usable_size - c->bufpos; + size_t reply_len = min(available, len); + if (c->flag.buf_encoded) { + reply_len = upsertPayloadHeader(c->buf, &c->bufpos, &c->last_header, payload_type, len, c->slot, available); + } + if (!reply_len) return 0; + + memcpy(c->buf + c->bufpos, payload, reply_len); c->bufpos += reply_len; /* We update the buffer peak after appending the reply to the buffer */ if (c->buf_peak < (size_t)c->bufpos) c->buf_peak = (size_t)c->bufpos; return reply_len; } -/* Adds the reply to the reply linked list. +static size_t _addReplyToBuffer(client *c, const char *s, size_t len) { + if (!len) return 0; + if (!c->bufpos) { + c->flag.buf_encoded = isCopyAvoidPreferred(c, NULL); + } + return _addReplyPayloadToBuffer(c, s, len, PLAIN_REPLY); +} + +/* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to static buffer + * Returns non-zero value if succeeded to add */ +static size_t _addBulkStrRefToBuffer(client *c, const void *payload, size_t len) { + if (!c->flag.buf_encoded) { + /* If buffer is plain and not empty then can't add bulk string reference to it */ + if (c->bufpos) return 0; + c->flag.buf_encoded = 1; + } + return _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF); +} + +/* Adds the payload to the reply linked list. * Note: some edits to this function need to be relayed to AddReplyFromClient. */ -void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { +static void _addReplyPayloadToList(client *c, list *reply_list, const char *payload, size_t len, uint8_t payload_type) { listNode *ln = listLast(reply_list); clientReplyBlock *tail = ln ? listNodeValue(ln) : NULL; + /* Determine if encoded buffer is required */ + int encoded = payload_type == BULK_STR_REF || isCopyAvoidPreferred(c, NULL); /* Note that 'tail' may be NULL even if we have a tail node, because when * addReplyDeferredLen() is used, it sets a dummy node to NULL just @@ -417,21 +535,39 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len * new node */ size_t avail = tail->size - tail->used; size_t copy = avail >= len ? len : avail; - memcpy(tail->buf + tail->used, s, copy); - tail->used += copy; - s += copy; - len -= copy; + + if (tail->flag.buf_encoded) { + copy = upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, avail); + } else if (encoded) { + /* If encoded buffer is required but tail is unencoded then pretend nothing can be added to it + * and, as consequence, cause addition of a new tail */ + copy = 0; + } + + if (copy) { + memcpy(tail->buf + tail->used, payload, copy); + tail->used += copy; + payload += copy; + len -= copy; + } } if (len) { /* Create a new node, make sure it is allocated to at * least PROTO_REPLY_CHUNK_BYTES */ size_t usable_size; - size_t size = len < PROTO_REPLY_CHUNK_BYTES ? 
PROTO_REPLY_CHUNK_BYTES : len; + size_t required_size = encoded ? len + sizeof(payloadHeader) : len; + size_t size = required_size < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : required_size; tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size); /* take over the allocation's internal fragmentation */ tail->size = usable_size - sizeof(clientReplyBlock); - tail->used = len; - memcpy(tail->buf, s, len); + tail->used = 0; + tail->flag.buf_encoded = encoded; + tail->last_header = NULL; + if (tail->flag.buf_encoded) { + upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, tail->size); + } + memcpy(tail->buf + tail->used, payload, len); + tail->used += len; listAddNodeTail(reply_list, tail); c->reply_bytes += tail->size; @@ -439,6 +575,16 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len } } +void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { + if (!len) return; + _addReplyPayloadToList(c, reply_list, s, len, PLAIN_REPLY); +} + +/* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to reply list */ +static void _addBulkStrRefToToList(client *c, const void *payload, size_t len) { + _addReplyPayloadToList(c, c->reply, payload, len, BULK_STR_REF); +} + /* The subscribe / unsubscribe command family has a push as a reply, * or in other words, it responds with a push (or several of them * depending on how many arguments it got), and has no reply. */ @@ -484,6 +630,20 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { if (len > reply_len) _addReplyProtoToList(c, c->reply, s + reply_len, len - reply_len); } +/* Increment reference to object and add pointer to object and + * pointer to string itself to current reply buffer */ +static void _addBulkStrRefToBufferOrList(client *c, robj *obj) { + if (c->flag.close_after_reply) return; + + /* Refcount will be decremented in write completion handler by the main thread */ + incrRefCount(obj); + + bulkStrRef str_ref = {.obj = obj, .str = obj->ptr}; + if (!_addBulkStrRefToBuffer(c, (void *)&str_ref, sizeof(str_ref))) { + _addBulkStrRefToToList(c, (void *)&str_ref, sizeof(str_ref)); + } +} + /* ----------------------------------------------------------------------------- * Higher level functions to queue data on the client output buffer. * The following functions are the ones that commands implementations will call. @@ -776,7 +936,7 @@ void trimReplyUnusedTailSpace(client *c) { * Also, to avoid large memmove which happens as part of realloc, we only do * that if the used part is small. 
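 * Encoded blocks (tail->flag.buf_encoded) are left untouched as well: a realloc could move the
 * block and leave tail->last_header, which points into its buffer, dangling.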
*/ if (tail->size - tail->used > tail->size / 4 && tail->used < PROTO_REPLY_CHUNK_BYTES && - c->io_write_state != CLIENT_PENDING_IO) { + c->io_write_state != CLIENT_PENDING_IO && !tail->flag.buf_encoded) { size_t usable_size; size_t old_size = tail->size; tail = zrealloc_usable(tail, tail->used + sizeof(clientReplyBlock), &usable_size); @@ -837,8 +997,8 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { * - It has enough room already allocated * - And not too large (avoid large memmove) * - And the client is not in a pending I/O state */ - if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size - prev->used > 0 && - c->io_write_state != CLIENT_PENDING_IO) { + if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used && + c->io_write_state != CLIENT_PENDING_IO && !prev->flag.buf_encoded) { size_t len_to_copy = prev->size - prev->used; if (len_to_copy > length) len_to_copy = length; memcpy(prev->buf + prev->used, s, len_to_copy); @@ -852,7 +1012,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { } if (ln->next != NULL && (next = listNodeValue(ln->next)) && next->size - next->used >= length && - next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO) { + next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO && !next->flag.buf_encoded) { memmove(next->buf + length, next->buf, next->used); memcpy(next->buf, s, length); next->used += length; @@ -864,6 +1024,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { /* Take over the allocation's internal fragmentation */ buf->size = usable_size - sizeof(clientReplyBlock); buf->used = length; + buf->flag.buf_encoded = 0; memcpy(buf->buf, s, length); listNodeValue(ln) = buf; c->reply_bytes += buf->size; @@ -1127,8 +1288,20 @@ void addReplyBulkLen(client *c, robj *obj) { _addReplyLongLongWithPrefix(c, len, '$'); } +/* Try to avoid whole bulk string copy to a reply buffer + * If copy avoidance allowed then only pointer to object and string will be copied to the buffer */ +static int tryAvoidBulkStrCopyToReply(client *c, robj *obj) { + if (!isCopyAvoidPreferred(c, obj)) return C_ERR; + if (prepareClientToWrite(c) != C_OK) return C_ERR; + + _addBulkStrRefToBufferOrList(c, obj); + + return C_OK; +} + /* Add an Object as a bulk reply */ void addReplyBulk(client *c, robj *obj) { + if (tryAvoidBulkStrCopyToReply(c, obj) == C_OK) return; addReplyBulkLen(c, obj); addReply(c, obj); addReplyProto(c, "\r\n", 2); @@ -1299,6 +1472,7 @@ void AddReplyFromClient(client *dst, client *src) { } /* First add the static buffer (either into the static buffer or reply list) */ + serverAssert(src->flag.buf_encoded == 0); addReplyProto(dst, src->buf, src->bufpos); /* We need to check with prepareClientToWrite again (after addReplyProto) @@ -1729,6 +1903,7 @@ void freeClient(client *c) { freeClientPubSubData(c); /* Free data structures. 
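 * Note: releaseReplyReferences() below drops the references held by copy-avoided (BULK_STR_REF)
 * replies before the reply list itself is released, so the referenced objects are not leaked.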
*/
+    releaseReplyReferences(c);
     listRelease(c->reply);
     c->reply = NULL;
     zfree_with_size(c->buf, c->buf_usable_size);
@@ -1966,19 +2141,198 @@ void writeToReplica(client *c) {
     }
 }

+/* Bulk string reply requires 3 iov entries -
+ * length prefix ($<len>\r\n), string (<data>) and suffix (\r\n) */
+#define NUM_OF_IOV_PER_BULK_STR 3
+/* Bulk string prefix max size */
+#define BULK_STR_LEN_PREFIX_MAX_SIZE (LONG_STR_SIZE + 3)
+
+/* This struct is used by writevToClient to prepare the iovec array for submitting to connWritev */
+typedef struct replyIOV {
+    int iovcnt;              /* number of elements in iov array */
+    int iovsize;             /* capacity of iov array */
+    struct iovec *iov;
+    ssize_t iov_len_total;   /* Total length of data pointed to by iov array */
+    size_t last_written_len; /* Length of data in the last written buffer
+                              * partially written in previous writevToClient invocation */
+    int limit_reached;       /* Non-zero if either the max iov count or the NET_MAX_WRITES_PER_EVENT limit
+                              * was reached during iovec array preparation */
+    /* Auxiliary fields for scattering BULK_STR_REF chunks from encoded buffers */
+    int prfxcnt;             /* number of prefixes */
+    char (*prefixes)[BULK_STR_LEN_PREFIX_MAX_SIZE]; /* bulk string prefixes */
+    char *crlf;              /* bulk string suffix */
+} replyIOV;
+
+/* The bufWriteMetadata struct is used by writevToClient to record metadata
+ * about scattering of a reply buffer to the iov array */
+typedef struct bufWriteMetadata {
+    char *buf;
+    size_t bufpos;
+    uint64_t data_len; /* Actual bytes out. Differs from bufpos if buffer encoded */
+    int complete;      /* Was the buffer completely scattered to iov, or did the
+                          process stop due to an encountered limit */
+} bufWriteMetadata;
+
+static void initReplyIOV(client *c, int iovsize, struct iovec *iov_arr, char (*prefixes)[], char *crlf, replyIOV *reply) {
+    reply->iovcnt = 0;
+    reply->iovsize = iovsize;
+    reply->limit_reached = 0;
+    reply->iov = iov_arr;
+    reply->iov_len_total = 0;
+    reply->last_written_len = c->io_last_written_data_len;
+    reply->prfxcnt = 0;
+    reply->prefixes = prefixes;
+    reply->crlf = crlf;
+}
+
+static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
+    if (reply->limit_reached) return;
+
+    if (reply->iovcnt == reply->iovsize) {
+        reply->limit_reached = 1;
+        return;
+    }
+
+    /* Aggregate data length from the beginning of the buffer even though
+     * part of the data can be skipped in this writevToClient invocation due to last_written_len */
+    metadata->data_len += buf_len;
+
+    /* Skip data written in the previous writevToClient invocation(s) */
+    if (reply->last_written_len >= buf_len) {
+        reply->last_written_len -= buf_len;
+        return;
+    }
+
+    reply->iov[reply->iovcnt].iov_base = buf + reply->last_written_len;
+    reply->iov[reply->iovcnt].iov_len = buf_len - reply->last_written_len;
+    reply->last_written_len = 0;
+
+    reply->iov_len_total += reply->iov[reply->iovcnt++].iov_len;
+}
+
+static void addBulkStringToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
+    bulkStrRef *str_ref = (bulkStrRef *)buf;
+    while (buf_len > 0 && !reply->limit_reached) {
+        size_t str_len = sdslen(str_ref->str);
+
+        /* RESP encodes bulk strings as $<len>\r\n<data>\r\n */
+        char *prefix = reply->prefixes[reply->prfxcnt];
+        prefix[0] = '$';
+        size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len);
+        prefix[num_len + 1] = '\r';
+        prefix[num_len + 2] = '\n';
+
+        int cnt = reply->iovcnt;
+        addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata);
+        /* Increment prfxcnt only if the 
prefix was added to reply in this writevToClient invocation */ + if (reply->iovcnt > cnt) reply->prfxcnt++; + addPlainBufferToReplyIOV(str_ref->str, str_len, reply, metadata); + addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata); + + str_ref++; + buf_len -= sizeof(bulkStrRef); + } +} + +static void addEncodedBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { + char *ptr = buf; + while (ptr < buf + bufpos && !reply->limit_reached) { + payloadHeader *header = (payloadHeader *)ptr; + ptr += sizeof(payloadHeader); + if (header->type == PLAIN_REPLY) { + addPlainBufferToReplyIOV(ptr, header->len, reply, metadata); + } else { + uint64_t data_len = metadata->data_len; + addBulkStringToReplyIOV(ptr, header->len, reply, metadata); + /* Store actual reply len for cluster slot stats */ + header->actual_len = metadata->data_len - data_len; + } + ptr += header->len; + } +} + +static void addBufferToReplyIOV(int encoded, char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) { + metadata->data_len = 0; + + if (encoded) { + addEncodedBufferToReplyIOV(buf, bufpos, reply, metadata); + metadata->complete = !reply->limit_reached; + } else { + addPlainBufferToReplyIOV(buf, bufpos, reply, metadata); + metadata->complete = 1; + } + + if (reply->iov_len_total > NET_MAX_WRITES_PER_EVENT) { + reply->limit_reached = 1; + } + + metadata->buf = buf; + metadata->bufpos = bufpos; +} + +/* + * This function calculates and stores on the client next: + * io_last_written_buf - Last buffer that has been written to the client connection + * io_last_written_bufpos - The buffer has been written until this position + * io_last_written_data_len - The actual length of the data written from this buffer + * This length differs from written bufpos in case of copy avoidance + * + * The io_last_written_buf and io_last_written_bufpos are used by _postWriteToClient + * to detect last client reply buffer that can be released + * + * The io_last_written_data_len is used by writevToClient for resuming write from the point + * where previous writevToClient invocation stopped + **/ +static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) { + int last = bufcnt - 1; + if (totwritten == totlen) { + c->io_last_written_buf = metadata[last].buf; + /* Zero io_last_written_bufpos indicates buffer written incompletely */ + c->io_last_written_bufpos = (metadata[last].complete ? metadata[last].bufpos : 0); + c->io_last_written_data_len = metadata[last].data_len; + return; + } + + last = -1; + int64_t remaining = totwritten + c->io_last_written_data_len; + while (remaining > 0) remaining -= metadata[++last].data_len; + serverAssert(last < bufcnt); + + c->io_last_written_buf = metadata[last].buf; + /* Zero io_last_written_bufpos indicates buffer written incompletely */ + c->io_last_written_bufpos = (metadata[last].complete && remaining == 0 ? 
metadata[last].bufpos : 0); + c->io_last_written_data_len = (size_t)(metadata[last].data_len + remaining); +} + +void proceedToUnwritten(replyIOV *reply, int nwritten) { + while (nwritten > 0) { + if ((size_t)nwritten < reply->iov[0].iov_len) { + reply->iov[0].iov_base = (char *)reply->iov[0].iov_base + nwritten; + reply->iov[0].iov_len -= nwritten; + break; + } + nwritten -= reply->iov[0].iov_len; + reply->iov++; + reply->iovcnt--; + } +} + /* This function should be called from _writeToClient when the reply list is not empty, * it gathers the scattered buffers from reply list and sends them away with connWritev. * If we write successfully, it returns C_OK, otherwise, C_ERR is returned. * Sets the c->nwritten to the number of bytes the server wrote to the client. * Can be called from the main thread or an I/O thread */ static int writevToClient(client *c) { - int iovcnt = 0; int iovmax = min(IOV_MAX, c->conn->iovcnt); struct iovec iov_arr[iovmax]; - struct iovec *iov = iov_arr; - ssize_t bufpos, iov_bytes_len = 0; - listNode *lastblock; + /* iov_arr can accommodate iovmax / NUM_OF_IOV_PER_BULK_STR full bulk string replies + * and one partial bulk reply */ + char prefixes[iovmax / NUM_OF_IOV_PER_BULK_STR + 1][BULK_STR_LEN_PREFIX_MAX_SIZE]; + char crlf[2] = {'\r', '\n'}; + int bufcnt = 0; + size_t bufpos = 0; + listNode *lastblock; if (inMainThread()) { lastblock = listLast(c->reply); bufpos = c->bufpos; @@ -1987,50 +2341,49 @@ static int writevToClient(client *c) { bufpos = lastblock ? (size_t)c->bufpos : c->io_last_bufpos; } + int reply_blocks = (lastblock ? listLength(c->reply) : 0); + /* +1 is for c->buf */ + bufWriteMetadata buf_metadata[min(reply_blocks + 1, iovmax)]; + + replyIOV reply; + initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply); + /* If the static reply buffer is not empty, * add it to the iov array for writev() as well. */ if (bufpos > 0) { - iov[iovcnt].iov_base = c->buf + c->sentlen; - iov[iovcnt].iov_len = bufpos - c->sentlen; - iov_bytes_len += iov[iovcnt++].iov_len; + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, bufpos, &reply, &buf_metadata[bufcnt++]); } - /* The first node of reply list might be incomplete from the last call, - * thus it needs to be calibrated to get the actual data address and length. */ - size_t sentlen = bufpos > 0 ? 0 : c->sentlen; - listIter iter; - listNode *next; - clientReplyBlock *o; - size_t used; - listRewind(c->reply, &iter); - while ((next = listNext(&iter)) && iovcnt < iovmax && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) { - o = listNodeValue(next); - used = o->used; - /* Use c->io_last_bufpos as the currently used portion of the block. - * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the - * current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data - * that may not yet be visible to the current thread*/ - if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; + if (lastblock) { + listIter iter; + listNode *next; + listRewind(c->reply, &iter); + while ((next = listNext(&iter)) && !reply.limit_reached) { + clientReplyBlock *o = listNodeValue(next); - if (used == 0) { /* empty node, skip over it. */ - if (next == lastblock) break; - sentlen = 0; - continue; - } + size_t used = o->used; + /* Use c->io_last_bufpos as the currently used portion of the block. + * We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the + * current thread. 
Using o->used, which may have been updated by the main thread, could lead to accessing data + * that may not yet be visible to the current thread*/ + if (!inMainThread() && next == lastblock) used = c->io_last_bufpos; - iov[iovcnt].iov_base = o->buf + sentlen; - iov[iovcnt].iov_len = used - sentlen; - iov_bytes_len += iov[iovcnt++].iov_len; + if (used == 0) { /* empty node, skip over it. */ + if (next == lastblock) break; + continue; + } - sentlen = 0; - if (next == lastblock) break; - } + addBufferToReplyIOV(o->flag.buf_encoded, o->buf, used, &reply, &buf_metadata[bufcnt]); + if (!buf_metadata[bufcnt].data_len) break; + bufcnt++; - serverAssert(iovcnt != 0); + if (next == lastblock) break; + } + } ssize_t totwritten = 0; while (1) { - int nwritten = connWritev(c->conn, iov, iovcnt); + int nwritten = connWritev(c->conn, reply.iov, reply.iovcnt); if (nwritten <= 0) { c->write_flags |= WRITE_FLAGS_WRITE_ERROR; totwritten = totwritten > 0 ? totwritten : nwritten; @@ -2038,7 +2391,7 @@ static int writevToClient(client *c) { } totwritten += nwritten; - if (totwritten == iov_bytes_len) break; + if (totwritten == reply.iov_len_total) break; if (totwritten > NET_MAX_WRITES_PER_EVENT) { /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT @@ -2055,20 +2408,13 @@ static int writevToClient(client *c) { } } - /* proceed to the unwritten blocks */ - while (nwritten > 0) { - if ((size_t)nwritten < iov[0].iov_len) { - iov[0].iov_base = (char *)iov[0].iov_base + nwritten; - iov[0].iov_len -= nwritten; - break; - } - nwritten -= iov[0].iov_len; - iov++; - iovcnt--; - } + proceedToUnwritten(&reply, nwritten); } c->nwritten = totwritten; + if (totwritten > 0) { + saveLastWrittenBuf(c, buf_metadata, bufcnt, reply.iov_len_total, totwritten); + } return totwritten > 0 ? C_OK : C_ERR; } @@ -2090,13 +2436,14 @@ int _writeToClient(client *c) { } /* If the reply list is not empty, use writev to save system calls and TCP packets */ - if (lastblock) return writevToClient(c); + if (lastblock || c->flag.buf_encoded) return writevToClient(c); - ssize_t bytes_to_write = bufpos - c->sentlen; + serverAssert(c->io_last_written_data_len == 0 || c->io_last_written_buf == c->buf); + ssize_t bytes_to_write = bufpos - c->io_last_written_data_len; ssize_t tot_written = 0; while (tot_written < bytes_to_write) { - int nwritten = connWrite(c->conn, c->buf + c->sentlen + tot_written, bytes_to_write - tot_written); + int nwritten = connWrite(c->conn, c->buf + c->io_last_written_data_len + tot_written, bytes_to_write - tot_written); if (nwritten <= 0) { c->write_flags |= WRITE_FLAGS_WRITE_ERROR; tot_written = tot_written > 0 ? tot_written : nwritten; @@ -2106,44 +2453,100 @@ int _writeToClient(client *c) { } c->nwritten = tot_written; + if (tot_written > 0) { + c->io_last_written_buf = c->buf; + c->io_last_written_bufpos = (tot_written == bytes_to_write ? bufpos : 0); + c->io_last_written_data_len = c->io_last_written_data_len + tot_written; + } return tot_written > 0 ? 
C_OK : C_ERR;
 }

-static void _postWriteToClient(client *c) {
-    if (c->nwritten <= 0) return;
+void resetLastWrittenBuf(client *c) {
+    c->io_last_written_buf = NULL;
+    c->io_last_written_bufpos = 0;
+    c->io_last_written_data_len = 0;
+}
+
+static void releaseBufReferences(char *buf, size_t bufpos) {
+    char *ptr = buf;
+    while (ptr < buf + bufpos) {
+        payloadHeader *header = (payloadHeader *)ptr;
+        ptr += sizeof(payloadHeader);
+
+        if (header->type == BULK_STR_REF) {
+            clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->actual_len);
+
+            bulkStrRef *str_ref = (bulkStrRef *)ptr;
+            size_t len = header->len;
+            while (len > 0) {
+                decrRefCount(str_ref->obj);
+                str_ref++;
+                len -= sizeof(bulkStrRef);
+            }
+        } else {
+            serverAssert(header->type == PLAIN_REPLY);
+        }
+
+        ptr += header->len;
+    }
+    serverAssert(ptr == buf + bufpos);
+}
+
+void releaseReplyReferences(client *c) {
+    if (c->bufpos > 0 && c->flag.buf_encoded) {
+        releaseBufReferences(c->buf, c->bufpos);
+    }
     listIter iter;
     listNode *next;
-    clientReplyBlock *o;
+    listRewind(c->reply, &iter);
+    while ((next = listNext(&iter))) {
+        clientReplyBlock *o = (clientReplyBlock *)listNodeValue(next);
+        if (o->flag.buf_encoded) {
+            releaseBufReferences(o->buf, o->used);
+        }
+    }
+}
+static void _postWriteToClient(client *c) {
+    if (c->nwritten <= 0) return;
     server.stat_net_output_bytes += c->nwritten;
-    /* Locate the new node which has leftover data and
-     * release all nodes in front of it. */
-    ssize_t remaining = c->nwritten;
-    if (c->bufpos > 0) { /* Deal with static reply buffer first. */
-        int buf_len = c->bufpos - c->sentlen;
-        c->sentlen += c->nwritten;
-        /* If the buffer was sent, set bufpos to zero to continue with
-         * the remainder of the reply. */
-        if (c->nwritten >= buf_len) {
+    int last_written = 0;
+    if (c->bufpos > 0) {
+        /* Is this buffer the last one written? */
+        last_written = (c->buf == c->io_last_written_buf);
+        /* If buffer is completely written */
+        if (!last_written || c->bufpos == c->io_last_written_bufpos) {
+            /* If encoded then release references to bulk string objects */
+            if (c->flag.buf_encoded) releaseBufReferences(c->buf, c->bufpos);
+            /* Reset buffer metadata */
             c->bufpos = 0;
-            c->sentlen = 0;
+            c->flag.buf_encoded = 0;
+            c->last_header = NULL;
+            /* If completely written buffer is last written then reset last written state */
+            if (last_written) resetLastWrittenBuf(c);
         }
-        remaining -= buf_len;
+        if (last_written) return;
     }
+
+    listIter iter;
+    listNode *next;
     listRewind(c->reply, &iter);
-    while (remaining > 0) {
-        next = listNext(&iter);
-        o = listNodeValue(next);
-        if (remaining < (ssize_t)(o->used - c->sentlen)) {
-            c->sentlen += remaining;
-            break;
+    while ((next = listNext(&iter))) {
+        clientReplyBlock *o = listNodeValue(next);
+        /* Is this buffer the last one written? 
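+         * Blocks that precede the buffer recorded in c->io_last_written_buf were fully flushed
+         * and are released below; the last written block itself is released only when
+         * o->used matches c->io_last_written_bufpos, i.e. it was written completely.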
*/ + last_written = (o->buf == c->io_last_written_buf); + /* If buffer is completely written */ + if (!last_written || o->used == c->io_last_written_bufpos) { + c->reply_bytes -= o->size; + /* If encoded then release references to bulk string objects */ + if (o->flag.buf_encoded) releaseBufReferences(o->buf, o->used); + listDelNode(c->reply, next); + /* If completely written buffer is last written then reset last written state */ + if (last_written) resetLastWrittenBuf(c); } - remaining -= (ssize_t)(o->used - c->sentlen); - c->reply_bytes -= o->size; - listDelNode(c->reply, next); - c->sentlen = 0; + if (last_written) return; } } @@ -2177,7 +2580,7 @@ int postWriteToClient(client *c) { if (!c->flag.primary) c->last_interaction = server.unixtime; } if (!clientHasPendingReplies(c)) { - c->sentlen = 0; + resetLastWrittenBuf(c); if (connHasWriteHandler(c->conn)) { connSetWriteHandler(c->conn, NULL); } diff --git a/src/replication.c b/src/replication.c index 447938f22b..baa62be928 100644 --- a/src/replication.c +++ b/src/replication.c @@ -889,6 +889,8 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) { * 4) Send the backlog data (from the offset to the end) to the replica. */ waitForClientIO(c); c->flag.replica = 1; + serverAssert(c->bufpos == 0); + c->flag.buf_encoded = 0; if (c->repl_data->associated_rdb_client_id && lookupRdbClientByID(c->repl_data->associated_rdb_client_id)) { c->repl_data->repl_state = REPLICA_STATE_BG_RDB_LOAD; removeReplicaFromPsyncWait(c); @@ -1155,6 +1157,8 @@ void syncCommand(client *c) { if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repl_data->repldbfd = -1; c->flag.replica = 1; + serverAssert(c->bufpos == 0); + c->flag.buf_encoded = 0; listAddNodeTail(server.replicas, c); /* Create the replication backlog if needed. */ @@ -4252,8 +4256,9 @@ void replicationCachePrimary(client *c) { server.primary->repl_data->repl_applied = 0; server.primary->repl_data->read_reploff = server.primary->repl_data->reploff; if (c->flag.multi) discardTransaction(c); + releaseReplyReferences(c); + resetLastWrittenBuf(c); listEmpty(c->reply); - c->sentlen = 0; c->reply_bytes = 0; c->bufpos = 0; resetClient(c); diff --git a/src/server.h b/src/server.h index 1d80484b3e..ee7766f882 100644 --- a/src/server.h +++ b/src/server.h @@ -785,10 +785,22 @@ char *getObjectTypeName(robj *); struct evictionPoolEntry; /* Defined in evict.c */ +typedef struct payloadHeader payloadHeader; /* Defined in networking.c */ + +typedef struct ClientReplyBlockFlags { + uint8_t buf_encoded : 1; /* True if reply block buf content is encoded (e.g. for copy avoidance) */ + uint8_t reserved : 7; +} ClientReplyBlockFlags; + /* This structure is used in order to represent the output buffer of a client, * which is actually a linked list of blocks like that, that is: client->reply. */ typedef struct clientReplyBlock { size_t size, used; + payloadHeader *last_header; + union { + uint8_t raw_flag; + ClientReplyBlockFlags flag; + }; char buf[]; } clientReplyBlock; @@ -1016,7 +1028,7 @@ typedef struct { /* General */ int saved; /* 1 if we already saved the offset (first time we call addReply*) */ /* Offset within the static reply buffer */ - int bufpos; + size_t bufpos; /* Offset within the reply block list */ struct { int index; @@ -1057,6 +1069,7 @@ typedef struct ClientFlags { uint64_t prevent_prop : 1; /* Don't propagate to AOF or replicas. */ uint64_t pending_write : 1; /* Client has output to send but a write handler is yet not installed. 
*/ uint64_t pending_read : 1; /* Client has output to send but a write handler is yet not installed. */ + uint64_t buf_encoded : 1; /* True if c->buf content is encoded (e.g. for copy avoidance) */ uint64_t reply_off : 1; /* Don't send replies to client. */ uint64_t reply_skip_next : 1; /* Set CLIENT_REPLY_SKIP for next cmd */ uint64_t reply_skip : 1; /* Don't send just this reply. */ @@ -1109,7 +1122,7 @@ typedef struct ClientFlags { * flag, we won't cache the primary in freeClient. */ uint64_t fake : 1; /* This is a fake client without a real connection. */ uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */ - uint64_t reserved : 4; /* Reserved for future use */ + uint64_t reserved : 3; /* Reserved for future use */ } ClientFlags; typedef struct ClientPubSubData { @@ -1210,12 +1223,17 @@ typedef struct client { list *reply; /* List of reply objects to send to the client. */ listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */ size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */ + char *io_last_written_buf; /* Last buffer that has been written to the client connection + * Last buffer is either c->buf or c->reply list node (i.e. buf from a clientReplyBlock) */ + size_t io_last_written_bufpos; /* The buffer has been written until this position */ + size_t io_last_written_data_len; /* The actual length of the data written from this buffer + This length differs from written bufpos in case of reply offload */ unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ - size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ listNode clients_pending_write_node; /* list node in clients_pending_write or in clients_pending_io_write list */ - int bufpos; - int original_argc; /* Num of arguments of original command if arguments were rewritten. */ - robj **original_argv; /* Arguments of original command if arguments were rewritten. */ + size_t bufpos; + payloadHeader *last_header; /* Pointer to the last header in a buffer in reply offload mode */ + int original_argc; /* Num of arguments of original command if arguments were rewritten. */ + robj **original_argv; /* Arguments of original command if arguments were rewritten. 
*/ /* Client flags and state indicators */ union { uint64_t raw_flag; @@ -1665,6 +1683,12 @@ struct valkeyServer { int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_assert; /* Enable debug asserts */ + /* Reply construction copy avoidance */ + int min_io_threads_copy_avoid; /* Minimum number of IO threads for copy avoidance in reply construction */ + int min_io_threads_value_prefetch_off; /* Minimum number of IO threads for disabling value prefetch */ + int min_string_size_copy_avoid_threaded; /* Minimum bulk string size for copy avoidance in reply construction when IO threads enabled */ + int min_string_size_copy_avoid; /* Minimum bulk string size for copy avoidance in reply construction when IO threads disabled */ + /* RDB / AOF loading information */ volatile sig_atomic_t loading; /* We are loading data from disk if true */ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */ @@ -2764,6 +2788,9 @@ void ioThreadWriteToClient(void *data); int canParseCommand(client *c); int processIOThreadsReadDone(void); int processIOThreadsWriteDone(void); +void releaseReplyReferences(client *c); +void resetLastWrittenBuf(client *c); + /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf); diff --git a/src/unit/test_files.h b/src/unit/test_files.h index f25e320452..8c30d7625d 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -102,6 +102,9 @@ int test_listpackBenchmarkLpCompareWithNumber(int argc, char **argv, int flags); int test_listpackBenchmarkFree(int argc, char **argv, int flags); int test_backupAndUpdateClientArgv(int argc, char **argv, int flags); int test_rewriteClientCommandArgument(int argc, char **argv, int flags); +int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags); +int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags); +int test_addBufferToReplyIOV(int argc, char **argv, int flags); int test_object_with_key(int argc, char **argv, int flags); int test_quicklistCreateList(int argc, char **argv, int flags); int test_quicklistAddToTailOfEmptyList(int argc, char **argv, int flags); @@ -236,7 +239,7 @@ unitTest __test_hashtable_c[] = {{"test_cursor", test_cursor}, {"test_set_hash_f unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEncodings}, {"test_intsetBasicAdding", test_intsetBasicAdding}, {"test_intsetLargeNumberRandomAdd", test_intsetLargeNumberRandomAdd}, {"test_intsetUpgradeFromint16Toint32", test_intsetUpgradeFromint16Toint32}, {"test_intsetUpgradeFromint16Toint64", test_intsetUpgradeFromint16Toint64}, {"test_intsetUpgradeFromint32Toint64", test_intsetUpgradeFromint32Toint64}, {"test_intsetStressLookups", test_intsetStressLookups}, {"test_intsetStressAddDelete", test_intsetStressAddDelete}, {NULL, NULL}}; unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable}, {NULL, NULL}}; unitTest __test_listpack_c[] = {{"test_listpackCreateIntList", 
test_listpackCreateIntList}, {"test_listpackCreateList", test_listpackCreateList}, {"test_listpackLpPrepend", test_listpackLpPrepend}, {"test_listpackLpPrependInteger", test_listpackLpPrependInteger}, {"test_listpackGetELementAtIndex", test_listpackGetELementAtIndex}, {"test_listpackPop", test_listpackPop}, {"test_listpackGetELementAtIndex2", test_listpackGetELementAtIndex2}, {"test_listpackIterate0toEnd", test_listpackIterate0toEnd}, {"test_listpackIterate1toEnd", test_listpackIterate1toEnd}, {"test_listpackIterate2toEnd", test_listpackIterate2toEnd}, {"test_listpackIterateBackToFront", test_listpackIterateBackToFront}, {"test_listpackIterateBackToFrontWithDelete", test_listpackIterateBackToFrontWithDelete}, {"test_listpackDeleteWhenNumIsMinusOne", test_listpackDeleteWhenNumIsMinusOne}, {"test_listpackDeleteWithNegativeIndex", test_listpackDeleteWithNegativeIndex}, {"test_listpackDeleteInclusiveRange0_0", test_listpackDeleteInclusiveRange0_0}, {"test_listpackDeleteInclusiveRange0_1", test_listpackDeleteInclusiveRange0_1}, {"test_listpackDeleteInclusiveRange1_2", test_listpackDeleteInclusiveRange1_2}, {"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, {"test_listpackBenchmarkLpValidateIntegrity", 
test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}}; -unitTest __test_networking_c[] = {{"test_backupAndUpdateClientArgv", test_backupAndUpdateClientArgv}, {"test_rewriteClientCommandArgument", test_rewriteClientCommandArgument}, {NULL, NULL}}; +unitTest __test_networking_c[] = {{"test_backupAndUpdateClientArgv", test_backupAndUpdateClientArgv}, {"test_rewriteClientCommandArgument", test_rewriteClientCommandArgument}, {"test_addRepliesWithOffloadsToBuffer", test_addRepliesWithOffloadsToBuffer}, {"test_addRepliesWithOffloadsToList", test_addRepliesWithOffloadsToList}, {"test_addBufferToReplyIOV", test_addBufferToReplyIOV}, {NULL, NULL}}; unitTest __test_object_c[] = {{"test_object_with_key", test_object_with_key}, {NULL, NULL}}; unitTest __test_quicklist_c[] = {{"test_quicklistCreateList", test_quicklistCreateList}, {"test_quicklistAddToTailOfEmptyList", test_quicklistAddToTailOfEmptyList}, {"test_quicklistAddToHeadOfEmptyList", test_quicklistAddToHeadOfEmptyList}, {"test_quicklistAddToTail5xAtCompress", test_quicklistAddToTail5xAtCompress}, {"test_quicklistAddToHead5xAtCompress", test_quicklistAddToHead5xAtCompress}, {"test_quicklistAddToTail500xAtCompress", test_quicklistAddToTail500xAtCompress}, {"test_quicklistAddToHead500xAtCompress", test_quicklistAddToHead500xAtCompress}, {"test_quicklistRotateEmpty", test_quicklistRotateEmpty}, {"test_quicklistComprassionPlainNode", test_quicklistComprassionPlainNode}, {"test_quicklistNextPlainNode", test_quicklistNextPlainNode}, {"test_quicklistRotatePlainNode", test_quicklistRotatePlainNode}, {"test_quicklistRotateOneValOnce", test_quicklistRotateOneValOnce}, {"test_quicklistRotate500Val5000TimesAtCompress", test_quicklistRotate500Val5000TimesAtCompress}, {"test_quicklistPopEmpty", test_quicklistPopEmpty}, {"test_quicklistPop1StringFrom1", test_quicklistPop1StringFrom1}, {"test_quicklistPopHead1NumberFrom1", test_quicklistPopHead1NumberFrom1}, {"test_quicklistPopHead500From500", test_quicklistPopHead500From500}, {"test_quicklistPopHead5000From500", test_quicklistPopHead5000From500}, {"test_quicklistIterateForwardOver500List", test_quicklistIterateForwardOver500List}, {"test_quicklistIterateReverseOver500List", test_quicklistIterateReverseOver500List}, {"test_quicklistInsertAfter1Element", test_quicklistInsertAfter1Element}, {"test_quicklistInsertBefore1Element", test_quicklistInsertBefore1Element}, {"test_quicklistInsertHeadWhileHeadNodeIsFull", test_quicklistInsertHeadWhileHeadNodeIsFull}, {"test_quicklistInsertTailWhileTailNodeIsFull", test_quicklistInsertTailWhileTailNodeIsFull}, {"test_quicklistInsertOnceInElementsWhileIteratingAtCompress", test_quicklistInsertOnceInElementsWhileIteratingAtCompress}, {"test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistDuplicateEmptyList", test_quicklistDuplicateEmptyList}, {"test_quicklistDuplicateListOf1Element", test_quicklistDuplicateListOf1Element}, {"test_quicklistDuplicateListOf500", test_quicklistDuplicateListOf500}, {"test_quicklistIndex1200From500ListAtFill", test_quicklistIndex1200From500ListAtFill}, 
{"test_quicklistIndex12From500ListAtFill", test_quicklistIndex12From500ListAtFill}, {"test_quicklistIndex100From500ListAtFill", test_quicklistIndex100From500ListAtFill}, {"test_quicklistIndexTooBig1From50ListAtFill", test_quicklistIndexTooBig1From50ListAtFill}, {"test_quicklistDeleteRangeEmptyList", test_quicklistDeleteRangeEmptyList}, {"test_quicklistDeleteRangeOfEntireNodeInListOfOneNode", test_quicklistDeleteRangeOfEntireNodeInListOfOneNode}, {"test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts", test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts}, {"test_quicklistDeleteMiddle100Of500List", test_quicklistDeleteMiddle100Of500List}, {"test_quicklistDeleteLessThanFillButAcrossNodes", test_quicklistDeleteLessThanFillButAcrossNodes}, {"test_quicklistDeleteNegative1From500List", test_quicklistDeleteNegative1From500List}, {"test_quicklistDeleteNegative1From500ListWithOverflowCounts", test_quicklistDeleteNegative1From500ListWithOverflowCounts}, {"test_quicklistDeleteNegative100From500List", test_quicklistDeleteNegative100From500List}, {"test_quicklistDelete10Count5From50List", test_quicklistDelete10Count5From50List}, {"test_quicklistNumbersOnlyListRead", test_quicklistNumbersOnlyListRead}, {"test_quicklistNumbersLargerListRead", test_quicklistNumbersLargerListRead}, {"test_quicklistNumbersLargerListReadB", test_quicklistNumbersLargerListReadB}, {"test_quicklistLremTestAtCompress", test_quicklistLremTestAtCompress}, {"test_quicklistIterateReverseDeleteAtCompress", test_quicklistIterateReverseDeleteAtCompress}, {"test_quicklistIteratorAtIndexTestAtCompress", test_quicklistIteratorAtIndexTestAtCompress}, {"test_quicklistLtrimTestAAtCompress", test_quicklistLtrimTestAAtCompress}, {"test_quicklistLtrimTestBAtCompress", test_quicklistLtrimTestBAtCompress}, {"test_quicklistLtrimTestCAtCompress", test_quicklistLtrimTestCAtCompress}, {"test_quicklistLtrimTestDAtCompress", test_quicklistLtrimTestDAtCompress}, {"test_quicklistVerifySpecificCompressionOfInteriorNodes", test_quicklistVerifySpecificCompressionOfInteriorNodes}, {"test_quicklistBookmarkGetUpdatedToNextItem", test_quicklistBookmarkGetUpdatedToNextItem}, {"test_quicklistBookmarkLimit", test_quicklistBookmarkLimit}, {"test_quicklistCompressAndDecompressQuicklistListpackNode", test_quicklistCompressAndDecompressQuicklistListpackNode}, {"test_quicklistCompressAndDecomressQuicklistPlainNodeLargeThanUINT32MAX", test_quicklistCompressAndDecomressQuicklistPlainNodeLargeThanUINT32MAX}, {NULL, NULL}}; unitTest __test_rax_c[] = {{"test_raxRandomWalk", test_raxRandomWalk}, {"test_raxIteratorUnitTests", test_raxIteratorUnitTests}, {"test_raxTryInsertUnitTests", test_raxTryInsertUnitTests}, {"test_raxRegressionTest1", test_raxRegressionTest1}, {"test_raxRegressionTest2", test_raxRegressionTest2}, {"test_raxRegressionTest3", test_raxRegressionTest3}, {"test_raxRegressionTest4", test_raxRegressionTest4}, {"test_raxRegressionTest5", test_raxRegressionTest5}, {"test_raxRegressionTest6", test_raxRegressionTest6}, {"test_raxBenchmark", test_raxBenchmark}, {"test_raxHugeKey", test_raxHugeKey}, {"test_raxFuzz", test_raxFuzz}, {NULL, NULL}}; diff --git a/src/unit/test_networking.c b/src/unit/test_networking.c index 566583bcc5..75a57d19c4 100644 --- a/src/unit/test_networking.c +++ b/src/unit/test_networking.c @@ -129,3 +129,260 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) { return 0; } + +static client *createTestClient(void) { + client *c = zcalloc(sizeof(client)); + + c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, 
&c->buf_usable_size); + c->reply = listCreate(); + listSetFreeMethod(c->reply, freeClientReplyValue); + listSetDupMethod(c->reply, dupClientReplyValue); + /* dummy connection to bypass assert in closeClientOnOutputBufferLimitReached */ + c->conn = (connection *)c; + + return c; +} + +static void freeReplyOffloadClient(client *c) { + listRelease(c->reply); + zfree(c->buf); + zfree(c); +} + +/* Each bulk offload puts 2 pointers to a reply buffer */ +#define PTRS_LEN (sizeof(void *) * 2) + +int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + client *c = createTestClient(); + + /* Test 1: Add bulk offloads to the buffer */ + robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); + _addBulkStrRefToBufferOrList(c, obj); + + TEST_ASSERT(obj->refcount == 2); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + PTRS_LEN); + + payloadHeader *header1 = c->last_header; + TEST_ASSERT(header1->type == BULK_STR_REF); + TEST_ASSERT(header1->len == PTRS_LEN); + + robj **ptr = (robj **)(c->buf + sizeof(payloadHeader)); + TEST_ASSERT(obj == *ptr); + + robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2")); + _addBulkStrRefToBufferOrList(c, obj2); + + /* 2 offloads expected in c->buf */ + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == BULK_STR_REF); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); + + ptr = (robj **)(c->buf + sizeof(payloadHeader) + PTRS_LEN); + TEST_ASSERT(obj2 == *ptr); + + /* Test 2: Add plain reply to the buffer */ + const char *plain = "+OK\r\n"; + size_t plain_len = strlen(plain); + _addReplyToBufferOrList(c, plain, plain_len); + + /* 2 offloads and plain reply expected in c->buf. So 2 headers expected as well */ + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + plain_len); + TEST_ASSERT(header1->type == BULK_STR_REF); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); + payloadHeader *header2 = c->last_header; + TEST_ASSERT(header2->type == PLAIN_REPLY); + TEST_ASSERT(header2->len == plain_len); + + /* Add more plain replies. 
Check that the same plain-reply header is updated properly */ + for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len); + TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + 10 * plain_len); + TEST_ASSERT(header2->type == PLAIN_REPLY); + TEST_ASSERT(header2->len == plain_len * 10); + + /* Test 3: Add one more bulk offload to the buffer */ + _addBulkStrRefToBufferOrList(c, obj); + TEST_ASSERT(obj->refcount == 3); + TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * PTRS_LEN + 10 * plain_len); + payloadHeader *header3 = c->last_header; + TEST_ASSERT(header3->type == BULK_STR_REF); + ptr = (robj **)((char *)c->last_header + sizeof(payloadHeader)); + TEST_ASSERT(obj == *ptr); + + releaseReplyReferences(c); + decrRefCount(obj); + decrRefCount(obj2); + + freeReplyOffloadClient(c); + + return 0; +} + +int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + /* Required for isCopyAvoidPreferred / isCopyAvoidIndicatedByIOThreads */ + int io_threads_num = server.io_threads_num; + int min_io_threads_for_copy_avoid = server.min_io_threads_copy_avoid; + server.io_threads_num = 1; + server.min_io_threads_copy_avoid = 1; + + client *c = createTestClient(); + + /* Test 1: Add bulk offloads to the reply list */ + + /* Select the reply length so that there is room for 2 headers and only 4 bytes. + * 4 bytes is not enough for the object pointer(s), + * which forces the bulk offload to be added to the reply list. + */ + size_t reply_len = c->buf_usable_size - 2 * sizeof(payloadHeader) - 4; + char *reply = zmalloc(reply_len); + memset(reply, 'a', reply_len); + _addReplyToBufferOrList(c, reply, reply_len); + TEST_ASSERT(c->flag.buf_encoded); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); + TEST_ASSERT(listLength(c->reply) == 0); + + /* As the bulk offload header+pointer can't be accommodated in c->buf, + * one block is expected in c->reply */ + robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test")); + _addBulkStrRefToBufferOrList(c, obj); + TEST_ASSERT(obj->refcount == 2); + TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len); + TEST_ASSERT(listLength(c->reply) == 1); + + /* Check bulk offload header+pointer inside c->reply */ + listIter iter; + listRewind(c->reply, &iter); + listNode *next = listNext(&iter); + clientReplyBlock *blk = listNodeValue(next); + + TEST_ASSERT(blk->used == sizeof(payloadHeader) + PTRS_LEN); + payloadHeader *header1 = blk->last_header; + TEST_ASSERT(header1->type == BULK_STR_REF); + TEST_ASSERT(header1->len == PTRS_LEN); + + robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader)); + TEST_ASSERT(obj == *ptr); + + /* Test 2: Add one more bulk offload to the reply list */ + _addBulkStrRefToBufferOrList(c, obj); + TEST_ASSERT(obj->refcount == 3); + TEST_ASSERT(listLength(c->reply) == 1); + TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * PTRS_LEN); + TEST_ASSERT(header1->type == BULK_STR_REF); + TEST_ASSERT(header1->len == 2 * PTRS_LEN); + + /* Test 3: Add plain replies to cause the reply list to grow */ + while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len); + _addReplyToBufferOrList(c, reply, reply_len); + + TEST_ASSERT(listLength(c->reply) == 2); + /* last header in 1st block */ + payloadHeader *header2 = blk->last_header; + listRewind(c->reply, &iter); + listNext(&iter); + next = listNext(&iter); + clientReplyBlock *blk2 = listNodeValue(next); + /* last header in 2nd block */ + payloadHeader *header3 = blk2->last_header; +
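/* The plain replies may be split across the two blocks: both tail headers should be PLAIN_REPLY and their lengths should add up to a whole number of replies. */ +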
TEST_ASSERT(header2->type == PLAIN_REPLY && header3->type == PLAIN_REPLY); + TEST_ASSERT((header2->len + header3->len) % reply_len == 0); + + releaseReplyReferences(c); + decrRefCount(obj); + + zfree(reply); + + freeReplyOffloadClient(c); + + /* Restore modified values */ + server.io_threads_num = io_threads_num; + server.min_io_threads_copy_avoid = min_io_threads_for_copy_avoid; + + return 0; +} + +int test_addBufferToReplyIOV(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + const char *expected_reply = "$5\r\nhello\r\n"; + ssize_t total_len = strlen(expected_reply); + const int iovmax = 16; + char crlf[2] = {'\r', '\n'}; + + /* Test 1: 1st writevToClient invocation */ + client *c = createTestClient(); + robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "hello")); + _addBulkStrRefToBufferOrList(c, obj); + + struct iovec iov_arr[iovmax]; + char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3]; + bufWriteMetadata metadata[1]; + + replyIOV reply; + initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply, &metadata[0]); + + TEST_ASSERT(reply.iov_len_total == total_len); + TEST_ASSERT(reply.iovcnt == 3); + const char *ptr = expected_reply; + for (int i = 0; i < reply.iovcnt; ++i) { + TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0); + ptr += reply.iov[i].iov_len; + } + + /* Test 2: Last written buf/pos/data_len after 1st invocation */ + saveLastWrittenBuf(c, metadata, 1, reply.iov_len_total, 1); /* only 1 byte has been written */ + TEST_ASSERT(c->io_last_written_buf == c->buf); + TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ + TEST_ASSERT(c->io_last_written_data_len == 1); + + /* Test 3: 2nd writevToClient invocation */ + struct iovec iov_arr2[iovmax]; + char prefixes2[iovmax / 3 + 1][LONG_STR_SIZE + 3]; + bufWriteMetadata metadata2[1]; + + replyIOV reply2; + initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply2, &metadata2[0]); + TEST_ASSERT(reply2.iov_len_total == total_len - 1); + TEST_ASSERT((*(char *)reply2.iov[0].iov_base) == '5'); + + /* Test 4: Last written buf/pos/data_len after 2nd invocation */ + saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes have been written */ + TEST_ASSERT(c->io_last_written_buf == c->buf); + TEST_ASSERT(c->io_last_written_bufpos == 0); /* incomplete write */ + TEST_ASSERT(c->io_last_written_data_len == 5); /* 1 + 4 */ + + /* Test 5: 3rd writevToClient invocation */ + struct iovec iov_arr3[iovmax]; + char prefixes3[iovmax / 3 + 1][LONG_STR_SIZE + 3]; + bufWriteMetadata metadata3[1]; + + replyIOV reply3; + initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3); + addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply3, &metadata3[0]); + TEST_ASSERT(reply3.iov_len_total == total_len - 5); + TEST_ASSERT((*(char *)reply3.iov[0].iov_base) == 'e'); + + /* Test 6: Last written buf/pos/data_len after 3rd invocation */ + saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */ + TEST_ASSERT(c->io_last_written_buf == c->buf); + TEST_ASSERT(c->io_last_written_bufpos == c->bufpos); + TEST_ASSERT(c->io_last_written_data_len == (size_t)total_len); + + decrRefCount(obj); + decrRefCount(obj); + + freeReplyOffloadClient(c); + + return 0; +} diff --git a/src/unit/test_ziplist.c b/src/unit/test_ziplist.c index 58687d81fc..e8488d4782 100644
--- a/src/unit/test_ziplist.c +++ b/src/unit/test_ziplist.c @@ -639,7 +639,7 @@ int test_ziplistStressWithRandomPayloadsOfDifferentEncoding(int argc, char **arg /* Hold temp vars from ziplist */ unsigned char *sstr; unsigned int slen; - long long sval; + long long sval = 0; iteration = accurate ? 20000 : 20; for (i = 0; i < iteration; i++) { diff --git a/tests/instances.tcl b/tests/instances.tcl index 5cc96b0edb..42f0385507 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -111,6 +111,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} { if {$::io_threads} { puts $cfg "io-threads 2" puts $cfg "events-per-io-thread 0" + puts $cfg "min-io-threads-avoid-copy-reply 2" } if {$::log_req_res} { diff --git a/tests/support/server.tcl b/tests/support/server.tcl index c081e51338..53c5667b29 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -534,6 +534,7 @@ proc start_server {options {code undefined}} { if {$::io_threads} { dict set config "io-threads" 2 dict set config "events-per-io-thread" 0 + dict set config "min-io-threads-avoid-copy-reply" 2 } foreach line $data { diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index ceeb20f7b6..451017e3e7 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -52,6 +52,10 @@ proc kb {v} { start_server {} { set maxmemory_clients 3000000 r config set maxmemory-clients $maxmemory_clients + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 test "client evicted due to large argv" { r flushdb @@ -332,6 +336,10 @@ start_server {} { set obuf_limit [mb 3] r config set maxmemory-clients $maxmemory_clients r config set client-output-buffer-limit "normal $obuf_limit 0 0" + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 test "avoid client eviction when client is freed by output buffer limit" { r flushdb @@ -385,13 +393,17 @@ start_server {} { } start_server {} { + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + test "decrease maxmemory-clients causes client eviction" { set maxmemory_clients [mb 4] set client_count 10 set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count] r config set maxmemory-clients $maxmemory_clients - # Make multiple clients consume together roughly 1mb less than maxmemory_clients set rrs {} for {set j 0} {$j < $client_count} {incr j} { @@ -426,6 +438,11 @@ start_server {} { } start_server {} { + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + test "evict clients only until below limit" { set client_count 10 set client_mem [mb 1] @@ -434,6 +451,7 @@ start_server {} { r client setname control r client no-evict on + # Make multiple clients consume together roughly 1mb less than maxmemory_clients set total_client_mem 0 set max_client_mem 0 @@ -488,6 +506,11 @@ start_server {} { } start_server {} { + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + test "evict 
clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step # because of how the client-eviction size bucketing works @@ -555,6 +578,11 @@ start_server {} { } start_server {} { + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + foreach type {"client no-evict" "maxmemory-clients disabled"} { r flushall r client no-evict on diff --git a/tests/unit/cluster/slot-stats.tcl b/tests/unit/cluster/slot-stats.tcl index 99f9c1c03a..2034a51d97 100644 --- a/tests/unit/cluster/slot-stats.tcl +++ b/tests/unit/cluster/slot-stats.tcl @@ -524,9 +524,12 @@ start_cluster 1 0 {tags {external:skip cluster}} { R 0 SET $key value # +OK\r\n --> 5 bytes + R 0 GET $key + # $5\r\nvalue\r\n --> 11 bytes + set expected_slot_stats [ dict create $key_slot [ - dict create network-bytes-out 5 + dict create network-bytes-out 16 ] ] set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] diff --git a/tests/unit/commandlog.tcl b/tests/unit/commandlog.tcl index 39038d7950..846903acea 100644 --- a/tests/unit/commandlog.tcl +++ b/tests/unit/commandlog.tcl @@ -27,12 +27,18 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10 assert_equal [r commandlog len large-request] 1 # for large-reply + set copy_avoid [lindex [r config get min-io-threads-avoid-copy-reply] 1] + r config set min-io-threads-avoid-copy-reply 0 + r config set commandlog-reply-larger-than 1024 r ping assert_equal [r commandlog len large-reply] 0 r get testkey assert_equal [r commandlog len large-reply] 1 - } {} {needs:debug} + + # Restore min-io-threads-avoid-copy-reply value + r config set min-io-threads-avoid-copy-reply $copy_avoid + } {OK} {needs:debug} test {COMMANDLOG - zero max length is correctly handled} { r commandlog reset slow @@ -120,6 +126,9 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10 assert_equal {foobar} [lindex $e 5] # for large-reply + set copy_avoid [lindex [r config get min-io-threads-avoid-copy-reply] 1] + r config set min-io-threads-avoid-copy-reply 0 + r get testkey set e [lindex [r commandlog get -1 large-reply] 0] assert_equal [llength $e] 6 @@ -129,7 +138,10 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10 assert_equal [expr {[lindex $e 2] > 1024}] 1 assert_equal [lindex $e 3] {get testkey} assert_equal {foobar} [lindex $e 5] - } {} {needs:debug} + + # Restore min-io-threads-avoid-copy-reply value + r config set min-io-threads-avoid-copy-reply $copy_avoid + } {OK} {needs:debug} test {COMMANDLOG slow - Certain commands are omitted that contain sensitive information} { r config set commandlog-slow-execution-max-len 100 diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index 4a638cac80..cba8752a3e 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -384,6 +384,14 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} { } test {stats: client input and output buffer limit disconnections} { + # Disable copy avoidance + set min_threads [lindex [r config get min-io-threads-avoid-copy-reply] 1] + set min_size [lindex [r config get min-string-size-avoid-copy-reply] 1] + set min_size_threaded [lindex [r config get min-string-size-avoid-copy-reply-threaded] 1] + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + r config 
resetstat set info [r info stats] assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0} @@ -407,6 +415,12 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} { r set key [string repeat a 100000] ;# to trigger output buffer limit check this needs to be big catch {r get key} r config set client-output-buffer-limit $org_outbuf_limit + + # Restore copy avoidance configs + r config set min-io-threads-avoid-copy-reply $min_threads + r config set min-string-size-avoid-copy-reply $min_size + r config set min-string-size-avoid-copy-reply-threaded $min_size_threaded + set info [r info stats] assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1} } {} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 5b76f44645..f0a6829894 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -1,7 +1,11 @@ -start_server {tags {"maxmemory" "external:skip"}} { +start_server {tags {"maxmemory external:skip"}} { r config set maxmemory 11mb r config set maxmemory-policy allkeys-lru set server_pid [s process_id] + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 proc init_test {client_eviction} { r flushdb sync diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index b0fd184afe..8b819ab0ab 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -1,4 +1,7 @@ start_server {tags {"obuf-limits external:skip logreqres:skip"}} { + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + test {CONFIG SET client-output-buffer-limit} { set oldval [lindex [r config get client-output-buffer-limit] 1] diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl index ae3b914ea6..026ffc04be 100644 --- a/tests/unit/replybufsize.tcl +++ b/tests/unit/replybufsize.tcl @@ -9,7 +9,11 @@ proc get_reply_buffer_size {cname} { } start_server {tags {"replybufsize"}} { - + # Disable copy avoidance + r config set min-io-threads-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply 0 + r config set min-string-size-avoid-copy-reply-threaded 0 + test {verify reply buffer limits} { # In order to reduce test time we can set the peak reset time very low r debug replybuffer peak-reset-time 100