Skip to content

Commit

Permalink
defrag: eliminate persistent pointers
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Brunner <[email protected]>
  • Loading branch information
JimB123 committed Dec 12, 2024
1 parent 0c8ad5c commit 0ed833b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
67 changes: 41 additions & 26 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdat
// Private data for main dictionary keys
typedef struct {
kvstoreIterState kvstate;
serverDb *db;
int dbid;
} defragKeysCtx;
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");

Expand Down Expand Up @@ -735,7 +735,7 @@ static void defragModule(serverDb *db, robj *obj) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, robj **elemref) {
serverDb *db = ctx->db;
serverDb *db = &server.db[ctx->dbid];
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
Expand Down Expand Up @@ -919,7 +919,7 @@ static doneStatus defragLaterStep(monotime endtime, void *privdata) {
robj *ob = found;

long long key_defragged = server.stat_active_defrag_hits;
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->db->id) == 1);
bool timeout = (defragLaterItem(ob, &defrag_later_cursor, endtime, ctx->dbid) == 1);
if (key_defragged != server.stat_active_defrag_hits) {
server.stat_active_defrag_key_hits++;
} else {
Expand Down Expand Up @@ -962,7 +962,10 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
state.cursor = 0;
return DEFRAG_NOT_DONE;
}
serverAssert(kvs == state.kvs); // Shouldn't change during the stage
if (kvs != state.kvs) {
// There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage.
return DEFRAG_DONE;
}

unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
Expand Down Expand Up @@ -1012,26 +1015,30 @@ static doneStatus defragStageKvstoreHelper(monotime endtime,
}


// Note: target is a DB, (not a KVS like most stages)
// Target is a DBID
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
serverDb *db = (serverDb *)target;
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];

static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
ctx.db = db;
ctx.dbid = dbid;
// Don't return yet. Call the helper with endtime==0 below.
}
serverAssert(ctx.db == db);
serverAssert(ctx.dbid == dbid);

return defragStageKvstoreHelper(endtime, db->keys,
dbKeysScanCallback, defragLaterStep, &ctx);
}


// Target is a DBID
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
return defragStageKvstoreHelper(endtime, (kvstore *)target,
int dbid = (uintptr_t)target;
serverDb *db = &server.db[dbid];
return defragStageKvstoreHelper(endtime, db->expires,
scanHashtableCallbackCountScanned, NULL, NULL);
}

Expand Down Expand Up @@ -1222,29 +1229,38 @@ static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long i
}

monotime starttime = getMonotonicUs();
monotime endtime = starttime + computeDefragCycleUs();
int dutyCycleUs = computeDefragCycleUs();
monotime endtime = starttime + dutyCycleUs;
bool haveMoreWork = true;

mstime_t latency;
latencyStartMonitor(latency);

if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}
do {
if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}

doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}
doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}

haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
/* If we've completed a stage early, and still have a standard time allotment remaining,
* we'll start another stage. This can happen when defrag is running infrequently, and
* starvation protection has increased the duty-cycle. */
} while (haveMoreWork && getMonotonicUs() <= endtime - server.active_defrag_cycle_us);

latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle", latency);

if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) {
if (haveMoreWork) {
return computeDelayMs(endtime);
} else {
endDefragCycle(true);
Expand Down Expand Up @@ -1283,9 +1299,8 @@ static void beginDefragCycle(void) {
defrag.remaining_stages = listCreate();

for (int dbid = 0; dbid < server.dbnum; dbid++) {
serverDb *db = &server.db[dbid];
addDefragStage(defragStageDbKeys, db, NULL);
addDefragStage(defragStageExpiresKvstore, db->expires, NULL);
addDefragStage(defragStageDbKeys, (void*)(uintptr_t)dbid, NULL);
addDefragStage(defragStageExpiresKvstore, (void*)(uintptr_t)dbid, NULL);
}

static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
Expand Down
3 changes: 1 addition & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1900,8 +1900,7 @@ struct valkeyServer {
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
int jemalloc_bg_thread; /* Enable jemalloc background thread */
int active_defrag_configuration_changed; /* defrag configuration has been changed and need to reconsider
* active_defrag_running in computeDefragCycles. */
int active_defrag_configuration_changed; /* Config changed; need to recompute active_defrag_cpu_percent. */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
Expand Down

0 comments on commit 0ed833b

Please sign in to comment.