Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scoped RDB loading context and immediate abort flag #1173

Open
wants to merge 16 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ char *rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);

#ifdef __GNUC__
void rdbReportError(int corruption_error, int linenum, char *reason, ...) __attribute__((format(printf, 3, 4)));
Expand Down Expand Up @@ -2985,7 +2986,19 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
functionsLibCtx *functions_lib_ctx = functionsLibCtxGetCurrent();
rdbLoadingCtx loading_ctx = {.dbarray = server.db, .functions_lib_ctx = functions_lib_ctx};
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, &loading_ctx);
int retval = rdbLoadRioWithLoadingCtxScopedRdb(rdb, rdbflags, rsi, &loading_ctx);
return retval;
}

/* Wrapper for rdbLoadRioWithLoadingCtx that manages a scoped RDB context.
* This method wraps the rdbLoadRioWithLoadingCtx function, providing temporary
* RDB context management. It sets a new current loading RDB, calls the wrapped
* function, and then restores the previous loading RDB context. */
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
rio *prev_rio = server.loading_rio;
server.loading_rio = rdb;
int retval = rdbLoadRioWithLoadingCtx(rdb, rdbflags, rsi, rdb_loading_ctx);
server.loading_rio = prev_rio;
return retval;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
Expand Down
16 changes: 7 additions & 9 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2244,7 +2244,7 @@ void readSyncBulkPayload(connection *conn) {

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtx(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
"from socket, check server logs.");
Expand Down Expand Up @@ -2824,18 +2824,16 @@ typedef struct replDataBufBlock {
* Reads replication data from primary into specified repl buffer block */
int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t read) {
int nread = connRead(conn, data_block->buf + data_block->used, read);
if (nread == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_NOTICE, "Error reading from primary: %s", connGetLastError(conn));

if (nread <= 0) {
if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING, "Provisional primary closed connection");
/* Signal ongoing RDB load to terminate gracefully */
if (server.loading_rio) rioCloseASAP(server.loading_rio);
cancelReplicationHandshake(1);
}
return C_ERR;
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
cancelReplicationHandshake(1);
return C_ERR;
}
data_block->used += nread;
server.stat_total_reads_processed++;
return read - nread;
Expand Down
16 changes: 13 additions & 3 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down Expand Up @@ -115,7 +116,7 @@ typedef struct _rio rio;
* if needed. */

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_write =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}

static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}

static inline void rioCloseASAP(rio *r) {
r->flags |= RIO_FLAG_CLOSE_ASAP;
}

/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
Expand All @@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

/* Like rioGetReadError() but for async close errors. */
static inline int rioGetAsyncCloseError(rio *r) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not used anywhere. suggest to drop it for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rioGetWriteError is not used anywhere either. This would maintain symmetry in our error handling capabilities for read, write, and async close operations, even if they're not currently utilized.

return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
}

static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
}

void rioInitWithFile(rio *r, FILE *fp);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,7 @@ struct valkeyServer {
int dbid;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
rio *loading_rio; /* Pointer to the rio object currently used for loading data. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a replica */
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
Expand Down
62 changes: 59 additions & 3 deletions tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1111,8 +1111,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will cost 100 sec to generate
$primary debug populate 10000 primary 1
$primary config set rdb-key-save-delay 10000
$primary debug populate 100000 primary 1
$primary config set rdb-key-save-delay 1000

start_server {} {
set replica [srv 0 client]
Expand Down Expand Up @@ -1174,7 +1174,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}

$primary debug log "killing replica main connection"
set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"]
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines -1]
$primary client kill id $replica_main_conn_id
Expand All @@ -1197,3 +1197,59 @@ start_server {tags {"dual-channel-replication external:skip"}} {
stop_write_load $load_handle
}
}


start_server {tags {"dual-channel-replication external:skip"}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]

$primary config set repl-diskless-sync yes
$primary config set dual-channel-replication-enabled yes
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will take 100 sec to generate
$primary debug populate 1000000 primary 1
$primary config set rdb-key-save-delay -10

start_server {} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]

$replica config set dual-channel-replication-enabled yes
$replica config set loglevel debug
$replica config set repl-timeout 10
$replica config set repl-diskless-load flush-before-load

test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152
set loglines [count_log_lines 0]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't start sync session in time"
}
wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10
$primary set key val
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
$primary debug log "killing replica main connection $replica_main_conn_id"
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines 0]
$primary config set rdb-key-save-delay 0; # disable delay to allow next sync to succeed
$primary client kill id $replica_main_conn_id
# Wait for primary to abort the sync
wait_for_condition 50 1000 {
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
} else {
fail "Primary did not free repl buf block after sync failure"
}
wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
verify_replica_online $primary 0 500
}
}
}
Loading