Skip to content

Commit

Permalink
Minor revision.
Browse files Browse the repository at this point in the history
- Added integration test cases for functions.
- Fixed existing integration test cases.
- Added comments for bypassing EXEC, EVAL and FCALL calculations.
- Moved slot_stats array under clusterState.

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 15, 2024
1 parent c7f1ae3 commit 9659a40
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 45 deletions.
3 changes: 1 addition & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ void blockClient(client *c, int btype) {
* he will attempt to reprocess the command which will update the statistics.
* However in case the client was timed out or in case of module blocked client is being unblocked
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slowlog and monitors.*/
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.*/
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
/* Populate per-slot statistics for cpu time. */
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->lastcmd->calls++;
c->commands_processed++;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
clusterSlotStatsReset();
clusterSlotStatResetAll();
}

void clusterInitLast(void) {
Expand Down
7 changes: 7 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ struct _clusterNode {
Update with updateAndCountChangedNodeHealth(). */
};

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
} slotStat;

struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
Expand Down Expand Up @@ -362,6 +367,8 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
slotStat slot_stats[CLUSTER_SLOTS];
};


Expand Down
32 changes: 16 additions & 16 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
} slotStat;

/* Struct used for storing slot statistics, for all slots owned by the current shard. */
struct slotStat cluster_slot_stats[CLUSTER_SLOTS];

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -56,7 +48,7 @@ static uint64_t getSlotStat(int slot, int stat_type) {
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == CPU_USEC) {
slot_stat = cluster_slot_stats[slot].cpu_usec;
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
}
return slot_stat;
}
Expand Down Expand Up @@ -94,7 +86,7 @@ static void addReplySlotStat(client *c, int slot) {
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, cluster_slot_stats[slot].cpu_usec);
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -128,23 +120,31 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
memset(&cluster_slot_stats[slot], 0, sizeof(slotStat));
memset(&server.cluster->slot_stats[slot], 0, sizeof(slotStat));
}

void clusterSlotStatsReset(void) {
memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats));
void clusterSlotStatResetAll(void) {
if (server.cluster == NULL) return;

memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
}

/* For cpu-usec accumulation, EXEC, EVAl and FCALL commands are skipped.
* This is due to their unique callstack, where the c->duration for
* EXEC, EVAL and FCALL already includes all of its nested commands.
* Meaning, the accumulation of cpu-usec for these wrapper commands
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_enabled && c->slot != -1 && c->cmd->proc != execCommand && c->cmd->proc != fcallCommand &&
c->cmd->proc != evalCommand;
return server.cluster_enabled && c->slot != -1 && c->cmd->proc != execCommand && c->cmd->proc != evalCommand &&
c->cmd->proc != fcallCommand;
}

void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
if (!canAddCpuDuration(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
cluster_slot_stats[c->slot].cpu_usec += duration;
server.cluster->slot_stats[c->slot].cpu_usec += duration;
}

void clusterSlotStatsCommand(client *c) {
Expand Down
9 changes: 2 additions & 7 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"

void clusterSlotStatReset(int slot);
void clusterSlotStatsReset(void);
void clusterSlotStatResetAll(void);
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2500,7 +2500,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatsReset();
clusterSlotStatResetAll();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down Expand Up @@ -3516,7 +3516,7 @@ void call(client *c, int flags) {
* If the client is blocked we will handle slowlog when it is unblocked. */
if (!(c->flags & CLIENT_BLOCKED)) freeClientOriginalArgv(c);

