Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved redirections #9

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1478,15 +1478,36 @@ You can start using the cluster right away. You can obtain a connected `Redis` i

// Run your query on using the given Redis key / keys.
RESP *reply = redisxRequest(shard, "GET", key, NULL, NULL, &status);
...
```

The interactive queries handle both `MOVED` and `ASK` redirections automatically. However, asynchronous queries do not
since they return before receiving a response. Thus, when using `redisxReadReplyAsync()` later to process replies, you
should check for redirections:

```c
RESP *reply = redisxReadReplyAsync(...);

if(redisxClusterMoved(reply)) {
// The key is now served by another shard.
// You might want to obtain the new shard and try again...
// You might want to obtain the new shard and repeat the failed
// transaction again (interactively or pipelined)...
...
}
if(redisxClusterIsMigrating(reply)) {
// The key's slot is currently migrating. You may try the redirected
// address indicated in the reply, with the ASKING command, e.g. via a
// redisxClusterAskMigrating() interactive transaction.
...
}

...

```

As a matter a best practice you should never assume that a given keyword is persistently served by the same shard.
Rather, you should obtain the current shard for the key each time you want to use it with the cluster, and always
check for errors on shard requests, and repeat failed requests on a newly obtained shard if necessary.

Finally, when you are done using the cluster, simply discard it:

```c
Expand All @@ -1498,11 +1519,12 @@ Finally, when you are done using the cluster, simply discard it:
### Detecting cluster reconfiguration

In the above example we have shown one way you might check for errors that result from cluster being reconfigured
on-the-fly, using `redisxClusterMoved()` on the `RESP` reply obtained from the shard.
on-the-fly, using `redisxClusterMoved()` and/or `redisxClusterIsMigrating()` on the `RESP` reply obtained from the
shard.

Equivalently, you might use `redisxCheckRESP()` or `redisxCheckDestroyRESP()` also for detecting a cluster
reconfiguration. Both of these will return a designated `REDIS_MOVED` error code if the keyword is now served on
another shard:
reconfiguration. Both of these will return a designated `REDIS_MOVED` or `REDIS_MIGRATING` error code if the keyword
has moved or is migrating, respectively, to another node, e.g.:

```c
...
Expand All @@ -1511,20 +1533,24 @@ another shard:
// The key is now served by another shard.
...
}
if(s == REDIS_MIGRATING) {
// The key is migrating and may be accessed from new location via an ASKING directive
...
}
if(s != X_SUCCESS) {
// The reply is no good for some other reason...
...
}
...
```

A `REDIS_MOVED` error code will be returned by higher-level functions also, which ingest the `RESP` replies from the
shard and return a digested error code. For example, `redisxGetStringValue()` will set the output `len` value to
`REDIS_MOVED` if the value could not be obtained because of a cluster reconfiguration.
To help manage redirection responses for asynchronous requests, we provide `redisxClusterGetRedirection()` to obtain
the redirected Redis instance based on the redirection `RESP`. Once the redirected cluster shard is identified you may
either resubmit the same query as before (e.h. with `redisxSendArrayRequestAsync()`) if `MOVED`, or else repeat the
query via an interactive `ASKING` directive using `redisxClusterAskMigrating()`.

As a matter a best practice you should never assume that a given keyword is persistently served by the same shard.
Rather, you should obtain the current shard for the key each time you want to use it with the cluster, and always
check for errors on shard requests, and repeat failed requests on a newly obtained shard if necessary.
A `REDIS_MOVED` error code may be returned by higher-level functions also, which ingest the `RESP` replies from the
shard and return a digested error code.


