-
Notifications
You must be signed in to change notification settings - Fork 653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Import-mode: Avoid expiration and eviction during data syncing #1185
Changes from 13 commits
f59c5e8
7226e55
c2e86bf
937924c
b3b5c46
a79a51a
3e36548
7883e9f
523b462
be13edc
7b1127e
11f75bc
20b005c
7f04964
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
{ | ||
"IMPORT-SOURCE": { | ||
"summary": "Mark this client as an import source when server is in import mode.", | ||
"complexity": "O(1)", | ||
"group": "connection", | ||
"since": "8.1.0", | ||
"arity": 3, | ||
"container": "CLIENT", | ||
"function": "clientCommand", | ||
"command_flags": [ | ||
"NOSCRIPT", | ||
"LOADING", | ||
"STALE" | ||
], | ||
"acl_categories": [ | ||
"CONNECTION" | ||
], | ||
"reply_schema": { | ||
"const": "OK" | ||
}, | ||
"arguments": [ | ||
{ | ||
"name": "enabled", | ||
"type": "oneof", | ||
"arguments": [ | ||
{ | ||
"name": "on", | ||
"type": "pure-token", | ||
"token": "ON" | ||
}, | ||
{ | ||
"name": "off", | ||
"type": "pure-token", | ||
"token": "OFF" | ||
} | ||
] | ||
} | ||
] | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3567,6 +3567,10 @@ void clientCommand(client *c) { | |
" Protect current client connection from eviction.", | ||
"NO-TOUCH (ON|OFF)", | ||
" Will not touch LRU/LFU stats when this mode is on.", | ||
"IMPORT-SOURCE (ON|OFF)", | ||
" Mark this connection as an import source if server.import_mode is true.", | ||
" Sync tools can set their connections into 'import-source' state to visit", | ||
" expired keys.", | ||
NULL}; | ||
addReplyHelp(c, help); | ||
} else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) { | ||
|
@@ -4040,6 +4044,22 @@ void clientCommand(client *c) { | |
} | ||
} | ||
addReply(c, shared.ok); | ||
} else if (!strcasecmp(c->argv[1]->ptr, "import-source")) { | ||
/* CLIENT IMPORT-SOURCE ON|OFF */ | ||
if (!server.import_mode) { | ||
addReplyError(c, "Server is not in import mode"); | ||
return; | ||
} | ||
if (!strcasecmp(c->argv[2]->ptr, "on")) { | ||
c->flag.import_source = 1; | ||
addReply(c, shared.ok); | ||
Comment on lines
+4053
to
+4055
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens to client |
||
} else if (!strcasecmp(c->argv[2]->ptr, "off")) { | ||
c->flag.import_source = 0; | ||
addReply(c, shared.ok); | ||
} else { | ||
addReplyErrorObject(c, shared.syntaxerr); | ||
return; | ||
} | ||
} else { | ||
addReplySubcommandSyntaxError(c); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1056,7 +1056,9 @@ void databasesCron(void) { | |
* as primary will synthesize DELs for us. */ | ||
if (server.active_expire_enabled) { | ||
if (iAmPrimary()) { | ||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); | ||
if (!server.import_mode) { | ||
zuiderkwast marked this conversation as resolved.
Show resolved
Hide resolved
|
||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); | ||
} | ||
} else { | ||
expireReplicaKeys(); | ||
} | ||
|
@@ -1651,7 +1653,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { | |
|
||
/* Run a fast expire cycle (the called function will return | ||
* ASAP if a fast cycle is not needed). */ | ||
if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); | ||
if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we could have had similar conditional statement above at line 1058
|
||
|
||
if (moduleCount()) { | ||
moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL); | ||
|
@@ -2057,6 +2059,7 @@ void initServerConfig(void) { | |
server.extended_redis_compat = 0; | ||
server.pause_cron = 0; | ||
server.dict_resizing = 1; | ||
server.import_mode = 0; | ||
|
||
server.latency_tracking_info_percentiles_len = 3; | ||
server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -832,6 +832,80 @@ start_server {tags {"expire"}} { | |
close_replication_stream $repl | ||
assert_equal [r debug set-active-expire 1] {OK} | ||
} {} {needs:debug} | ||
|
||
test {Import mode should forbid active expiration} { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be worthwhile to run these tests in cluster-enabled mode as well. |
||
r flushall | ||
|
||
r config set import-mode yes | ||
assert_equal [r client import-source on] {OK} | ||
|
||
r set foo1 bar PX 1 | ||
r set foo2 bar PX 1 | ||
after 100 | ||
|
||
assert_equal [r dbsize] {2} | ||
|
||
assert_equal [r client import-source off] {OK} | ||
r config set import-mode no | ||
|
||
# Verify all keys have expired | ||
wait_for_condition 40 100 { | ||
[r dbsize] eq 0 | ||
} else { | ||
fail "Keys did not actively expire." | ||
} | ||
} | ||
|
||
test {Import mode should forbid lazy expiration} { | ||
r flushall | ||
r debug set-active-expire 0 | ||
|
||
r config set import-mode yes | ||
assert_equal [r client import-source on] {OK} | ||
|
||
r set foo1 1 PX 1 | ||
after 10 | ||
|
||
r get foo1 | ||
assert_equal [r dbsize] {1} | ||
|
||
assert_equal [r client import-source off] {OK} | ||
r config set import-mode no | ||
|
||
r get foo1 | ||
|
||
assert_equal [r dbsize] {0} | ||
|
||
assert_equal [r debug set-active-expire 1] {OK} | ||
} {} {needs:debug} | ||
|
||
test {RANDOMKEY can return expired key in import mode} { | ||
r flushall | ||
|
||
r config set import-mode yes | ||
assert_equal [r client import-source on] {OK} | ||
|
||
r set foo1 bar PX 1 | ||
after 10 | ||
|
||
set client [valkey [srv "host"] [srv "port"] 0 $::tls] | ||
if {!$::singledb} { | ||
$client select 9 | ||
} | ||
assert_equal [$client ttl foo1] {-2} | ||
|
||
assert_equal [r randomkey] {foo1} | ||
|
||
assert_equal [r client import-source off] {OK} | ||
r config set import-mode no | ||
|
||
# Verify all keys have expired | ||
wait_for_condition 40 100 { | ||
[r dbsize] eq 0 | ||
} else { | ||
fail "Keys did not actively expire." | ||
} | ||
} | ||
} | ||
|
||
start_cluster 1 0 {tags {"expire external:skip cluster"}} { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be enforced by server instead of providing a warning in the code comment/config file? The client's not marked with import_source should receive
LOADING
error if I'm understanding the use case correctly.