/* populate the per-command statistics that we show in INFO commandstats.
/* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS, respectively.
* If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) {
real_cmd->calls++;
Expand Down
120 changes: 103 additions & 17 deletions tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ proc initialize_expected_slots_dict_with_range {start_slot end_slot} {
return $expected_slots
}

proc assert_empty_slot_stats {slot_stats} {
proc assert_empty_slot_stats {slot_stats metrics_to_assert} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
dict for {metric value} $stats {
assert {$value == 0}
foreach metric_name $metrics_to_assert {
set metric_value [dict get $stats $metric_name]
assert {$metric_value == 0}
}
}
}
Expand All @@ -64,6 +65,20 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_
}
}

proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 metrics_to_assert} {
set slot_stats_1 [convert_array_into_dict $slot_stats_1]
set slot_stats_2 [convert_array_into_dict $slot_stats_2]
assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}

dict for {slot stats_1} $slot_stats_1 {
assert {[dict exists $slot_stats_2 $slot]}
set stats_2 [dict get $slot_stats_2 $slot]
foreach metric_name $metrics_to_assert {
assert {[dict get $stats_1 $metric_name] == [dict get $stats_2 $metric_name]}
}
}
}

proc assert_all_slots_have_been_seen {expected_slots} {
dict for {k v} $expected_slots {
assert {$v == 1}
Expand Down Expand Up @@ -139,20 +154,34 @@ start_cluster 1 0 {tags {external:skip cluster}} {
set key_secondary_slot [R 0 cluster keyslot $key_secondary]
set metrics_to_assert [list cpu-usec]

test "CLUSTER SLOT-STATS cpu-usec reset." {
test "CLUSTER SLOT-STATS cpu-usec reset upon CONFIG RESETSTAT." {
R 0 SET $key VALUE
R 0 DEL $key
R 0 CONFIG RESETSTAT
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec reset upon slot migration." {
R 0 SET $key VALUE

R 0 CLUSTER DELSLOTS $key_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert

R 0 CLUSTER ADDSLOTS $key_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." {
R 0 INFO
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
Expand All @@ -178,7 +207,7 @@ start_cluster 1 0 {tags {external:skip cluster}} {
wait_for_blocked_clients_count 1
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
# When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS.
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert

# Unblocking command.
R 0 LPUSH $key value
Expand Down Expand Up @@ -229,7 +258,7 @@ start_cluster 1 0 {tags {external:skip cluster}} {

# CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert

# Execute transaction, and assert that all nested command times have been accumulated.
$r1 EXEC
Expand Down Expand Up @@ -285,10 +314,61 @@ start_cluster 1 0 {tags {external:skip cluster}} {
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for functions, without cross-slot keys." {
set function_str [format "#!lua name=f1
server.register_function{
function_name='f1',
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end
}" $key $key]
r function load replace $function_str
r fcall f1 0

set set_usec [get_cmdstat_usec set r]
set get_usec [get_cmdstat_usec get r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]

set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec [expr $get_usec + $set_usec]
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS cpu-usec for functions, with cross-slot keys." {
set function_str [format "#!lua name=f1
server.register_function{
function_name='f1',
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end,
flags={'allow-cross-slot-keys'}
}" $key $key_secondary]
r function load replace $function_str
r fcall f1 0

set set_usec [get_cmdstat_usec set r]
set get_usec [get_cmdstat_usec get r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]

set expected_slot_stats [
dict create \
$key_slot [
dict create cpu-usec $set_usec
] \
$key_secondary_slot [
dict create cpu-usec $get_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS correctness, without additional arguments.
# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
# -----------------------------------------------------------------------------

start_cluster 1 0 {tags {external:skip cluster}} {
Expand All @@ -305,7 +385,7 @@ start_cluster 1 0 {tags {external:skip cluster}} {

test "CLUSTER SLOT-STATS contains default value upon valkey-server startup" {
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert
}

test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" {
Expand All @@ -323,12 +403,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" {
R 0 DEL $key
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats
assert_empty_slot_stats $slot_stats $metrics_to_assert
}

test "CLUSTER SLOT-STATS slot visibility based on slot ownership changes" {
R 0 CONFIG SET cluster-require-full-coverage no

R 0 CLUSTER DELSLOTS $key_slot
set expected_slots [initialize_expected_slots_dict]
dict unset expected_slots $key_slot
Expand Down Expand Up @@ -417,7 +497,7 @@ start_cluster 1 0 {tags {external:skip cluster}} {
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}

set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
Expand Down Expand Up @@ -449,11 +529,17 @@ start_cluster 1 0 {tags {external:skip cluster}} {
# -----------------------------------------------------------------------------

start_cluster 1 1 {tags {external:skip cluster}} {

# Define shared variables.
set key "FOO"
set key_slot [R 0 CLUSTER KEYSLOT $key]

# For replication, only those metrics that are deterministic upon replication are asserted.
# * key-count is asserted, as both the primary and its replica must hold the same number of keys.
# * cpu-usec is not asserted, as its micro-seconds command duration is not guaranteed to be exact
# between the primary and its replica.
set metrics_to_assert [list key-count]

# Setup replication.
assert {[s -1 role] eq {slave}}
wait_for_condition 1000 50 {
Expand All @@ -472,7 +558,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
wait_for_replica_key_exists $key 1

set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert {$slot_stats_master eq $slot_stats_replica}
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
}

test "CLUSTER SLOT-STATS key-count replication for existing keys" {
Expand All @@ -484,7 +570,7 @@ start_cluster 1 1 {tags {external:skip cluster}} {
wait_for_replica_key_exists $key 1

set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert {$slot_stats_master eq $slot_stats_replica}
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
}

test "CLUSTER SLOT-STATS key-count replication for deleting keys" {
Expand All @@ -496,6 +582,6 @@ start_cluster 1 1 {tags {external:skip cluster}} {
wait_for_replica_key_exists $key 0

set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert {$slot_stats_master eq $slot_stats_replica}
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $metrics_to_assert
}
}

0 comments on commit 9659a40

Please sign in to comment.