Skip to content

Commit

Permalink
Add cpu-usec metric support under CLUSTER SLOT-STATS command (valkey-…
Browse files Browse the repository at this point in the history
…io#20). (valkey-io#712)

The metric tracks cpu time in micro-seconds, sharing the same value as
`INFO COMMANDSTATS`, aggregated under per-slot context.

---------

Signed-off-by: Kyle Kim <[email protected]>
Signed-off-by: Madelyn Olson <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
  • Loading branch information
2 people authored and hwware committed Jul 25, 2024
1 parent b6cb852 commit f2e963f
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 29 deletions.
4 changes: 3 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
#include "slowlog.h"
#include "latency.h"
#include "monotonic.h"
#include "cluster_slot_stats.h"

/* forward declarations */
static void unblockClientWaitingData(client *c);
Expand Down Expand Up @@ -101,10 +102,11 @@ void blockClient(client *c, int btype) {
* he will attempt to reprocess the command which will update the statistics.
* However in case the client was timed out or in case of module blocked client is being unblocked
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slowlog and monitors.*/
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.*/
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
Expand Down
10 changes: 10 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#include <ctype.h>

Expand Down Expand Up @@ -1461,3 +1462,12 @@ void readwriteCommand(client *c) {
c->flag.readonly = 0;
addReply(c, shared.ok);
}

/* Resets transient cluster stats that we expose via INFO or other means that we want
* to reset via CONFIG RESETSTAT. The function is also used in order to
* initialize these fields in clusterInit() at server startup. */
void resetClusterStats(void) {
if (!server.cluster_enabled) return;

clusterSlotStatResetAll();
}
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
void resetClusterStats(void);
#endif /* __CLUSTER_H */
4 changes: 4 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "endianconv.h"
#include "connection.h"

Expand Down Expand Up @@ -1107,6 +1108,7 @@ void clusterInit(void) {
clusterUpdateMyselfClientIpV6();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
resetClusterStats();
}

void clusterInitLast(void) {
Expand Down Expand Up @@ -4993,6 +4995,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
clusterNodeSetSlotBit(n, slot);
server.cluster->slots[slot] = n;
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand All @@ -5011,6 +5014,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand Down
7 changes: 7 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ struct _clusterNode {
Update with updateAndCountChangedNodeHealth(). */
};

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
} slotStat;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
Expand Down Expand Up @@ -376,6 +381,8 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
slotStat slot_stats[CLUSTER_SLOTS];
};


Expand Down
64 changes: 56 additions & 8 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
} slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand Down Expand Up @@ -47,6 +43,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
}
return slot_stat;
}
Expand Down Expand Up @@ -88,9 +86,17 @@ static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyMapLen(c, (server.cluster_slot_stats_enabled) ? SLOT_STAT_COUNT
: 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));

/* Any additional metrics aside from key-count come with a performance trade-off,
* and are aggregated and returned based on its server config. */
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
}
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -121,6 +127,46 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
addReplySortedSlotStats(c, slot_stats, limit);
}

/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
memset(&server.cluster->slot_stats[slot], 0, sizeof(slotStat));
}

void clusterSlotStatResetAll(void) {
memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
}

/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped.
* This is due to their unique callstack, where the c->duration for
* EXEC, EVAL and FCALL already includes all of its nested commands.
* Meaning, the accumulation of cpu-usec for these nested commands
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
c->slot != -1 && /* Command should be slot specific. */
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
}

void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
if (!canAddCpuDuration(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].cpu_usec += duration;
}

/* For cross-slot scripting, its caller client's slot must be invalidated,
* such that its slot-stats aggregation is bypassed. */
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;

ctx->original_client->slot = -1;
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down Expand Up @@ -149,8 +195,10 @@ void clusterSlotStatsCommand(client *c) {
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
Expand Down
9 changes: 9 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "server.h"
#include "cluster.h"
#include "script.h"
#include "cluster_legacy.h"

void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"properties": {
"key-count": {
"type": "integer"
},
"cpu-usec": {
"type": "integer"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down Expand Up @@ -3419,6 +3420,7 @@ void configHelpCommand(client *c) {

void configResetStatCommand(client *c) {
resetServerStats();
resetClusterStats();
resetCommandTableStats(server.commands);
resetErrorTableStats();
addReply(c, shared.ok);
Expand Down
2 changes: 2 additions & 0 deletions src/script.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "script.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

scriptFlag scripts_flags_def[] = {
{.flag = SCRIPT_FLAG_NO_WRITES, .str = "no-writes"},
Expand Down Expand Up @@ -583,6 +584,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
}
call(c, call_flags);
serverAssert(c->flag.blocked == 0);
clusterSlotStatsInvalidateSlotIfApplicable(run_ctx);
return;

error:
Expand Down
6 changes: 4 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
Expand Down Expand Up @@ -3538,13 +3539,14 @@ void call(client *c, int flags) {
* If the client is blocked we will handle slowlog when it is unblocked. */
if (!c->flag.blocked) freeClientOriginalArgv(c);

/* populate the per-command statistics that we show in INFO commandstats.
* If the client is blocked we will handle latency stats and duration when it is unblocked. */
/* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS,
* respectively. If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !c->flag.blocked) {
real_cmd->calls++;
real_cmd->microseconds += c->duration;
if (server.latency_tracking_enabled && !c->flag.blocked)
updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration * 1000);
clusterSlotStatsAddCpuDuration(c, c->duration);
}

/* The duration needs to be reset after each call except for a blocked command,
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,7 @@ struct valkeyServer {
* dropping packets of a specific type */
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
* the cluster after it is forgotten with CLUSTER FORGET. */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
Expand Down
Loading

0 comments on commit f2e963f

Please sign in to comment.