Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scoped RDB loading context and immediate abort flag #1173

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -2980,6 +2980,11 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
return res;
}

/* Cleanup function to restore the original loading_rio value. */
static void _restore_loading_rio(rio **old_rio_ptr) {
server.loading_rio = *old_rio_ptr;
}

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
Expand All @@ -3003,6 +3008,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
char buf[1024];
int error;
long long empty_keys_skipped = 0;
RDB_SCOPED_LOADING_RIO(rdb);

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
Expand Down
6 changes: 6 additions & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@
#define RDB_LOAD_ERR_EMPTY_KEY 1 /* Error of empty key */
#define RDB_LOAD_ERR_OTHER 2 /* Any other errors */


/* Macro to temporarily set server.loading_rio within a scope. */
#define RDB_SCOPED_LOADING_RIO(new_rio) \
__attribute__((cleanup(_restore_loading_rio))) rio *_old_rio __attribute__((unused)) = server.loading_rio; \
server.loading_rio = new_rio;

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len);
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
Expand Down
2 changes: 2 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,8 @@ int readIntoReplDataBlock(connection *conn, replDataBufBlock *data_block, size_t
}
if (nread == 0) {
serverLog(LL_VERBOSE, "Provisional primary closed connection");
/* Signal ongoing RDB load to terminate gracefully */
if (server.loading_rio) rioCloseASAP(server.loading_rio);
cancelReplicationHandshake(1);
return C_ERR;
}
Expand Down
16 changes: 13 additions & 3 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down Expand Up @@ -115,7 +116,7 @@ typedef struct _rio rio;
* if needed. */

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
if (r->flags & RIO_FLAG_WRITE_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_write =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -132,7 +133,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}

static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & RIO_FLAG_READ_ERROR || r->flags & RIO_FLAG_CLOSE_ASAP) return 0;
while (len) {
size_t bytes_to_read =
(r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
Expand All @@ -156,6 +157,10 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}

static inline void rioCloseASAP(rio *r) {
r->flags |= RIO_FLAG_CLOSE_ASAP;
}

/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
Expand All @@ -168,8 +173,13 @@ static inline int rioGetWriteError(rio *r) {
return (r->flags & RIO_FLAG_WRITE_ERROR) != 0;
}

/* Like rioGetReadError() but for async close errors. */
static inline int rioGetAsyncCloseError(rio *r) {
return (r->flags & RIO_FLAG_CLOSE_ASAP) != 0;
}

static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR | RIO_FLAG_WRITE_ERROR | RIO_FLAG_CLOSE_ASAP);
}

void rioInitWithFile(rio *r, FILE *fp);
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,7 @@ struct valkeyServer {
int dbid;
} repl_provisional_primary;
client *cached_primary; /* Cached primary to be reused for PSYNC. */
rio *loading_rio; /* Pointer to the rio object currently used for loading data. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a replica */
int repl_rdb_channel_state; /* State of the replica's rdb channel during dual-channel-replication */
Expand Down
66 changes: 65 additions & 1 deletion tests/integration/dual-channel-replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
}

$primary debug log "killing replica main connection"
set replica_main_conn_id [get_client_id_by_last_cmd $primary "sync"]
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines -1]
$primary client kill id $replica_main_conn_id
Expand All @@ -1197,3 +1197,67 @@ start_server {tags {"dual-channel-replication external:skip"}} {
stop_write_load $load_handle
}
}


start_server {tags {"dual-channel-replication external:skip"}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set loglines [count_log_lines 0]

$primary config set repl-diskless-sync yes
$primary config set dual-channel-replication-enabled yes
$primary config set loglevel debug
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry

# Generating RDB will cost 100 sec to generate
$primary debug populate 1000000 primary 1
$primary config set rdb-key-save-delay -1000

start_server {} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]

$replica config set dual-channel-replication-enabled yes
$replica config set loglevel debug
$replica config set repl-timeout 10
$replica config set repl-diskless-load flush-before-load

test "Replica notice main-connection killed during rdb load callback" {; # https://github.com/valkey-io/valkey/issues/1152
set loglines [count_log_lines 0]
$replica replicaof $primary_host $primary_port
# Wait for sync session to start
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't start sync session in time"
}
wait_for_log_messages 0 {"*Loading RDB produced by Valkey version*"} $loglines 1000 10
$primary set key val
set replica_main_conn_id [get_client_id_by_last_cmd $primary "psync"]
$primary debug log "killing replica main connection $replica_main_conn_id"
assert {$replica_main_conn_id != ""}
set loglines [count_log_lines 0]
$primary client kill id $replica_main_conn_id
# Wait for primary to abort the sync
wait_for_condition 50 1000 {
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
} else {
fail "Primary did not free repl buf block after sync failure"
}
wait_for_log_messages 0 {"*Failed trying to load the PRIMARY synchronization DB from socket*"} $loglines 1000 10
# Replica should retry
wait_for_condition 500 1000 {
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't retry after connection close"
}
}
}
}
Loading