Skip to content

Commit

Permalink
Fix some issues with modules and MULTI/EXEC (redis#8617)
Browse files Browse the repository at this point in the history
Bug 1:
When a module ctx is freed moduleHandlePropagationAfterCommandCallback
is called and handles propagation. We want to prevent it from propagating
commands that were not replicated by the same context. Example:
1. module1.foo does: RM_Replicate(cmd1); RM_Call(cmd2); RM_Replicate(cmd3)
2. RM_Replicate(cmd1) propagates MULTI and adds cmd1 to also_propagagte
3. RM_Call(cmd2) create a new ctx, calls call() and destroys the ctx.
4. moduleHandlePropagationAfterCommandCallback is called, calling
   alsoPropagates EXEC (Note: EXEC is still not written to socket),
   setting server.in_trnsaction = 0
5. RM_Replicate(cmd3) is called, propagagting yet another MULTI (now
   we have nested MULTI calls, which is no good) and then cmd3

We must prevent RM_Call(cmd2) from resetting server.in_transaction.
REDISMODULE_CTX_MULTI_EMITTED was revived for that purpose.

Bug 2:
Fix issues with nested RM_Call where some have '!' and some don't.
Example:
1. module1.foo does RM_Call of module2.bar without replication (i.e. no '!')
2. module2.bar internally calls RM_Call of INCR with '!'
3. at the end of module1.foo we call RM_ReplicateVerbatim

We want the replica/AOF to see only module1.foo and not the INCR from module2.bar

Introduced a global replication_allowed flag inside RM_Call to determine
whether we need to replicate or not (even if '!' was specified)

Other changes:
Split beforePropagateMultiOrExec to beforePropagateMulti afterPropagateExec
just for better readability
  • Loading branch information
guybe7 authored Mar 10, 2021
1 parent ceb3a7a commit 3d0b427
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 19 deletions.
27 changes: 22 additions & 5 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_THREAD_SAFE (1<<4)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6)
#define REDISMODULE_CTX_MULTI_EMITTED (1<<7)

/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
Expand Down Expand Up @@ -599,6 +600,10 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {

/* We don't need to do anything here if the context was never used
* in order to propagate commands. */
if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;

/* We don't need to do anything here if the server isn't inside
* a transaction. */
if (!server.propagate_in_transaction) return;

/* If this command is executed from with Lua or MULTI/EXEC we do noy
Expand All @@ -607,9 +612,9 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {

/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrapped around MULTI/EXEC. */
beforePropagateMultiOrExec(0);
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();

/* If this is not a module command context (but is instead a simple
* callback context), we have to handle directly the "also propagate"
Expand Down Expand Up @@ -1707,10 +1712,12 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
* context, we have to setup the op array for the "also propagate" API
* so that RM_Replicate() will work. */
if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
serverAssert(ctx->saved_oparray.ops == NULL);
ctx->saved_oparray = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
}
execCommandPropagateMulti(ctx->client->db->id);
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
}

/* Replicate the specified command and arguments to slaves and AOF, as effect
Expand Down Expand Up @@ -4062,20 +4069,30 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
}
}

/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
if (replicate) moduleReplicateMultiIfNeeded(ctx);
/* We need to use a global replication_allowed flag in order to prevent
* replication of nested RM_Calls. Example:
* 1. module1.foo does RM_Call of module2.bar without replication (i.e. no '!')
* 2. module2.bar internally calls RM_Call of INCR with '!'
* 3. at the end of module1.foo we call RM_ReplicateVerbatim
* We want the replica/AOF to see only module1.foo and not the INCR from module2.bar */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = replicate && server.replication_allowed;

/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP;
if (replicate) {
/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
moduleReplicateMultiIfNeeded(ctx);

if (!(flags & REDISMODULE_ARGV_NO_AOF))
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c,call_flags);
server.replication_allowed = prev_replication_allowed;

serverAssert((c->flags & CLIENT_BLOCKED) == 0);

Expand Down
26 changes: 13 additions & 13 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,30 +113,30 @@ void discardCommand(client *c) {
addReply(c,shared.ok);
}

