From 2e71875e06550f06fb03919a1ed2d1fd55ac2779 Mon Sep 17 00:00:00 2001 From: naglera Date: Tue, 15 Oct 2024 07:02:51 +0000 Subject: [PATCH] Add ASAP abort flag to provisional primary for safer replication disconnection handling Introduces a dedicated flag in the provisional primary struct to signal immediate abort, preventing potential use-after-free scenarios during replication disconnection in dual-channel load. This ensures proper termination of rdbLoadRioWithLoadingCtx when replication is cancelled due to connection loss on the main connection. Fixes https://github.com/valkey-io/valkey/issues/1152 Signed-off-by: naglera --- src/rdb.c | 7 ++++++- src/replication.c | 2 ++ src/rio.c | 3 ++- src/rio.h | 10 ++++++---- src/server.h | 1 + src/valkey-check-rdb.c | 2 +- 6 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index bc2d03e86c..c90ef369e8 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2920,7 +2920,7 @@ void stopSaving(int success) { /* Track loading progress in order to serve client's from time to time and if needed calculate rdb checksum */ -void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { +int rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.rdb_checksum) rioGenericUpdateChecksum(r, buf, len); if (server.loading_process_events_interval_bytes && (r->processed_bytes + len) / server.loading_process_events_interval_bytes > @@ -2933,6 +2933,11 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { server.stat_net_repl_input_bytes += len; } + if (server.repl_provisional_primary.close_asap == 1) { + serverLog(LL_WARNING, "Primary main connection dropped during RDB load callback"); + return -1; + } + return 0; } /* Save the given functions_ctx to the rdb. 
diff --git a/src/replication.c b/src/replication.c index 63433de865..d17c6d2118 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2691,6 +2691,7 @@ static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) { server.repl_provisional_primary.reploff = reploffset; server.repl_provisional_primary.read_reploff = reploffset; server.repl_provisional_primary.dbid = dbid; + server.repl_provisional_primary.close_asap = 0; /* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the * main connection accordingly.*/ @@ -2823,6 +2824,7 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t } if (nread == 0) { serverLog(LL_VERBOSE, "Provisional primary closed connection"); + server.repl_provisional_primary.close_asap = 1; cancelReplicationHandshake(1); return C_ERR; } diff --git a/src/rio.c b/src/rio.c index f2bf1fdb3c..4b988c84f5 100644 --- a/src/rio.c +++ b/src/rio.c @@ -424,8 +424,9 @@ void rioFreeFd(rio *r) { /* This function can be installed both in memory and file streams when checksum * computation is needed. */ -void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { +int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { r->cksum = crc64(r->cksum, buf, len); + return 1; } /* Set the file-based rio object to auto-fsync every 'bytes' file written. diff --git a/src/rio.h b/src/rio.h index ee0f27aa7e..cdb4ce0e02 100644 --- a/src/rio.h +++ b/src/rio.h @@ -57,8 +57,10 @@ struct _rio { * all the data that was read or written so far. The method should be * designed so that can be called with the current checksum, and the buf * and len fields pointing to the new block of data to add to the checksum - * computation. */ - void (*update_cksum)(struct _rio *, const void *buf, size_t len); + * computation. + * The method should return -1 to indicate that the rio operation should be + * terminated, or a non-negative value to continue processing. 
*/ + int (*update_cksum)(struct _rio *, const void *buf, size_t len); /* The current checksum and flags (see RIO_FLAG_*) */ uint64_t cksum, flags; @@ -140,7 +142,7 @@ static inline size_t rioRead(rio *r, void *buf, size_t len) { r->flags |= RIO_FLAG_READ_ERROR; return 0; } - if (r->update_cksum) r->update_cksum(r, buf, bytes_to_read); + if (r->update_cksum && r->update_cksum(r, buf, bytes_to_read) < 0) return 0; buf = (char *)buf + bytes_to_read; len -= bytes_to_read; r->processed_bytes += bytes_to_read; @@ -188,7 +190,7 @@ size_t rioWriteBulkDouble(rio *r, double d); struct serverObject; int rioWriteBulkObject(rio *r, struct serverObject *obj); -void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); +int rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); void rioSetAutoSync(rio *r, off_t bytes); void rioSetReclaimCache(rio *r, int enabled); uint8_t rioCheckType(rio *r); diff --git a/src/server.h b/src/server.h index 4fad8d2508..c7953520d6 100644 --- a/src/server.h +++ b/src/server.h @@ -2038,6 +2038,7 @@ struct valkeyServer { long long reploff; long long read_reploff; int dbid; + uint64_t close_asap : 1; } repl_provisional_primary; client *cached_primary; /* Cached primary to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ diff --git a/src/valkey-check-rdb.c b/src/valkey-check-rdb.c index ba94c172c7..c109c061d4 100644 --- a/src/valkey-check-rdb.c +++ b/src/valkey-check-rdb.c @@ -37,7 +37,7 @@ #include void createSharedObjects(void); -void rdbLoadProgressCallback(rio *r, const void *buf, size_t len); +int rdbLoadProgressCallback(rio *r, const void *buf, size_t len); int rdbCheckMode = 0; struct {