Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy Avoidance #1457

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,23 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
/* Accumulates egress bytes for the slot. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
if (!clusterSlotStatsEnabled(slot)) return;

serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsPrimary(server.cluster->myself));
Expand All @@ -174,24 +175,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
alexander-shabanov marked this conversation as resolved.
Show resolved Hide resolved
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}
if (!clusterSlotStatsEnabled(slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
Expand Down Expand Up @@ -219,9 +210,7 @@ void clusterSlotStatResetAll(void) {
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
c->slot != -1 && /* Command should be slot specific. */
return clusterSlotStatsEnabled(c->slot) &&
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
Expand All @@ -248,8 +237,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec;
}

/* Adds network ingress bytes of the current command in execution,
Expand Down Expand Up @@ -343,3 +331,7 @@ void clusterSlotStatsCommand(client *c) {
addReplySubcommandSyntaxError(c);
}
}

int clusterSlotStatsEnabled(int slot) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1;
}
2 changes: 2 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
int clusterSlotStatsEnabled(int slot);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
Expand All @@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
Expand Down
4 changes: 4 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3256,6 +3256,10 @@ standardConfig static_configs[] = {
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-io-threads-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-io-threads-value-prefetch-off", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_value_prefetch_off, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
Expand Down
6 changes: 5 additions & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,13 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
/* If reply offload enabled force new header */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should indeed check if reply offload is enabled before writing NULL to the header? Checking a global is cheaper than write access to the block

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, with or without reply offload, main thread just finished to write to this block

block->last_header = NULL;
alexander-shabanov marked this conversation as resolved.
Show resolved Hide resolved
} else {
c->io_last_bufpos = (size_t)c->bufpos;
c->last_header = NULL;
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

Expand Down
5 changes: 5 additions & 0 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "memory_prefetch.h"
#include "server.h"
#include "io_threads.h"
alexander-shabanov marked this conversation as resolved.
Show resolved Hide resolved

typedef enum {
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
Expand Down Expand Up @@ -119,6 +120,10 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
} else if (server.io_threads_num >= server.min_io_threads_value_prefetch_off) {
/* Copy avoidance should be more efficient without value prefetch
* starting certain number of I/O threads */
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
Expand Down
Loading