Skip to content

Commit

Permalink
Persistence - Remove Unowned Keys
Browse files Browse the repository at this point in the history
For cluster, after startup loading, remove keys
that shouldn't be served by this server based on slot
assignment of a cluster.

Also added stat fields in server to count the total
removed keys and skipped slots from last loading.
  • Loading branch information
singku committed May 28, 2024
1 parent fd58b73 commit e827bfc
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
26 changes: 26 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6552,3 +6552,29 @@ void clusterReplicateOpenSlots(void) {

zfree(argv);
}

del_stat delUnOwnedKeys() {
del_stat result = {0, 0};
if (!server.cluster_enabled) {
return result;
}

clusterNode *master;
if (clusterNodeIsMaster(server.cluster->myself)) {
master = myself;
} else {
master = server.cluster->myself->slaveof;
}
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (server.cluster->slots[i] != master) {
unsigned int deleted = delKeysInSlot(i);
result.deleted_keys += deleted;
if (deleted > 0) {
result.unowned_slots++;
serverLog(LL_NOTICE, "Deleted %u keys from unowned slot: %d after loading RDB", deleted, i);
}
}
}
serverLog(LL_NOTICE, "Deleted totoal: %d unowned keys from total: %d unowned slots after loading RDB", result.deleted_keys, result.unowned_slots);
return result;
}
9 changes: 8 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,8 @@ void initServer(void) {
server.rdb_save_time_start = -1;
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.stat_persistence_startup_load_keys_deleted = 0;
server.stat_persistence_startup_load_unowned_slots = 0;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
Expand Down Expand Up @@ -5582,7 +5584,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
aof_bio_fsync_status == C_OK) ? "ok" : "err",
"aof_last_cow_size:%zu\r\n", server.stat_aof_cow_bytes,
"module_fork_in_progress:%d\r\n", server.child_type == CHILD_TYPE_MODULE,
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes));
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes,
"total_startup_load_deleted_keys:%lld\r\n", server.stat_persistence_startup_load_keys_deleted,
"total_startup_load_unowned_slots:%lld\r\n", server.stat_persistence_startup_load_unowned_slots));
/* clang-format on */

if (server.aof_enabled) {
Expand Down Expand Up @@ -6941,6 +6945,9 @@ int main(int argc, char **argv) {
serverLog(LL_NOTICE, "Server initialized");
aofLoadManifestFromDisk();
loadDataFromDisk();
del_stat stat = delUnOwnedKeys();
server.stat_persistence_startup_load_keys_deleted = stat.deleted_keys;
server.stat_persistence_startup_load_unowned_slots = stat.unowned_slots;
aofOpenIfNeededOnServerStart();
aofDelHistoryFiles();
if (server.cluster_enabled) {
Expand Down
9 changes: 9 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,8 @@ struct valkeyServer {
invocation of the event loop. */
unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each
invocation of the event loop. */
long long stat_persistence_startup_load_keys_deleted;
long long stat_persistence_startup_load_unowned_slots;
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
Expand Down Expand Up @@ -3535,6 +3537,13 @@ unsigned long LFUDecrAndReturn(robj *o);
int performEvictions(void);
void startEvictionTimeProc(void);

typedef struct {
unsigned int unowned_slots;
unsigned int deleted_keys;
} del_stat;
/* delete unowned keys from database at server start up after loading persistence files. */
del_stat delUnOwnedKeys(void);

/* Keys hashing / comparison functions for dict.c hash tables. */
uint64_t dictSdsHash(const void *key);
uint64_t dictSdsCaseHash(const void *key);
Expand Down

0 comments on commit e827bfc

Please sign in to comment.