<a name="cluster-explicit-connect"></a>
Expand Down
7 changes: 7 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ enum resp_type {
#define REDIS_UNEXPECTED_RESP (-105) ///< \hideinitializer Got a Redis response of a different type than expected
#define REDIS_UNEXPECTED_ARRAY_SIZE (-106) ///< \hideinitializer Got a Redis response with different number of elements than expected.
#define REDIS_MOVED (-107) ///< \hideinitializer The requested key has moved to another cluster shard.
#define REDIS_MIGRATING (-108) ///< \hideinitializer The requested key is importing, and you may query with ASKED on the specified node.

/**
* RedisX channel IDs. RedisX uses up to three separate connections to the server: (1) an interactive client, in which
Expand Down Expand Up @@ -418,10 +419,14 @@ int redisxSetTLSSkipVerify(Redis *redis, boolean value);

RedisCluster *redisxClusterInit(Redis *node);
Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key);
boolean redisxClusterIsRedirected(const RESP *reply);
boolean redisxClusterMoved(const RESP *reply);
boolean redisxClusterIsMigrating(const RESP *reply);
int redisxClusterConnect(RedisCluster *cluster);
int redisxClusterDisconnect(RedisCluster *cluster);
void redisxClusterDestroy(RedisCluster *cluster);
Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh);
RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status);

int redisxPing(Redis *redis, const char *message);
enum redisx_protocol redisxGetProtocol(Redis *redis);
Expand Down Expand Up @@ -507,6 +512,7 @@ int redisxUnlockClient(RedisClient *cl);
// Asynchronous access routines (use within redisxLockClient()/ redisxUnlockClient() blocks)...
int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg1, const char *arg2, const char *arg3);
int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *length, int n);
int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n);
int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, const char *value, boolean confirm);
int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm);
RESP *redisxReadReplyAsync(RedisClient *cl, int *pStatus);
Expand All @@ -516,6 +522,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl);
int redisxSkipReplyAsync(RedisClient *cl);
int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int length);


// Error generation with stderr message...
int redisxError(const char *func, int errorCode);
const char* redisxErrorDescription(int code);
Expand Down
Binary file modified man/man1/redisx-cli.1.gz
Binary file not shown.
109 changes: 74 additions & 35 deletions src/redisx-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@

#define _POSIX_C_SOURCE 199309L ///< for nanosleep()

// We'll use gcc major version as a proxy for the glibc library to decide which feature macro to use.
// gcc 5.1 was released 2015-04-22...
#ifndef __GNUC__
# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08)
#elif __GNUC__ >= 5
# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08)
#else
# define _BSD_SOURCE ///< strcasecmp() feature macro for glibc <= 2.19
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -29,6 +39,9 @@ static char *delim = "\\n";
static char *groupDelim = "\\n";
static int attrib = 0;

static Redis *redis;
static RedisCluster *cluster;

static void printVersion(const char *name) {
printf("%s %s\n", name, REDISX_VERSION_STRING);
}
Expand Down Expand Up @@ -56,10 +69,23 @@ static void printRESP(const RESP *resp) {
}
}

static void process(Redis *redis, const char **cmdargs, int nargs) {
static void process(const char **cmdargs, int nargs) {
int status = X_SUCCESS;
RESP *reply, *attr = NULL;

if(cluster) {
const char *key = cmdargs[1];

if(nargs > 3) if(strcasecmp("EVAL", cmdargs[0]) == 0 || strcasecmp("EVALSHA", cmdargs[0]) || strcasecmp("FCALL", cmdargs[0]))
key = cmdargs[3];

redis = redisxClusterGetShard(cluster, key);
if(!redis) {
fprintf(stderr, "ERROR! No suitable cluster node found for transaction.");
return;
}
}

reply = redisxArrayRequest(redis, cmdargs, NULL, nargs, &status);
if(!status && attrib) attr = redisxGetAttributes(redis);

Expand Down Expand Up @@ -103,18 +129,18 @@ static int interactive(Redis *redis) {

for(;;) {
char *line = readline(prompt);
char **args;
const char **args;
int nargs;

if(!line) continue;

if(strcmp("quit", line) == 0 || strcmp("exit", line) == 0) break;

poptParseArgvString(line, &nargs, (const char ***) &args);
poptParseArgvString(line, &nargs, &args);

if(args) {
if(nargs > 0) {
process(redis, (const char **) args, nargs);
process(args, nargs);
add_history(line);
}
free(args);
Expand Down Expand Up @@ -164,15 +190,16 @@ static char *readScript(const char *eval) {
* @return The argument list to pass to Redis , of the form:
* `EVAL <script> <NKEYS> [key1 [key2] ...] [arg1 [arg2 ...]]`
*/
static char **setScriptArgs(char *script, char **args, int *nargs) {
static const char **setScriptArgs(char *script, const char **args, int *nargs) {
int i, to = 0, n = *nargs, nkeys = 0;
char keys[20], **a;
char keys[20];
const char **a;

// Count the number of keys on the command-line up to the comma separator
for(nkeys = 0; nkeys < n; nkeys++) if(strcmp(",", args[nkeys])) break;
sprintf(keys, "%d", nkeys);

a = (char **) calloc((n + 2), sizeof(char *));
a = (const char **) calloc((n + 2), sizeof(char *));
x_check_alloc(a);

a[to++] = "EVAL";
Expand Down Expand Up @@ -212,61 +239,62 @@ int main(int argc, const char *argv[]) {
char *cipher_suites = NULL;
char *sni = NULL;
int skip_verify = 0;
int clusterMode = 0;

struct poptOption options[] = { //
{"host", 'h', POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT, &host, 0, "Server hostname.", "<hostname>"}, //
{"port", 'p', POPT_ARG_INT | POPT_ARGFLAG_SHOW_DEFAULT, &port, 0, "Server port.", "<port>"}, //
{"timeout", 't', POPT_ARG_DOUBLE, &timeout, 0, "Server connection timeout (decimals allowed).", "<seconds>"}, //
{"pass", 'a', POPT_ARG_STRING, &password, 0, "Password to use when connecting to the server.", "<password>"}, //
{"user", 0, POPT_ARG_STRING, &user, 0, "Used to send ACL style 'AUTH username pass'. Needs -a.", "<username>"}, //
{"askpass", 0, POPT_ARG_NONE, &askpass, 0, "Force user to input password with mask from STDIN. " //
{"timeout", 't', POPT_ARG_DOUBLE, &timeout, 0, "Server connection timeout (decimals allowed).", "<seconds>"}, //
{"pass", 'a', POPT_ARG_STRING, &password, 0, "Password to use when connecting to the server.", "<password>"}, //
{"user", 0, POPT_ARG_STRING, &user, 0, "Used to send ACL style 'AUTH username pass'. Needs -a.", "<username>"}, //
{"askpass", 0, POPT_ARG_NONE, &askpass, 0, "Force user to input password with mask from STDIN. " //
"If this argument is used, '-a' will be ignored.", NULL //
}, //
{"repeat", 'r', POPT_ARG_INT, &repeat, 0, "Execute specified command this many times.", "<times>"}, //
{"repeat", 'r', POPT_ARG_INT, &repeat, 0, "Execute specified command this many times.", "<times>"}, //
{"interval", 'i', POPT_ARG_DOUBLE | POPT_ARGFLAG_SHOW_DEFAULT, &interval, 0, "When -r is used, waits this many seconds before repeating. " //
"It is possible to specify sub-second times like -i 0.1.", "<seconds>" //
}, //
{"db", 'n', POPT_ARG_INT | POPT_ARGFLAG_SHOW_DEFAULT, &dbIndex, 0, "Database number.", "<index>"}, //
{"RESP2", '2', POPT_ARG_VAL, &protocol, 2, "Start session in RESP2 protocol mode.", NULL}, //
{"RESP3", '3', POPT_ARG_VAL, &protocol, 3, "Start session in RESP3 protocol mode.", NULL}, //
{"stdin", 'x', POPT_ARG_VAL, &input, 0, "Read last argument from STDIN", NULL}, //
{"json", 0, POPT_ARG_NONE, NULL, 'j', "Output in JSON format", NULL}, //
{"raw", 0, POPT_ARG_NONE, NULL, 'r', "Use raw formatting for replies (with delimiters).", NULL}, //
{"RESP2", '2', POPT_ARG_VAL, &protocol, 2, "Start session in RESP2 protocol mode.", NULL}, //
{"RESP3", '3', POPT_ARG_VAL, &protocol, 3, "Start session in RESP3 protocol mode.", NULL}, //
{"stdin", 'x', POPT_ARG_VAL, &input, 0, "Read last argument from STDIN", NULL}, //
{"json", 0, POPT_ARG_NONE, NULL, 'j', "Output in JSON format", NULL}, //
{"raw", 0, POPT_ARG_NONE, NULL, 'r', "Use raw formatting for replies (with delimiters).", NULL}, //
{"delim", 'd', POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT, &delim, 0, "Delimiter between elements for raw format. " //
"You can use JSON convention for escaping special characters.", "<string>" //
},
{"prefix", 'D', POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT, &groupDelim, 0, "Group prefix for raw format. " //
"You can use JSON convention for escaping special characters.", "<string>" //
},
{"tls", 0, POPT_ARG_NONE, &tls, 0, "Establish a secure TLS connection.", NULL}, //
{"sni", 0, POPT_ARG_NONE, &sni, 0, "Server name indication for TLS.", "<host>"}, //
{"cacert", 0, POPT_ARG_STRING, &ca_file, 0, "CA Certificate file to verify with.", "<file>"}, //
{"cacertdir", 0, POPT_ARG_STRING, &ca_path, 0, "Directory where trusted CA certificates are stored. If neither cacert nor cacertdir are "
{"cluster", 'c', POPT_ARG_NONE, &clusterMode, 0, "Enable cluster mode (follow -ASK and -MOVED redirections).", NULL}, //
{"tls", 0, POPT_ARG_NONE, &tls, 0, "Establish a secure TLS connection.", NULL}, //
{"sni", 0, POPT_ARG_NONE, &sni, 0, "Server name indication for TLS.", "<host>"}, //
{"cacert", 0, POPT_ARG_STRING, &ca_file, 0, "CA Certificate file to verify with.", "<file>"}, //
{"cacertdir", 0, POPT_ARG_STRING, &ca_path, 0, "Directory where trusted CA certificates are stored. If neither cacert nor cacertdir are "
"specified, the default system-wide trusted root certs configuration will apply.", "<path>" //
},
{"insecure", 0, POPT_ARG_NONE, &skip_verify,0, "Allow insecure TLS connection by skipping cert validation.", NULL}, //
{"cert", 0, POPT_ARG_STRING, &cert_file, 0, "Client certificate to authenticate with.", "<path>"}, //
{"key", 0, POPT_ARG_STRING, &key_file, 0, "Private key file to authenticate with.", "<path>"}, //
{"tls-ciphers", 0, POPT_ARG_STRING, &ciphers, 0, "Sets the list of preferred ciphers (TLSv1.2 and below) in order of preference from "
{"insecure", 0, POPT_ARG_NONE, &skip_verify, 0, "Allow insecure TLS connection by skipping cert validation.", NULL}, //
{"cert", 0, POPT_ARG_STRING, &cert_file, 0, "Client certificate to authenticate with.", "<path>"}, //
{"key", 0, POPT_ARG_STRING, &key_file, 0, "Private key file to authenticate with.", "<path>"}, //
{"tls-ciphers", 0, POPT_ARG_STRING, &ciphers, 0, "Sets the list of preferred ciphers (TLSv1.2 and below) in order of preference from "
"highest to lowest separated by colon (':').", "<list>"}, //
{"tls-ciphersuites", 0, POPT_ARG_STRING, &cipher_suites, 0, "Sets the list of preferred ciphersuites (TLSv1.3) in order of preference from "
"highest to lowest separated by colon (':').", "<list>"}, //
{"show-pushes", 0, POPT_ARG_STRING | POPT_ARGFLAG_SHOW_DEFAULT, &push, 0, "Whether to print RESP3 PUSH messages.", "yes|no" }, //
{"attributes", 0, POPT_ARG_NONE, &attrib, 0, "Show RESP3 attributes also, if available.", NULL}, //
{"eval", 0, POPT_ARG_STRING, &push, 0, "Send an EVAL command using the Lua script at <file>. " //
{"attributes", 0, POPT_ARG_NONE, &attrib, 0, "Show RESP3 attributes also, if available.", NULL}, //
{"eval", 0, POPT_ARG_STRING, &push, 0, "Send an EVAL command using the Lua script at <file>. " //
"The keyword and other arguments should be separated with a standalone comma on the command-line, such as: 'key1 key2 , arg1 arg2 ...'", "<file>" //
},
{"verbose", 0, POPT_ARG_NONE, &verbose, 0, "Verbose mode.", NULL }, //
{"verbose", 0, POPT_ARG_NONE, &verbose, 0, "Verbose mode.", NULL }, //
{"debug", 0, POPT_ARG_NONE | POPT_ARGFLAG_DOC_HIDDEN, &debug, 0, "Debug mode. Prints all network traffic.", NULL }, //
{"version", 0, POPT_ARG_NONE, NULL, 'v', "Output version and exit.", NULL }, //
{"version", 0, POPT_ARG_NONE, NULL, 'v', "Output version and exit.", NULL }, //

POPT_AUTOHELP POPT_TABLEEND //
};

int rc;
char **cmdargs;
const char **cmdargs;
int i, nargs = 0;
Redis *redis;

poptContext optcon = poptGetContext(fn, argc, argv, options, 0);
poptSetOtherOptionHelp(optcon, "[OPTIONS] [cmd [arg [arg ...]]]");
Expand All @@ -289,7 +317,7 @@ int main(int argc, const char *argv[]) {
delim = xjsonUnescape(delim);
groupDelim = xjsonUnescape(groupDelim);

cmdargs = (char **) poptGetArgs(optcon);
cmdargs = (const char **) poptGetArgs(optcon);

if(askpass) {
password = (char *) malloc(1024);
Expand Down Expand Up @@ -341,7 +369,16 @@ int main(int argc, const char *argv[]) {
}

xSetDebug(1);
prop_error(fn, redisxConnect(redis, 0));

if(clusterMode) {
cluster = redisxClusterInit(redis);
if(cluster) {
redisxDestroy(redis);
redis = NULL;
}
}

if(!cluster) prop_error(fn, redisxConnect(redis, FALSE));

if(!cmdargs) return interactive(redis);

Expand All @@ -355,10 +392,12 @@ int main(int argc, const char *argv[]) {
nanosleep(&sleeptime, NULL);
}

if(nargs) process(redis, (const char **) cmdargs, nargs);
if(nargs) process(cmdargs, nargs);
}

redisxDisconnect(redis);
if(cluster) redisxClusterDisconnect(cluster);
else redisxDisconnect(redis);

poptFreeContext(optcon);

return 0;
Expand Down
1 change: 0 additions & 1 deletion src/redisx-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,6 @@ int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *l
// Send the number of string elements in the command...
L = sprintf(buf, "*%d\r\n", n);


xvprintf("Redis-X> request[%d]", n);
for(i = 0; i < n; i++) {
if(args[i]) xvprintf(" %s", args[i]);
Expand Down
Loading
Loading