From 9849350c2e5e21488580fa2168179844647bdbc2 Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 29 Oct 2024 10:32:58 +0000 Subject: [PATCH] Add scoped RDB loading context This commit introduces a new mechanism for temporarily changing the server's loading_rio context during RDB loading operations. The new RDB_SCOPED_LOADING_RIO macro allows for a scoped change of the server.loading_rio value, ensuring that it's automatically restored to its original value when the scope ends. Signed-off-by: naglera --- src/rdb.c | 10 ++++++---- src/rdb.h | 6 ++++++ src/replication.c | 1 + src/rio.h | 10 ++++++++-- src/server.h | 1 + tests/integration/dual-channel-replication.tcl | 2 +- 6 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 4b211ccef8..33eba7d422 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2930,10 +2930,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { loadingAbsProgress(r->processed_bytes); processEventsWhileBlocked(); processModuleLoadingProgressEvent(0); - if (server.repl_provisional_primary.close_asap == 1) { - serverLog(LL_WARNING, "Primary main connection dropped during RDB load callback"); - return -1; - } } if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { server.stat_net_repl_input_bytes += len; @@ -2984,6 +2980,11 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s return res; } +/* Cleanup function to restore the original loading_rio value. */ +static void _restore_loading_rio(rio **old_rio_ptr) { + server.loading_rio = *old_rio_ptr; +} + /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { @@ -3007,6 +3008,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin char buf[1024]; int error; long long empty_keys_skipped = 0; + RDB_SCOPED_LOADING_RIO(rdb); rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; diff --git a/src/rdb.h b/src/rdb.h index e9d53fa398..0d465111ec 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -139,6 +139,12 @@ #define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */ #define RDB_LOAD_ERR_OTHER 2 /* Any other errors */ + +/* Macro to temporarily set server.loading_rio within a scope. */ +#define RDB_SCOPED_LOADING_RIO(new_rio) \ + __attribute__((cleanup(_restore_loading_rio))) rio *_old_rio __attribute__((unused)) = server.loading_rio; \ + server.loading_rio = new_rio; + ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len); int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); diff --git a/src/replication.c b/src/replication.c index a92bb79984..a29f597485 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2833,6 +2833,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t } if (nread == 0) { serverLog(LL_VERBOSE, "Provisional primary closed connection"); + if (server.loading_rio) server.loading_rio->flags |= RIO_FLAG_CLOSE_ASAP; cancelReplicationHandshake(1); return C_ERR; } diff --git a/src/rio.h b/src/rio.h index ee0f27aa7e..10e5ad2821 100644 --- a/src/rio.h +++ b/src/rio.h @@ -39,6 +39,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) +#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */ #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) { } static inline size_t rioRead(rio *r, void *buf, size_t len) { - if (r->flags & RIO_FLAG_READ_ERROR) return 0; + if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0; while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; @@ -168,8 +169,13 @@ static inline int rioGetWriteError(rio *r) { return (r->flags & RIO_FLAG_WRITE_ERROR) != 0; } +/* Like rioGetReadError() but for async close errors. */ +static inline int rioGetAsyncCloseError(rio *r) { + return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0; +} + static inline void rioClearErrors(rio *r) { - r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR); + r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP); } void rioInitWithFile(rio *r, FILE *fp); diff --git a/src/server.h b/src/server.h index 5cf56e9c86..6f7fe94c9f 100644 --- a/src/server.h +++ b/src/server.h @@ -2058,6 +2058,7 @@ struct valkeyServer { int dbid; } repl_provisional_primary; client *cached_primary; /* Cached primary to be reused for PSYNC. */ + rio *loading_rio; /* Pointer to the Rio object currently used for loading data. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_state; /* Replication status if the instance is a replica */ int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */ diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 93dcbda9a7..39be658d3e 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -1249,7 +1249,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Primary did not free repl buf block after sync failure" } - wait_for_log_messages 0 {"*Primary main connection dropped during RDB load callback*"} $loglines 1000 10 + wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10 # Replica should retry wait_for_condition 500 1000 { [string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&