void beforePropagateMultiOrExec(int multi) {
if (multi) {
/* Propagating MULTI */
serverAssert(!server.propagate_in_transaction);
server.propagate_in_transaction = 1;
} else {
/* Propagating EXEC */
serverAssert(server.propagate_in_transaction == 1);
server.propagate_in_transaction = 0;
}
void beforePropagateMulti() {
/* Propagating MULTI */
serverAssert(!server.propagate_in_transaction);
server.propagate_in_transaction = 1;
}

void afterPropagateExec() {
/* Propagating EXEC */
serverAssert(server.propagate_in_transaction == 1);
server.propagate_in_transaction = 0;
}

/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
* implementation for more information. */
void execCommandPropagateMulti(int dbid) {
beforePropagateMultiOrExec(1);
beforePropagateMulti();
propagate(server.multiCommand,dbid,&shared.multi,1,
PROPAGATE_AOF|PROPAGATE_REPL);
}

void execCommandPropagateExec(int dbid) {
beforePropagateMultiOrExec(0);
propagate(server.execCommand,dbid,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();
}

/* Aborts a transaction, with a specific error message.
Expand Down Expand Up @@ -254,7 +254,6 @@ void execCommand(client *c) {
if (server.propagate_in_transaction) {
int is_master = server.masterhost == NULL;
server.dirty++;
beforePropagateMultiOrExec(0);
/* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the
* initial MULTI was propagated into the replication backlog, but the
Expand All @@ -264,6 +263,7 @@ void execCommand(client *c) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
afterPropagateExec();
}

server.in_exec = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3158,6 +3158,7 @@ void initServer(void) {
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
Expand Down Expand Up @@ -3502,6 +3503,7 @@ void redisOpArrayFree(redisOpArray *oa) {
zfree(op->argv);
}
zfree(oa->ops);
oa->ops = NULL;
}

/* ====================== Commands lookup and execution ===================== */
Expand Down Expand Up @@ -3552,6 +3554,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (!server.replication_allowed)
return;

/* Propagate a MULTI request once we encounter the first command which
* is a write command.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,7 @@ struct redisServer {
int child_info_nread; /* Num of bytes of the last read from pipe */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
int replication_allowed; /* Are we allowed to replicate? */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
Expand Down Expand Up @@ -1932,7 +1933,8 @@ void flagTransaction(client *c);
void execCommandAbort(client *c, sds error);
void execCommandPropagateMulti(int dbid);
void execCommandPropagateExec(int dbid);
void beforePropagateMultiOrExec(int multi);
void beforePropagateMulti();
void afterPropagateExec();

/* Redis object implementation */
void decrRefCount(robj *o);
Expand Down
74 changes: 74 additions & 0 deletions tests/modules/propagate.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,43 @@ int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK;
}

/* Timer callback. */
void timerNestedHandler(RedisModuleCtx *ctx, void *data) {
int repl = (long long)data;

/* The goal is the trigger a module command that calls RM_Replicate
* in order to test MULTI/EXEC structre */
RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-start","1");
RedisModule_Call(ctx,"propagate-test.nested", repl? "!" : "");
RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-end","1");
}

int propagateTestTimerNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModuleTimerID timer_id =
RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)0);
REDISMODULE_NOT_USED(timer_id);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestTimerNestedReplCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModuleTimerID timer_id =
RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)1);
REDISMODULE_NOT_USED(timer_id);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

/* The thread entry point. */
void *threadMain(void *arg) {
REDISMODULE_NOT_USED(arg);
Expand Down Expand Up @@ -131,6 +168,28 @@ int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK;
}

int propagateTestNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModuleCallReply *reply;

/* This test mixes multiple propagation systems. */
reply = RedisModule_Call(ctx, "INCR", "c!", "using-call");
RedisModule_FreeCallReply(reply);

RedisModule_Call(ctx,"propagate-test.simple", "!");

RedisModule_Replicate(ctx,"INCR","c","counter-3");
RedisModule_Replicate(ctx,"INCR","c","counter-4");

reply = RedisModule_Call(ctx, "INCR", "c!", "after-call");
RedisModule_FreeCallReply(reply);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
Expand All @@ -143,6 +202,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested",
propagateTestTimerNestedCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested-repl",
propagateTestTimerNestedReplCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.thread",
propagateTestThreadCommand,
"",1,1,1) == REDISMODULE_ERR)
Expand All @@ -158,5 +227,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.nested",
propagateTestNestedCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

return REDISMODULE_OK;
}
53 changes: 53 additions & 0 deletions tests/unit/moduleapi/propagate.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,59 @@ tags "modules" {
close_replication_stream $repl
}

test {module propagates nested ctx case1} {
set repl [attach_to_replication_stream]

$master del timer-nested-start
$master del timer-nested-end
$master propagate-test.timer-nested

wait_for_condition 5000 10 {
[$replica get timer-nested-end] eq "1"
} else {
fail "The two counters don't match the expected value."
}

assert_replication_stream $repl {
{select *}
{multi}
{incrby timer-nested-start 1}
{incrby timer-nested-end 1}
{exec}
}
close_replication_stream $repl
}

test {module propagates nested ctx case2} {
set repl [attach_to_replication_stream]

$master del timer-nested-start
$master del timer-nested-end
$master propagate-test.timer-nested-repl

wait_for_condition 5000 10 {
[$replica get timer-nested-end] eq "1"
} else {
fail "The two counters don't match the expected value."
}

# Note the 'after-call' and 'timer-nested-start' propagation below is out of order (known limitation)
assert_replication_stream $repl {
{select *}
{multi}
{incr using-call}
{incr counter-1}
{incr counter-2}
{incr after-call}
{incr counter-3}
{incr counter-4}
{incrby timer-nested-start 1}
{incrby timer-nested-end 1}
{exec}
}
close_replication_stream $repl
}

test {module propagates from thread} {
set repl [attach_to_replication_stream]

Expand Down

0 comments on commit 3d0b427

Please sign in to comment.