Skip to content

Commit

Permalink
ASK redirection and automatic interactive redirections
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 8, 2025
1 parent 5abd45e commit d8aea03
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 17 deletions.
32 changes: 29 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1478,13 +1478,30 @@ You can start using the cluster right away. You can obtain a connected `Redis` i
// Run your query on using the given Redis key / keys.
RESP *reply = redisxRequest(shard, "GET", key, NULL, NULL, &status);
...
```

The interactive queries handle both `MOVED` and `ASK` redirections automatically. However, asynchronous queries do not
since they return before receiving a response. Thus, when using `redisxReadReplyAsync()` later to process replies, you
should check for redirections:

```c
RESP *reply = redisxReadReplyAsync(...);

if(redisxClusterMoved(reply)) {
// The key is now served by another shard.
// You might want to obtain the new shard and try again...
// You might want to obtain the new shard and repeat the failed
// transaction again (interactively or pipelined)...
...
}
if(redisxClusterIsMigrating(reply)) {
// The key's slot is currently migrating. You may try the redirected
// address indicated in the reply, with the ASKING command, e.g. via a
// redisxClusterAskMigrating() interactive transaction.
...
}
...

```
Finally, when you are done using the cluster, simply discard it:
Expand All @@ -1498,7 +1515,7 @@ Finally, when you are done using the cluster, simply discard it:
### Detecting cluster reconfiguration

In the above example we have shown one way you might check for errors that result from cluster being reconfigured
on-the-fly, using `redisxClusterMoved()` on the `RESP` reply obtained from the shard.
on-the-fly, using `redisxClusterMoved()` and/or `redisxClusterIsMigrating()` on the `RESP` reply obtained from the shard.

Equivalently, you might use `redisxCheckRESP()` or `redisxCheckDestroyRESP()` also for detecting a cluster
reconfiguration. Both of these will return a designated `REDIS_MOVED` error code if the keyword is now served on
Expand All @@ -1511,13 +1528,22 @@ another shard:
// The key is now served by another shard.
...
}
if(s == REDIS_MIGRATING) {
// The key is migrating and may be accessed from new location via an ASKING directive
...
}
if(s != X_SUCCESS) {
// The reply is no good for some other reason...
...
}
...
```
To help managing migration errors, we provide `redisxClusterGetRedirection()` to obtain the redirected Redis instance based
on the redirection `RESP`. Once the redirected server shard is identified you may either resubmit the same query as before
(e.h. with `redisxSendArrayRequest()`) if `MOVED`, or else repeat the query via an interactive `ASKING` directive using
`redisxClusterAskMigrating()`.
A `REDIS_MOVED` error code will be returned by higher-level functions also, which ingest the `RESP` replies from the
shard and return a digested error code. For example, `redisxGetStringValue()` will set the output `len` value to
`REDIS_MOVED` if the value could not be obtained because of a cluster reconfiguration.
Expand Down
4 changes: 4 additions & 0 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ enum resp_type {
#define REDIS_UNEXPECTED_RESP (-105) ///< \hideinitializer Got a Redis response of a different type than expected
#define REDIS_UNEXPECTED_ARRAY_SIZE (-106) ///< \hideinitializer Got a Redis response with different number of elements than expected.
#define REDIS_MOVED (-107) ///< \hideinitializer The requested key has moved to another cluster shard.
#define REDIS_MIGRATING (-108) ///< \hideinitializer The requested key is importing, and you may query with ASKED on the specified node.

/**
* RedisX channel IDs. RedisX uses up to three separate connections to the server: (1) an interactive client, in which
Expand Down Expand Up @@ -419,9 +420,12 @@ int redisxSetTLSSkipVerify(Redis *redis, boolean value);
RedisCluster *redisxClusterInit(Redis *node);
Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key);
boolean redisxClusterMoved(const RESP *reply);
boolean redisxClusterIsMigrating(const RESP *reply);
int redisxClusterConnect(RedisCluster *cluster);
int redisxClusterDisconnect(RedisCluster *cluster);
void redisxClusterDestroy(RedisCluster *cluster);
Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh);
RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status);

int redisxPing(Redis *redis, const char *message);
enum redisx_protocol redisxGetProtocol(Redis *redis);
Expand Down
197 changes: 194 additions & 3 deletions src/redisx-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,65 @@ Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key) {
return NULL;
}

/// \cond PRIVATE
static Redis *rClusterGetShardByAddress(RedisCluster *cluster, const char *host, int port, boolean refresh) {
static const char *fn = "redisxClusterGetShard";

ClusterPrivate *p;
int i;

if(!cluster) {
x_error(X_NULL, EINVAL, fn, "cluster is NULL");
return NULL;
}

if(!host) {
x_error(X_NAME_INVALID, EINVAL, fn, "address is NULL");
return NULL;
}

if(!host[0]) {
x_error(X_NAME_INVALID, EINVAL, fn, "address is empty");
return NULL;
}

p = (ClusterPrivate *) cluster->priv;
if(!p) {
x_error(X_NO_INIT, ENXIO, fn, "cluster is not initialized");
return NULL;
}

pthread_mutex_lock(&p->mutex);

for(i = 0; i < p->n_shards; i++) {
const RedisShard *s = &p->shard[i];
int m;

for(m = 0; m < s->n_servers; m++) {
Redis *r = s->redis[m];
const RedisPrivate *np = (RedisPrivate *) r->priv;

if(np->port == port && strcmp(np->hostname, host) == 0) {
if(!redisxIsConnected(r)) if(redisxConnect(r, p->usePipeline) != X_SUCCESS) continue;
pthread_mutex_unlock(&p->mutex);
return r;
}
}
}

pthread_mutex_unlock(&p->mutex);

if(refresh) {
rClusterRefresh(cluster);
return rClusterGetShardByAddress(cluster, host, port, FALSE);
}

x_error(0, EAGAIN, fn, "not a known member of the cluster: %s:%d", host, port);
return NULL;
}

/// \endcond

/**
* Initializes a Redis cluster configuration using a known cluster node. The call will connect to
* the specified node (if not already connected), and will query the cluster configuration from it.
Expand Down Expand Up @@ -587,14 +646,15 @@ int redisxClusterDisconnect(RedisCluster *cluster) {

/**
* Checks if the reply is an error indicating that the cluster has been reconfigured and
* the request can no longer be fulfilled on the given shard. You might want to obtain
* the new shard using redisxClusterGetShard() again, and re-submit the request to the
* new shard.
* the request can no longer be fulfilled on the given shard (i.e., `MOVED` redirection).
* You might want to obtain the new shard using redisxClusterGetShard() again, and re-submit
* the request to the new shard.
*
* @param reply The response obtained from the Redis shard / server.
* @return TRUE (1) if the reply is an error indicating that the cluster has been
* reconfigured and the key has moved to another shard.
*
* @sa redisxClusterIsMigrating()
* @sa redisxClusterGetShard()
*/
boolean redisxClusterMoved(const RESP *reply) {
Expand All @@ -603,3 +663,134 @@ boolean redisxClusterMoved(const RESP *reply) {
if(reply->n < 5) return FALSE;
return (strncmp("MOVED", (char *) reply->value, 5) == 0);
}

/**
* Checks if the reply is an error indicating that the query is for a slot that is currently
* migrating to another shard (i.e., `ASK` redirection). You may need to use an `ASKING`
* directive, e.g. via redisxClusterAskMigrating() on the node specified in the message to
* access the key.
*
* @param reply The response obtained from the Redis shard / server.
* @return TRUE (1) if the reply is an error indicating that the cluster has been
* reconfigured and the key has moved to another shard.
*
* @sa redisxClusterMoved()
* @sa redisxClusterAskMigrating()
*/
boolean redisxClusterIsMigrating(const RESP *reply) {
if(!reply) return FALSE;
if(reply->type != RESP_ERROR) return FALSE;
if(reply->n < 3) return FALSE;
return (strncmp("ASK", (char *) reply->value, 3) == 0);
}

/**
* Parses a `-MOVED` or `-ASK` redirection response from a Redis cluster node, to obtain
* the shard from which the same keyword that caused the error can now be accessed.
*
* @param cluster Redis cluster configuration
* @param redirect the redirection response sent to a keyword query
* @param refresh whether it should refresh the cluster configuration and try again if the
* redirection target is not found in the current cluster configuration.
* @return the migrated server, from which the keyword should be queried now.
*
* @sa redisxClusterMoved()
* @sa redisxClusterIsMigrating()
* @sa redisxClusterAskMigrating()
*/
Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh) {
static const char *fn = "redisxClusterGetRedirect";

char *str, *tok;

if(!cluster) {
x_error(0, EINVAL, fn, "input cluster is NULL");
return NULL;
}

if(!redisxClusterMoved(redirect) && !redisxClusterIsMigrating(redirect)) {
return NULL;
}

str = xStringCopyOf((char *) redirect->value);
x_check_alloc(str);

strtok(str, " \t\r\n"); // MOVED or ASK
strtok(NULL, " \t\r\n"); // SLOT #
tok = strtok(NULL, ":"); // host:port
if(tok) {
const char *host = tok;
int port = strtol(strtok(NULL, " \t\r\n"), NULL, 10);
free(str);
return rClusterGetShardByAddress(cluster, host, port, refresh);
}

x_error(X_PARSE_ERROR, EBADMSG, fn, "Unparseable migration reply: %s", str);

free(str);
return NULL;
}

/**
* Makesa redirected transaction using the ASKING directive to the specific client. This should be
* in response to an -ASK redirection error to obtain a key that is in a slot that is currently
* migrating. The requested Redis command arguments are sent prefixed with the 'ASKING' directive,
* as per the Redis Cluster specification.
*
* @param redis Redirected Redis instance, e.g. from redisxClusterGetRedirect()
* @param args Original command arguments that were redirected
* @param lengths Original argument byte lengths redirected (or NULL to use strlen() automatically).
* @param n Original number of arguments.
* @param status Pointer to integer in which to return status: X_SUCCESS (0) if successful or
* else and error code &lt;0.
* @return The response to the `ASKING` query from the redirected server.
*
* @sa redisxClusterIsMigrating()
* @sa redisxClusterGetRedirect()
* @sa redisxArrayRequest()
*/
RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status) {
static const char *fn = "redisxClusterAsk";

const char **askargs = NULL;
int *asklen = NULL;
RESP *reply;

if(!args) {
x_error(0, EINVAL, fn, "input args is NULL");
if(status) *status = X_NULL;
return NULL;
}

askargs = (const char **) malloc((n + 1) * sizeof(char *));
if(!askargs) {
x_error(0, errno, fn, "alloc error (%d char *)", (n + 1));
if(status) *status = X_FAILURE;
return NULL;
}

if(lengths) {
asklen = (int *) malloc((n + 1) * sizeof(char *));
if(!asklen) {
x_error(0, errno, fn, "alloc error (%d int)", (n + 1));
free(askargs);
if(status) *status = X_FAILURE;
return NULL;
}

asklen[0] = 0;
memcpy(&asklen[1], lengths, n * sizeof(int));
}

askargs[0] = xStringCopyOf("ASKING");
memcpy(&askargs[1], args, n * sizeof(char *));

reply = redisxArrayRequest(redis, askargs, asklen, n + 1, status);

if(asklen) free(asklen);
free(askargs);

return reply;
}


