Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import-mode: Avoid expiration and eviction during data syncing #1185

Merged
merged 14 commits into from
Nov 19, 2024
29 changes: 29 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = {
#define CLIENT_ID_Keyspecs NULL
#endif

/********** CLIENT IMPORT_SOURCE ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLIENT IMPORT_SOURCE history */
#define CLIENT_IMPORT_SOURCE_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLIENT IMPORT_SOURCE tips */
#define CLIENT_IMPORT_SOURCE_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLIENT IMPORT_SOURCE key specs */
#define CLIENT_IMPORT_SOURCE_Keyspecs NULL
#endif

/* CLIENT IMPORT_SOURCE enabled argument table */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = {
{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLIENT IMPORT_SOURCE argument table */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = {
{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs},
};

/********** CLIENT INFO ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = {
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
{MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)},
{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.1.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args},
{MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)},
{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args},
{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args},
Expand Down
40 changes: 40 additions & 0 deletions src/commands/client-import-source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"IMPORT-SOURCE": {
"summary": "Mark this client as an import source when server is in import mode.",
"complexity": "O(1)",
"group": "connection",
"since": "8.1.0",
"arity": 3,
"container": "CLIENT",
"function": "clientCommand",
"command_flags": [
"NOSCRIPT",
"LOADING",
"STALE"
],
"acl_categories": [
"CONNECTION"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "enabled",
"type": "oneof",
"arguments": [
{
"name": "on",
"type": "pure-token",
"token": "ON"
},
{
"name": "off",
"type": "pure-token",
"token": "OFF"
}
]
}
]
}
}
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3136,6 +3136,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
21 changes: 20 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) {
key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key));
if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) {
if (allvolatile && server.primary_host && --maxtries == 0) {
if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the repilca, so the function cannot stop because
Expand Down Expand Up @@ -1826,6 +1826,25 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di
if (server.primary_host != NULL) {
if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
} else if (server.import_mode) {
/* If we are running in the import mode on a primary, instead of
* evicting the expired key from the database, we return ASAP:
* the key expiration is controlled by the import source that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on this server
* because it's a primary.
*
* Notice: other clients, apart from the import source, should not access
* the data imported by import source.
Comment on lines +1837 to +1838
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be enforced by server instead of providing a warning in the code comment/config file? The client's not marked with import_source should receive LOADING error if I'm understanding the use case correctly.

*
* Still we try to return the right information to the caller,
* that is, KEY_VALID if we think the key should still be valid,
* KEY_EXPIRED if we think the key is expired but don't want to delete it at this time.
*
* When receiving commands from the import source, keys are never considered
* expired. */
if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
}

/* In some cases we're explicitly instructed to return an indication of a
Expand Down
4 changes: 2 additions & 2 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ int performEvictions(void) {
goto update_metrics;
}

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids. */
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || (iAmPrimary() && server.import_mode)) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */
goto update_metrics;
}

Expand Down
7 changes: 5 additions & 2 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,11 @@ int checkAlreadyExpired(long long when) {
* of a replica instance.
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the primary. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host);
* (possibly in the past) and wait for an explicit DEL from the primary.
*
* If the server is a primary and in the import mode, we also add the already
* expired key and wait for an explicit DEL from the import source. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode);
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
}

#define EXPIRE_NX (1 << 0)
Expand Down
20 changes: 20 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3567,6 +3567,10 @@ void clientCommand(client *c) {
" Protect current client connection from eviction.",
"NO-TOUCH (ON|OFF)",
" Will not touch LRU/LFU stats when this mode is on.",
"IMPORT-SOURCE (ON|OFF)",
" Mark this connection as an import source if server.import_mode is true.",
" Sync tools can set their connections into 'import-source' state to visit",
" expired keys.",
NULL};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) {
Expand Down Expand Up @@ -4040,6 +4044,22 @@ void clientCommand(client *c) {
}
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "import-source")) {
/* CLIENT IMPORT-SOURCE ON|OFF */
if (!server.import_mode) {
addReplyError(c, "Server is not in import mode");
return;
}
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flag.import_source = 1;
addReply(c, shared.ok);
Comment on lines +4053 to +4055
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to client import_source flag which were turned on explicitly when server import_mode is turned off?

} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flag.import_source = 0;
addReply(c, shared.ok);
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
} else {
addReplySubcommandSyntaxError(c);
}
Expand Down
7 changes: 5 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,9 @@ void databasesCron(void) {
* as primary will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmPrimary()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
if (!server.import_mode) {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
}
} else {
expireReplicaKeys();
}
Expand Down Expand Up @@ -1651,7 +1653,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could have had similar conditional statement above at line 1058

!server.import_mode && iAmPrimary()


if (moduleCount()) {
moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL);
Expand Down Expand Up @@ -2057,6 +2059,7 @@ void initServerConfig(void) {
server.extended_redis_compat = 0;
server.pause_cron = 0;
server.dict_resizing = 1;
server.import_mode = 0;

server.latency_tracking_info_percentiles_len = 3;
server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len));
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,8 @@ typedef struct ClientFlags {
* knows that it does not need the cache and required a full sync. With this
* flag, we won't cache the primary in freeClient. */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t reserved : 5; /* Reserved for future use */
uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */
uint64_t reserved : 4; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2070,6 +2071,8 @@ struct valkeyServer {
char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */
long long primary_initial_offset; /* Primary PSYNC offset. */
int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Import Mode */
int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/expire.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,80 @@ start_server {tags {"expire"}} {
close_replication_stream $repl
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {Import mode should forbid active expiration} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worthwhile to run these tests in cluster-enabled mode as well.

r flushall

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 bar PX 1
r set foo2 bar PX 1
after 100

assert_equal [r dbsize] {2}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}

test {Import mode should forbid lazy expiration} {
r flushall
r debug set-active-expire 0

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 1 PX 1
after 10

r get foo1
assert_equal [r dbsize] {1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

r get foo1

assert_equal [r dbsize] {0}

assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

test {RANDOMKEY can return expired key in import mode} {
r flushall

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 bar PX 1
after 10

set client [valkey [srv "host"] [srv "port"] 0 $::tls]
if {!$::singledb} {
$client select 9
}
assert_equal [$client ttl foo1] {-2}

assert_equal [r randomkey] {foo1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}
}

start_cluster 1 0 {tags {"expire external:skip cluster"}} {
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/maxmemory.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} {
assert {[r object freq foo] == 5}
}
}

start_server {tags {"maxmemory" "external:skip"}} {
test {Import mode should forbid eviction} {
r set key val
r config set import-mode yes
assert_equal [r client import-source on] {OK}
r config set maxmemory-policy allkeys-lru
r config set maxmemory 1

assert_equal [r dbsize] {1}
assert_error {OOM command not allowed*} {r set key1 val1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

assert_equal [r dbsize] {0}
}
}
7 changes: 7 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,13 @@ replica-priority 100
#
# replica-ignore-disk-write-errors no

# Make the primary forbid expiration and eviction.
# This is useful for sync tools, because expiration and eviction may cause the data corruption.
# Sync tools can mark their connections as importing source by CLIENT IMPORT-SOURCE.
# NOTICE: Clients should avoid writing the same key on the source server and the destination server.
#
# import-mode no

# -----------------------------------------------------------------------------
# By default, Sentinel includes all replicas in its reports. A replica
# can be excluded from Sentinel's announcements. An unannounced replica
Expand Down
Loading