Skip to content

Commit

Permalink
Add scoped RDB loading context
Browse files Browse the repository at this point in the history
This commit introduces a new mechanism for temporarily changing the
server's loading_rio context during RDB loading operations. The new
RDB_SCOPED_LOADING_RIO macro allows for a scoped change of the
server.loading_rio value, ensuring that it's automatically restored
to its original value when the scope ends.

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Oct 29, 2024
1 parent f89b716 commit 9849350
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 7 deletions.
10 changes: 6 additions & 4 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2930,10 +2930,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
loadingAbsProgress(r->processed_bytes);
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
if (server.repl_provisional_primary.close_asap == 1) {
serverLog(LL_WARNING, "Primary main connection dropped during RDB load callback");
return -1;
}
}
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
server.stat_net_repl_input_bytes += len;
Expand Down Expand Up @@ -2984,6 +2980,11 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
return res;
}

/* Cleanup function to restore the original loading_rio value. */
static void _restore_loading_rio(rio **old_rio_ptr) {
server.loading_rio = *old_rio_ptr;
}

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
Expand All @@ -3007,6 +3008,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
char buf[1024];
int error;
long long empty_keys_skipped = 0;
RDB_SCOPED_LOADING_RIO(rdb);

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
Expand Down
6 changes: 6 additions & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */


/* Macro to temporarily set server.loading_rio within a scope. */
#define RDB_SCOPED_LOADING_RIO(new_rio) \
__attribute__((cleanup(_restore_loading_rio))) rio *_old_rio __attribute__((unused)) = server.loading_rio; \
server.loading_rio = new_rio;

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
Expand Down
1 change: 1 addition & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
if (server.loading_rio) server.loading_rio->flags |= RIO_FLAG_CLOSE_ASAP;
cancelReplicationHandshake(1);
return C_ERR;
}
Expand Down
10 changes: 8 additions & 2 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down Expand Up @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}

static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand Down Expand Up @@ -168,8 +169,13 @@ static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

/* Like rioGetReadError() but for async close errors. */
static inline int rioGetAsyncCloseError(rio *r) {
return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
}

static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
}

void rioInitWithFile(rio *r, FILE *fp);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,7 @@ struct valkeyServer {
int dbid;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
rio *loading_rio; /* Pointer to the Rio object currently used for loading data. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a replica */
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
} else {
fail "Primary did not free repl buf block after sync failure"
}
wait_for_log_messages 0 {"*Primary main connection dropped during RDB load callback*"} $loglines 1000 10
wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
# Replica should retry
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
Expand Down

0 comments on commit 9849350

Please sign in to comment.