18 changes: 8 additions & 10 deletions src/redisx-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
* @author Attila Kovacs
*/

#define _XOPEN_SOURCE 500 ///< for strdup()

#include <stdio.h>
#include <unistd.h>
#include <string.h>
Expand Down Expand Up @@ -210,8 +208,8 @@ int redisxSetTLS(Redis *redis, const char *ca_path, const char *ca_file) {
tls = &p->config.tls;

tls->enabled = TRUE;
tls->ca_path = ca_path ? strdup(ca_path) : NULL;
tls->ca_certificate = ca_file ? strdup(ca_file) : NULL;
tls->ca_path = xStringCopyOf(ca_path);
tls->ca_certificate = xStringCopyOf(ca_file);

rConfigUnlock(redis);

Expand Down Expand Up @@ -255,8 +253,8 @@ int redisxSetMutualTLS(Redis *redis, const char *cert_file, const char *key_file
p = (RedisPrivate *) redis->priv;
tls = &p->config.tls;

tls->certificate = strdup(cert_file);
tls->key = strdup(key_file);
tls->certificate = xStringCopyOf(cert_file);
tls->key = xStringCopyOf(key_file);

rConfigUnlock(redis);

Expand Down Expand Up @@ -293,7 +291,7 @@ int redisxSetTLSCiphers(Redis *redis, const char *cipher_list) {
p = (RedisPrivate *) redis->priv;
tls = &p->config.tls;

tls->ciphers = cipher_list ? strdup(cipher_list) : NULL;
tls->ciphers = xStringCopyOf(cipher_list);

rConfigUnlock(redis);

Expand Down Expand Up @@ -329,7 +327,7 @@ int redisxSetTLSCipherSuites(Redis *redis, const char *list) {
p = (RedisPrivate *) redis->priv;
tls = &p->config.tls;

tls->cipher_suites = list ? strdup(list) : NULL;
tls->cipher_suites = xStringCopyOf(list);

rConfigUnlock(redis);

Expand Down Expand Up @@ -366,7 +364,7 @@ int redisxSetDHCipherParams(Redis *redis, const char *dh_params_file) {
p = (RedisPrivate *) redis->priv;
tls = &p->config.tls;

tls->dh_params = dh_params_file ? strdup(dh_params_file) : NULL;
tls->dh_params = xStringCopyOf(dh_params_file);

rConfigUnlock(redis);

Expand Down Expand Up @@ -400,7 +398,7 @@ int redisxSetTLSServerName(Redis *redis, const char *host) {
p = (RedisPrivate *) redis->priv;
tls = &p->config.tls;

tls->hostname = host ? strdup(host) : NULL;
tls->hostname = xStringCopyOf(host);

rConfigUnlock(redis);

Expand Down
Loading

0 comments on commit d8aea03

Please sign in to comment.