From dfc18d41a8f0ecda4a279cc9f90da79e798b52a2 Mon Sep 17 00:00:00 2001 From: Attila Kovacs Date: Wed, 8 Jan 2025 00:33:51 +0100 Subject: [PATCH 1/2] ASK redirection and automatic interactive redirections --- README.md | 48 ++++++-- include/redisx.h | 7 ++ src/redisx-client.c | 1 - src/redisx-cluster.c | 264 ++++++++++++++++++++++++++++++++++++++++++- src/redisx-script.c | 83 +++++++++----- src/redisx-tab.c | 181 +++++++++++++++++++++-------- src/redisx-tls.c | 18 ++- src/redisx.c | 23 +++- src/resp.c | 1 + 9 files changed, 524 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index ef6eb84..743ecf7 100644 --- a/README.md +++ b/README.md @@ -1478,15 +1478,36 @@ You can start using the cluster right away. You can obtain a connected `Redis` i // Run your query on using the given Redis key / keys. RESP *reply = redisxRequest(shard, "GET", key, NULL, NULL, &status); + ... +``` + +The interactive queries handle both `MOVED` and `ASK` redirections automatically. However, asynchronous queries do not +since they return before receiving a response. Thus, when using `redisxReadReplyAsync()` later to process replies, you +should check for redirections: + +```c + RESP *reply = redisxReadReplyAsync(...); + if(redisxClusterMoved(reply)) { // The key is now served by another shard. - // You might want to obtain the new shard and try again... + // You might want to obtain the new shard and repeat the failed + // transaction again (interactively or pipelined)... + ... + } + if(redisxClusterIsMigrating(reply)) { + // The key's slot is currently migrating. You may try the redirected + // address indicated in the reply, with the ASKING command, e.g. via a + // redisxClusterAskMigrating() interactive transaction. ... } - ... + ``` +As a matter a best practice you should never assume that a given keyword is persistently served by the same shard. +Rather, you should obtain the current shard for the key each time you want to use it with the cluster, and always +check for errors on shard requests, and repeat failed requests on a newly obtained shard if necessary. + Finally, when you are done using the cluster, simply discard it: ```c @@ -1498,11 +1519,12 @@ Finally, when you are done using the cluster, simply discard it: ### Detecting cluster reconfiguration In the above example we have shown one way you might check for errors that result from cluster being reconfigured -on-the-fly, using `redisxClusterMoved()` on the `RESP` reply obtained from the shard. +on-the-fly, using `redisxClusterMoved()` and/or `redisxClusterIsMigrating()` on the `RESP` reply obtained from the +shard. Equivalently, you might use `redisxCheckRESP()` or `redisxCheckDestroyRESP()` also for detecting a cluster -reconfiguration. Both of these will return a designated `REDIS_MOVED` error code if the keyword is now served on -another shard: +reconfiguration. Both of these will return a designated `REDIS_MOVED` or `REDIS_MIGRATING` error code if the keyword +has moved or is migrating, respectively, to another node, e.g.: ```c ... @@ -1511,6 +1533,10 @@ another shard: // The key is now served by another shard. ... } + if(s == REDIS_MIGRATING) { + // The key is migrating and may be accessed from new location via an ASKING directive + ... + } if(s != X_SUCCESS) { // The reply is no good for some other reason... ... @@ -1518,13 +1544,13 @@ another shard: ... ``` -A `REDIS_MOVED` error code will be returned by higher-level functions also, which ingest the `RESP` replies from the -shard and return a digested error code. For example, `redisxGetStringValue()` will set the output `len` value to -`REDIS_MOVED` if the value could not be obtained because of a cluster reconfiguration. +To help manage redirection responses for asynchronous requests, we provide `redisxClusterGetRedirection()` to obtain +the redirected Redis instance based on the redirection `RESP`. Once the redirected cluster shard is identified you may +either resubmit the same query as before (e.h. with `redisxSendArrayRequestAsync()`) if `MOVED`, or else repeat the +query via an interactive `ASKING` directive using `redisxClusterAskMigrating()`. -As a matter a best practice you should never assume that a given keyword is persistently served by the same shard. -Rather, you should obtain the current shard for the key each time you want to use it with the cluster, and always -check for errors on shard requests, and repeat failed requests on a newly obtained shard if necessary. +A `REDIS_MOVED` error code may be returned by higher-level functions also, which ingest the `RESP` replies from the +shard and return a digested error code. diff --git a/include/redisx.h b/include/redisx.h index a8df7b6..e4a9683 100644 --- a/include/redisx.h +++ b/include/redisx.h @@ -129,6 +129,7 @@ enum resp_type { #define REDIS_UNEXPECTED_RESP (-105) ///< \hideinitializer Got a Redis response of a different type than expected #define REDIS_UNEXPECTED_ARRAY_SIZE (-106) ///< \hideinitializer Got a Redis response with different number of elements than expected. #define REDIS_MOVED (-107) ///< \hideinitializer The requested key has moved to another cluster shard. +#define REDIS_MIGRATING (-108) ///< \hideinitializer The requested key is importing, and you may query with ASKED on the specified node. /** * RedisX channel IDs. RedisX uses up to three separate connections to the server: (1) an interactive client, in which @@ -418,10 +419,14 @@ int redisxSetTLSSkipVerify(Redis *redis, boolean value); RedisCluster *redisxClusterInit(Redis *node); Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key); +boolean redisxClusterIsRedirected(const RESP *reply); boolean redisxClusterMoved(const RESP *reply); +boolean redisxClusterIsMigrating(const RESP *reply); int redisxClusterConnect(RedisCluster *cluster); int redisxClusterDisconnect(RedisCluster *cluster); void redisxClusterDestroy(RedisCluster *cluster); +Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh); +RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status); int redisxPing(Redis *redis, const char *message); enum redisx_protocol redisxGetProtocol(Redis *redis); @@ -507,6 +512,7 @@ int redisxUnlockClient(RedisClient *cl); // Asynchronous access routines (use within redisxLockClient()/ redisxUnlockClient() blocks)... int redisxSendRequestAsync(RedisClient *cl, const char *command, const char *arg1, const char *arg2, const char *arg3); int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *length, int n); +int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n); int redisxSetValueAsync(RedisClient *cl, const char *table, const char *key, const char *value, boolean confirm); int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm); RESP *redisxReadReplyAsync(RedisClient *cl, int *pStatus); @@ -516,6 +522,7 @@ int redisxIgnoreReplyAsync(RedisClient *cl); int redisxSkipReplyAsync(RedisClient *cl); int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int length); + // Error generation with stderr message... int redisxError(const char *func, int errorCode); const char* redisxErrorDescription(int code); diff --git a/src/redisx-client.c b/src/redisx-client.c index 21776f0..3b1e420 100644 --- a/src/redisx-client.c +++ b/src/redisx-client.c @@ -660,7 +660,6 @@ int redisxSendArrayRequestAsync(RedisClient *cl, const char **args, const int *l // Send the number of string elements in the command... L = sprintf(buf, "*%d\r\n", n); - xvprintf("Redis-X> request[%d]", n); for(i = 0; i < n; i++) { if(args[i]) xvprintf(" %s", args[i]); diff --git a/src/redisx-cluster.c b/src/redisx-cluster.c index 01be8d0..9c7d6f3 100644 --- a/src/redisx-cluster.c +++ b/src/redisx-cluster.c @@ -411,6 +411,65 @@ Redis *redisxClusterGetShard(RedisCluster *cluster, const char *key) { return NULL; } +/// \cond PRIVATE +static Redis *rClusterGetShardByAddress(RedisCluster *cluster, const char *host, int port, boolean refresh) { + static const char *fn = "redisxClusterGetShard"; + + ClusterPrivate *p; + int i; + + if(!cluster) { + x_error(X_NULL, EINVAL, fn, "cluster is NULL"); + return NULL; + } + + if(!host) { + x_error(X_NAME_INVALID, EINVAL, fn, "address is NULL"); + return NULL; + } + + if(!host[0]) { + x_error(X_NAME_INVALID, EINVAL, fn, "address is empty"); + return NULL; + } + + p = (ClusterPrivate *) cluster->priv; + if(!p) { + x_error(X_NO_INIT, ENXIO, fn, "cluster is not initialized"); + return NULL; + } + + pthread_mutex_lock(&p->mutex); + + for(i = 0; i < p->n_shards; i++) { + const RedisShard *s = &p->shard[i]; + int m; + + for(m = 0; m < s->n_servers; m++) { + Redis *r = s->redis[m]; + const RedisPrivate *np = (RedisPrivate *) r->priv; + + if(np->port == port && strcmp(np->hostname, host) == 0) { + if(!redisxIsConnected(r)) if(redisxConnect(r, p->usePipeline) != X_SUCCESS) continue; + pthread_mutex_unlock(&p->mutex); + return r; + } + } + } + + pthread_mutex_unlock(&p->mutex); + + if(refresh) { + rClusterRefresh(cluster); + return rClusterGetShardByAddress(cluster, host, port, FALSE); + } + + x_error(0, EAGAIN, fn, "not a known member of the cluster: %s:%d", host, port); + return NULL; +} + +/// \endcond + /** * Initializes a Redis cluster configuration using a known cluster node. The call will connect to * the specified node (if not already connected), and will query the cluster configuration from it. @@ -587,14 +646,16 @@ int redisxClusterDisconnect(RedisCluster *cluster) { /** * Checks if the reply is an error indicating that the cluster has been reconfigured and - * the request can no longer be fulfilled on the given shard. You might want to obtain - * the new shard using redisxClusterGetShard() again, and re-submit the request to the - * new shard. + * the request can no longer be fulfilled on the given shard (i.e., `MOVED` redirection). + * You might want to obtain the new shard using redisxClusterGetShard() again, and re-submit + * the request to the new shard. * * @param reply The response obtained from the Redis shard / server. * @return TRUE (1) if the reply is an error indicating that the cluster has been * reconfigured and the key has moved to another shard. * + * @sa redisxClusterIsMigrating() + * @sa redisxClusterIsRedirected() * @sa redisxClusterGetShard() */ boolean redisxClusterMoved(const RESP *reply) { @@ -603,3 +664,200 @@ boolean redisxClusterMoved(const RESP *reply) { if(reply->n < 5) return FALSE; return (strncmp("MOVED", (char *) reply->value, 5) == 0); } + +/** + * Checks if the reply is an error indicating that the query is for a slot that is currently + * migrating to another shard (i.e., `ASK` redirection). You may need to use an `ASKING` + * directive, e.g. via redisxClusterAskMigrating() on the node specified in the message to + * access the key. + * + * @param reply The response obtained from the Redis shard / server. + * @return TRUE (1) if the reply is an error indicating that the cluster has been + * reconfigured and the key has moved to another shard. + * + * @sa redisxClusterMoved() + * @sa redisxClusterIsRedirected() + * @sa redisxClusterAskMigrating() + */ +boolean redisxClusterIsMigrating(const RESP *reply) { + if(!reply) return FALSE; + if(reply->type != RESP_ERROR) return FALSE; + if(reply->n < 3) return FALSE; + return (strncmp("ASK", (char *) reply->value, 3) == 0); +} + + +/** + * Checks if the reply is an error indicating that the query should be redirected to another + * node (i.e., `MOVED` or `ASK` redirection). + * + * @param reply The response obtained from the Redis shard / server. + * @return TRUE (1) if the reply is an error indicating that the query should be + * directed to another node. + * + * @sa redisxClusterMoved() + * @sa redisxClusterIsMigrating() + */ +boolean redisxClusterIsRedirected(const RESP *reply) { + return redisxClusterMoved(reply) || redisxClusterIsMigrating(reply); +} + +/** + * Parses a `-MOVED` or `-ASK` redirection response from a Redis cluster node, to obtain + * the shard from which the same keyword that caused the error can now be accessed. + * + * @param cluster Redis cluster configuration + * @param redirect the redirection response sent to a keyword query + * @param refresh whether it should refresh the cluster configuration and try again if the + * redirection target is not found in the current cluster configuration. + * @return the migrated server, from which the keyword should be queried now. + * + * @sa redisxClusterMoved() + * @sa redisxClusterIsMigrating() + * @sa redisxClusterAskMigrating() + */ +Redis *redisxClusterGetRedirection(RedisCluster *cluster, const RESP *redirect, boolean refresh) { + static const char *fn = "redisxClusterGetRedirection"; + + char *str, *tok; + + if(!cluster) { + x_error(0, EINVAL, fn, "input cluster is NULL"); + return NULL; + } + + if(!redisxClusterMoved(redirect) && !redisxClusterIsMigrating(redirect)) { + return NULL; + } + + str = xStringCopyOf((char *) redirect->value); + x_check_alloc(str); + + strtok(str, " \t\r\n"); // MOVED or ASK + strtok(NULL, " \t\r\n"); // SLOT # + tok = strtok(NULL, ":"); // host:port + if(tok) { + const char *host = tok; + int port = strtol(strtok(NULL, " \t\r\n"), NULL, 10); + free(str); + return rClusterGetShardByAddress(cluster, host, port, refresh); + } + + x_error(X_PARSE_ERROR, EBADMSG, fn, "Unparseable migration reply: %s", str); + + free(str); + return NULL; +} + +/** + * Makes a redirected transaction using the ASKING directive to the specific client. This should be + * in response to an -ASK redirection error to obtain a key that is in a slot that is currently + * migrating. The requested Redis command arguments are sent prefixed with the 'ASKING' directive, + * as per the Redis Cluster specification. + * + * @param redis Redirected Redis instance, e.g. from redisxClusterGetRedirect() + * @param args Original command arguments that were redirected + * @param lengths Original argument byte lengths redirected (or NULL to use strlen() automatically). + * @param n Original number of arguments. + * @param status Pointer to integer in which to return status: X_SUCCESS (0) if successful or + * else and error code <0. + * @return The response to the `ASKING` query from the redirected server. + * + * @sa redisxClusterAskMigratingAsync() + * @sa redisxClusterIsMigrating() + * @sa redisxClusterGetRedirect() + * @sa redisxArrayRequest() + */ +RESP *redisxClusterAskMigrating(Redis *redis, const char **args, const int *lengths, int n, int *status) { + static const char *fn = "redisxClusterAskMigrating"; + + RedisClient *cl; + RESP *reply = NULL; + int s; + + s = redisxCheckValid(redis); + if(s != X_SUCCESS) { + if(status) *status = s; + return x_trace_null(fn, NULL); + } + + cl = redis->interactive; + s = redisxLockConnected(cl); + if(s) { + if(status) *status = s; + return x_trace_null(fn, NULL); + } + + redisxClearAttributesAsync(cl); + + s = redisxClusterAskMigratingAsync(cl, args, lengths, n); + if(s == X_SUCCESS) reply = redisxReadReplyAsync(cl, &s); + redisxUnlockClient(cl); + + if(s != X_SUCCESS) { + if(status) *status = s; + x_trace_null(fn, NULL); + } + + return reply; +} + + +/** + * Makes a redirected request using the ASKING directive to the specific client. This should be + * in response to an -ASK redirection error to obtain a key that is in a slot that is currently + * migrating. The requested Redis command arguments are sent prefixed with the 'ASKING' directive, + * as per the Redis Cluster specification. + * + * This function should be called with exclusive access to the client. + * + * @param cl Locked client on a redirected Redis instance, e.g. from redisxClusterGetRedirect() + * @param args Original command arguments that were redirected + * @param lengths Original argument byte lengths redirected (or NULL to use strlen() automatically). + * @param n Original number of arguments. + * @return X_SUCCESS (0) if successful or else and error code <0. + * + * @sa redisxClusterAskMigrating() + * @sa redisxClusterIsMigrating() + * @sa redisxClusterGetRedirect() + * @sa redisxArrayRequest() + */ +int redisxClusterAskMigratingAsync(RedisClient *cl, const char **args, const int *lengths, int n) { + static const char *fn = "redisxClusterAskMigratingAsync"; + + const ClientPrivate *cp; + const char **askargs = NULL; + int *asklen = NULL; + int status = X_SUCCESS; + + prop_error(fn, rCheckClient(cl)); + + cp = (ClientPrivate *) cl->priv; + if(!cp->isEnabled) return x_error(X_NO_SERVICE, ENOTCONN, fn, "client is not connected"); + + if(!args) return x_error(X_NULL, EINVAL, fn, "input args is NULL"); + + askargs = (const char **) malloc((n + 1) * sizeof(char *)); + if(!askargs) return x_error(X_FAILURE, errno, fn, "alloc error (%d char *)", (n + 1)); + + if(lengths) { + asklen = (int *) malloc((n + 1) * sizeof(char *)); + if(!asklen) { + free(askargs); + return x_error(X_FAILURE, errno, fn, "alloc error (%d int)", (n + 1)); + } + + asklen[0] = 0; + memcpy(&asklen[1], lengths, n * sizeof(int)); + } + + askargs[0] = xStringCopyOf("ASKING"); + memcpy(&askargs[1], args, n * sizeof(char *)); + + status = redisxSendArrayRequestAsync(cl, askargs, asklen, n + 1); + + if(asklen) free(asklen); + free(askargs); + + return status; +} diff --git a/src/redisx-script.c b/src/redisx-script.c index ac6a92b..e79e4ac 100644 --- a/src/redisx-script.c +++ b/src/redisx-script.c @@ -50,6 +50,47 @@ int redisxLoadScript(Redis *redis, const char *script, char **sha1) { return X_SUCCESS; } +/** + * Returns an EVAL argument array for the given Redis script and its keys and parameters. + * + * @param sha1 script SHA1 sum + * @param keys NULL-terminated array of Redis keys to pass to script, or NULL if the + * script requires no keys. + * @param params NULL-terminated array of other parameters to pass to script, or NULL + * if the script requires no parameters + * @param[out] nargs number of entries in returned argument array + * @return array containing EVAL command and arguments. + * + * @sa redisxRunScript() + * @sa redisxRunScriptAsync() + */ +static const char **rGetScriptArgs(const char *sha1, const char **keys, const char **params, int *nargs) { + const char **args; + char sn[20]; + int i = 0,k, nkeys = 0, nparams = 0, n; + + if(keys) while(keys[nkeys]) nkeys++; + if(params) while(params[nparams]) nparams++; + + n = 3 + nkeys + nparams; + sprintf(sn, "%d", nkeys); + args = (const char **) malloc(n * sizeof(char *)); + if(!args) { + *nargs = x_error(0, errno, "rGetScriptArgs", "alloc() error (%d char *)", n); + return NULL; + } + + args[i++] = "EVALSHA"; + args[i++] = (char *) sha1; + args[i++] = sn; + + for(k = 0; k < nkeys; k++) args[i++] = (char *) keys[k]; + for(k = 0; k < nparams; k++) args[i++] = (char *) params[k]; + + *nargs = n; + return args; +} + /** * Send a request to runs a LUA script that has been loaded into the Redis database. This function should * be called with the connected client's mutex locked. The call returns as soon as the request has been @@ -72,33 +113,20 @@ int redisxLoadScript(Redis *redis, const char *script, char **sha1) { int redisxRunScriptAsync(RedisClient *cl, const char *sha1, const char **keys, const char **params) { static const char *fn = "redisxRunScriptAsync"; - int i = 0, k, nkeys = 0, nparams = 0, nargs; - char sn[20], **args; + const char **args; + int nargs, status; prop_error(fn, rCheckClient(cl)); if(sha1 == NULL) return x_error(X_NULL, EINVAL, fn, "input script SHA1 sum is NULL"); - if(keys) while(keys[nkeys]) nkeys++; - if(params) while(params[nparams]) nparams++; - - nargs = 3 + nkeys + nparams; - sprintf(sn, "%d", nkeys); - args = (char **) malloc(nargs * sizeof(char *)); - if(!args) return x_error(X_NULL, errno, fn, "malloc() error"); - - args[i++] = "EVALSHA"; - args[i++] = (char *) sha1; - args[i++] = sn; - - for(k = 0; k < nkeys; k++) args[i++] = (char *) keys[k]; - for(k = 0; k < nparams; k++) args[i++] = (char *) params[k]; + args = rGetScriptArgs(sha1, keys, params, &nargs); + prop_error(fn, nargs); - i = redisxSendArrayRequestAsync(cl, (const char **) args, NULL, nargs); + status = redisxSendArrayRequestAsync(cl, (const char **) args, NULL, nargs); free(args); - prop_error(fn, i); - + prop_error(fn, status); return X_SUCCESS; } @@ -122,23 +150,26 @@ int redisxRunScriptAsync(RedisClient *cl, const char *sha1, const char **keys, c RESP *redisxRunScript(Redis *redis, const char *sha1, const char **keys, const char **params, int *status) { static const char *fn = "redisxRunScript"; + const char **args; + int nargs; RESP *reply = NULL; if(redisxCheckValid(redis) != X_SUCCESS) return x_trace_null(fn, NULL); if(sha1 == NULL) { + if(status) *status = X_NULL; x_error(0, EINVAL, fn, "sha1 parameter is NULL"); return NULL; } - if(redisxLockConnected(redis->interactive) != X_SUCCESS) return x_trace_null(fn, NULL); - - if(redisxRunScriptAsync(redis->interactive, sha1, keys, params) == X_SUCCESS) - reply = redisxReadReplyAsync(redis->interactive, status); - - redisxUnlockClient(redis->interactive); + args = rGetScriptArgs(sha1, keys, params, &nargs); + if(!args) { + if(status) *status = nargs; + return x_trace_null(fn, NULL); + } - if(reply == NULL) return x_trace_null(fn, NULL); + reply = redisxArrayRequest(redis, args, NULL, nargs, status); + free(args); return reply; } diff --git a/src/redisx-tab.c b/src/redisx-tab.c index 55724f1..6383e2b 100644 --- a/src/redisx-tab.c +++ b/src/redisx-tab.c @@ -127,18 +127,19 @@ int redisxSetValue(Redis *redis, const char *table, const char *key, const char int status = X_SUCCESS; - prop_error(fn, redisxCheckValid(redis)); - prop_error(fn, redisxLockConnected(redis->interactive)); - - status = redisxSetValueAsync(redis->interactive, table, key, value, confirm); - - if(!status && confirm) { - RESP *reply = redisxReadReplyAsync(redis->interactive, &status); - if(!status) status = redisxCheckRESP(reply, RESP_INT, 0); + if(confirm) { + RESP *reply = (table == NULL) ? + redisxRequest(redis, "SET", key, value, NULL, &status) : + redisxRequest(redis, "HSET", table, key, value, &status); + if(!status) status = redisxCheckRESP(reply, RESP_INT, -1); redisxDestroyRESP(reply); } - - redisxUnlockClient(redis->interactive); + else { + if(redis == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); + prop_error(fn, redisxLockConnected(redis->interactive)); + status = redisxSetValueAsync(redis->interactive, table, key, value, FALSE); + redisxUnlockClient(redis->interactive); + } prop_error(fn, status); return X_SUCCESS; @@ -278,6 +279,90 @@ char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int return str; } +/** + * Checks the input parameters for setting multiple entries at once in a Redis hash table. + * + * @param table Redis hash table name + * @param entries array of key/value pairs + * @param n number of key/value pairs in the array + * @return X_SUCCESS (0) if successful, or else an error code <0 (errno is set + * to EINVAL in case of an error). + * + * @sa redisxMultiSet() + * @sa redisxMultiSetAsync() + */ +static int rCheckMultiSetArgs(const char *table, const RedisEntry *entries, int n) { + static const char *fn = "rCheckMultiSetArgs"; + + if(table == NULL) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is NULL"); + if(!table[0]) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is empty"); + if(entries == NULL) return x_error(X_NULL, EINVAL, fn, "'entries' parameter is NULL"); + if(n < 1) return x_error(X_SIZE_INVALID, EINVAL, fn, "invalid array size: %d", n); + + return X_SUCCESS; +} + + +/** + * Returns a HMSET argument array. + * + * @param table Redis hash table name + * @param entries array of key/value pairs + * @param[in, out] n number of key/value pairs in the array [in], and returning the number + * of arguments in the returned array [out]. + * @return an allocated array containing the HMSET command and its arguments.' + * + * @sa rGetMultiSetLengths() + */ +static const char **rGetMultiSetArgs(const char *table, const RedisEntry *entries, int *n) { + int i, m, N = 2 + ((*n) << 1); + + const char **req = (const char **) malloc(N * sizeof(char *)); + if(!req) { + *n = x_error(X_FAILURE, errno, "rGetMultiSetArgs", "alloc error (%d char *)", N); + return NULL; + } + + req[0] = "HMSET"; // TODO, as of Redis 4.0.0, just use HSET... + req[1] = (char *) table; + + for(i = 0, m = 2; i < *n; i++) { + req[m++] = (char *) entries[i].key; + req[m++] = (char *) entries[i].value; + } + + *n = N; + + return req; +} + +/** + * Returns an array of argument lengths for an HMSET argument array, with the lengths of + * the values set to the nominal lengths of the entry values, such that they may not + * be null-terminated strings. + * + * @param entries array of key/value pairs + * @param n number of key/value pairs in the array + * @return an allocated array containing the actual value lengths. All other + * elements are set to 0, indicating that strlen() may be used on + * those arguments to determine their lengths automatically. + * + * @sa rGetMultiSetArgs() + */ +static int *rGetMultiSetLengths(const RedisEntry *entries, int n) { + int i, m, N = 2 + (n << 1), *L; + + L = (int *) calloc(N, sizeof(int)); + if(!L) { + x_error(0, errno, "rGetMultiSetLengths", "alloc error (%d int)", N); + return NULL; + } + + for(i = 0, m = 3; i < n; i++, m += 2) L[m] = entries[i].length; + + return L; +} + /** * Sets multiple key/value pairs in a given hash table. This function should be called with exclusive * access to the client. @@ -295,40 +380,22 @@ char *redisxGetStringValue(Redis *redis, const char *table, const char *key, int */ int redisxMultiSetAsync(RedisClient *cl, const char *table, const RedisEntry *entries, int n, boolean confirm) { static const char *fn = "redisxMultiSetAsync"; - int i, *L, N, status = X_SUCCESS; + int *L, N, status = X_SUCCESS; const char **req; - if(cl == NULL) return x_error(X_NULL, EINVAL, fn, "redis is NULL"); - if(table == NULL) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is NULL"); - if(!table[0]) return x_error(X_GROUP_INVALID, EINVAL, fn, "table parameter is empty"); - if(entries == NULL) return x_error(X_NULL, EINVAL, fn, "'entries' parameter is NULL"); - if(n < 1) return x_error(X_SIZE_INVALID, EINVAL, fn, "invalid size: %d", n); - - N = (n<<1)+2; + prop_error(fn, rCheckClient(cl)); + prop_error(fn, rCheckMultiSetArgs(table, entries, n)); - req = (const char **) malloc(N * sizeof(char *)); - if(!req) { - fprintf(stderr, "WARNING! Redis-X : alloc %d request components: %s\n", N, strerror(errno)); - return x_trace(fn, NULL, X_FAILURE); - } + N = n; + req = rGetMultiSetArgs(table, entries, &N); + if(!req) return x_trace(fn, NULL, X_FAILURE); - L = (int *) calloc(N, sizeof(int)); + L = rGetMultiSetLengths(entries, n); if(!L) { - fprintf(stderr, "WARNING! Redis-X : alloc %d request sizes: %s\n", N, strerror(errno)); free(req); return x_trace(fn, NULL, X_FAILURE); } - req[0] = "HMSET"; // TODO, as of Redis 4.0.0, just use HSET... - req[1] = (char *) table; - - for(i=0; iinteractive)); - status = redisxMultiSetAsync(redis->interactive, table, entries, n, confirm); - if(status == X_SUCCESS && confirm) { - RESP *reply = redisxReadReplyAsync(redis->interactive, &status); - if(!status) { - status = redisxCheckRESP(reply, RESP_SIMPLE_STRING, 0); - if(!status) if(strcmp(reply->value, "OK")) status = REDIS_ERROR; + if(confirm) { + const char **req; + int *L, N; + RESP *reply; + + prop_error(fn, rCheckMultiSetArgs(table, entries, n)); + + N = n; + req = rGetMultiSetArgs(table, entries, &N); + if(!req) return x_trace(fn, NULL, X_FAILURE); + + L = rGetMultiSetLengths(entries, n); + if(!L) { + free(req); + return x_trace(fn, NULL, X_FAILURE); } + + reply = redisxArrayRequest(redis, req, L, N, &status); + + free(req); + free(L); + + if(!status) if(strcmp("OK", (char *) reply->value)) + status = x_error(REDIS_UNEXPECTED_RESP, EBADMSG, fn, "unexpected HMSET response: %s", (char *) reply->value); + redisxDestroyRESP(reply); } - redisxUnlockClient(redis->interactive); + else { + status = redisxMultiSetAsync(redis->interactive, table, entries, n, confirm); + } prop_error(fn, status); - return X_SUCCESS; } diff --git a/src/redisx-tls.c b/src/redisx-tls.c index cf3b6be..1074be9 100644 --- a/src/redisx-tls.c +++ b/src/redisx-tls.c @@ -5,8 +5,6 @@ * @author Attila Kovacs */ -#define _XOPEN_SOURCE 500 ///< for strdup() - #include #include #include @@ -210,8 +208,8 @@ int redisxSetTLS(Redis *redis, const char *ca_path, const char *ca_file) { tls = &p->config.tls; tls->enabled = TRUE; - tls->ca_path = ca_path ? strdup(ca_path) : NULL; - tls->ca_certificate = ca_file ? strdup(ca_file) : NULL; + tls->ca_path = xStringCopyOf(ca_path); + tls->ca_certificate = xStringCopyOf(ca_file); rConfigUnlock(redis); @@ -255,8 +253,8 @@ int redisxSetMutualTLS(Redis *redis, const char *cert_file, const char *key_file p = (RedisPrivate *) redis->priv; tls = &p->config.tls; - tls->certificate = strdup(cert_file); - tls->key = strdup(key_file); + tls->certificate = xStringCopyOf(cert_file); + tls->key = xStringCopyOf(key_file); rConfigUnlock(redis); @@ -293,7 +291,7 @@ int redisxSetTLSCiphers(Redis *redis, const char *cipher_list) { p = (RedisPrivate *) redis->priv; tls = &p->config.tls; - tls->ciphers = cipher_list ? strdup(cipher_list) : NULL; + tls->ciphers = xStringCopyOf(cipher_list); rConfigUnlock(redis); @@ -329,7 +327,7 @@ int redisxSetTLSCipherSuites(Redis *redis, const char *list) { p = (RedisPrivate *) redis->priv; tls = &p->config.tls; - tls->cipher_suites = list ? strdup(list) : NULL; + tls->cipher_suites = xStringCopyOf(list); rConfigUnlock(redis); @@ -366,7 +364,7 @@ int redisxSetDHCipherParams(Redis *redis, const char *dh_params_file) { p = (RedisPrivate *) redis->priv; tls = &p->config.tls; - tls->dh_params = dh_params_file ? strdup(dh_params_file) : NULL; + tls->dh_params = xStringCopyOf(dh_params_file); rConfigUnlock(redis); @@ -400,7 +398,7 @@ int redisxSetTLSServerName(Redis *redis, const char *host) { p = (RedisPrivate *) redis->priv; tls = &p->config.tls; - tls->hostname = host ? strdup(host) : NULL; + tls->hostname = xStringCopyOf(host); rConfigUnlock(redis); diff --git a/src/redisx.c b/src/redisx.c index cf0a757..7ff332b 100644 --- a/src/redisx.c +++ b/src/redisx.c @@ -650,6 +650,10 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c * roundtrips for each and every request. But, it is simple and perfectly good method when one needs to retrieve * only a few (<1000) variables per second... * + * This is the base interactive query, which is used by all sorts of other interactive transactions. It handles + * `MOVED` and `ASK` redirections for Redis clusters automatically and transparently, so long as the target node + * is a known member of the cluster from before or immediately after the migration message was received. + * * \param redis Pointer to a Redis instance. * \param args An array of strings to send to Redis, corresponding to a single query. * If you have an `char **` array, you may need to cast to `(const char **)` to avoid @@ -662,7 +666,7 @@ RESP *redisxRequest(Redis *redis, const char *command, const char *arg1, const c * \param status Pointer to the return error status. If not NULL, it will be populated with one of: * * X_SUCCESS on success. - * X_NO_INIT if the Redis client librarywas not initialized via redisxInit(). + * X_NO_INIT if the Redis client library was not initialized via redisxInit(). * X_NULL if the argument is NULL or n<1. * X_TIMEDOUT if the reading of the response timed out. * X_NO_SERVICE if not connected to Redis. @@ -714,6 +718,23 @@ RESP *redisxArrayRequest(Redis *redis, const char **args, const int *lengths, in x_trace_null(fn, NULL); } + // Handle -ASK and -MOVED redirections. + if(redisxClusterIsRedirected(reply)) { + boolean ask = redisxClusterIsMigrating(reply); + RedisPrivate *p; + Redis *redirect; + + rConfigLock(redis); + p = (RedisPrivate *) redis->priv; + redirect = redisxClusterGetRedirection(p->cluster, reply, ask); + rConfigUnlock(redis); + + if(redirect) { + if(ask) return redisxClusterAskMigrating(redirect, args, lengths, n, status); + return redisxArrayRequest(redirect, args, lengths, n, status); + } + } + return reply; } diff --git a/src/resp.c b/src/resp.c index 9e29f21..0d014c6 100644 --- a/src/resp.c +++ b/src/resp.c @@ -162,6 +162,7 @@ int redisxCheckRESP(const RESP *resp, enum resp_type expectedType, int expectedS if(resp == NULL) return x_error(X_NULL, EINVAL, fn, "RESP is NULL"); if(redisxClusterMoved(resp)) return x_error(REDIS_MOVED, EAGAIN, fn, "keyword has moved to another cluster shard"); + if(redisxClusterIsMigrating(resp)) return x_error(REDIS_MIGRATING, EAGAIN, fn, "keyword is migrating. Use ASKING on new shard"); if(resp->type == RESP3_BOOLEAN) { if(resp->n != (expectedSize ? 1 : 0)) return x_error(X_FAILURE, EBADMSG, fn, "unexpected boolean value: expected %d, got %d", (expectedSize ? 1 : 0), resp->n); } From b3fddd85454782a41d95822c599b617dc753d08a Mon Sep 17 00:00:00 2001 From: Attila Kovacs Date: Wed, 8 Jan 2025 11:32:02 +0100 Subject: [PATCH 2/2] redisx-cli: Add -c for cluster mode. --- man/man1/redisx-cli.1.gz | Bin 1991 -> 2032 bytes src/redisx-cli.c | 109 ++++++++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 35 deletions(-) diff --git a/man/man1/redisx-cli.1.gz b/man/man1/redisx-cli.1.gz index a52b389f97d3453ae828007b20f244093260b82e..ce13c146cbdb36823574100efc48f90f8058ba96 100644 GIT binary patch literal 2032 zcmV7R%zA-P_d!xklJ>FKJcD-+Odr6!hv$Mk3A zIuUfSsviH~UzZ8>Io1AeZ%z8`0^J`Qzyu8hZ`b3M<%bd>k<{uAzBoTPwfg=%YbzQ3Z?7-@2&Qvttq z&DVA}Ydc@_jW>Jre;9jw@(`R_cai--#rULezT$!{y0!H$NnVZLu14LC15a*xo}ww? zlXROb%RYY=oEz7R{Oa-MVR>_ZCpe*6jREx@Du*~uu$WO$S@$y6JURjbYw58BYarAJ zVpYA3iA>(nYVD-kX8=On1Sq6|H9|-Cg9XulK-=)tQ-Y3aarq3r_sBh=Po2Jp`Akj?B& zi4Fv5uB|P}B9-`C8bA}2qlZ(b#1>7-hSPJB3nT#@u=g%(oej3IdKw*WEcjYFETOO^ zMjEP6!|8*NqIP%LW9yMvw`~@L=PrKAroVy_tBVRrw|YQ zZwi3UBkXE42SjEQ;{y9+O9ijExxkCoB0luV<oY+gB(d!hGKQVAH$`Q{s}5)8_1W}H+LDp4Mm*tNh*5I!6_$3 zY?wQxHRYz-80)Z1Eptbg#Wu)BQn?e0XDqsa;9D$Ir8sP3+V?m3aeOLG9iR)f2Kd z1!vhg`8~~uQ3#tO_HxarDbK0tifb7pRRStE3721&z)KJutS`s>+M#5yZGLsR= z>!g(M@bv=aKb-Aa=#TzqTwlNv-;1j_;wp6K13IxVlG336M59cn{-{YgivaP*X_#oF>`8XTUtOv{6JgM0uj$s}Yb5=EczF>j%9{iJP*<6|y zVJo#NWhyR~SZ~y!pCuiGYp61%D&)a`%iDa+8)sKYW;$^-hbweJRm6xuWGWR~bW|xQ zK))uURIYCuJhF;wc9(T~Y8DP5IP!7{S93O|mrmsu!3VCcT$4@T*n41xg7K(4^D0ACea6f{udF+9M7Wq0bb-1;_6i zwcWjt-FHCd zO=ZA+`AxJ8R{Xzcla&W1{l+t)w#xAqEVNFU0DZN8_>jMY20g7PqtasRIRB5klE`bj z>)DS~klNcRtpai@m8Oz2#)U((9}+O9sYn`Fe4O7NMu*Pxpj9K>b`pHyR2x8sWFO4{ zmNmy1k3Br%Tej`Er})}-nt~$n*|5_DX-cWB>36HcvXpLGQ}A6;gY>WAdCF70ms)2x zPByWwI1HULa7Go)KtThLtpJ^~&VvMDteO1>ou!K2Aq{1bgGyRqokQPRQ}%;3El|W6 zcb|Sw8ZEHmYS>}p_IBXtZ6V?h)3JThceLyq${iB$N zptOWv?N$~Zev$l{J@MzK`SNp7$*h)cD_xmk+5Ij0FJCNRew*J<_yG@VC~cZdIqsiK O5`O|Xi#!5?6951;2=EgC literal 1991 zcmV;&2RQg2iwFP!000001D#jhZ{xTTf9GE@@JkZF*5%{c!!5c6oR2oS=8_=tZtt+( z2U;3gY$#G7DLXFik1u9K*;IBH-S#E1hMbw-4Cgaj+=9bW`PW&Y6`aBO>G_Ydv-8>6 zpWrn6Ir|YN&vg77{3=Z+^+aU(Ej-MBy%7o9at3oKY|}_n&a^TJ%mhEcN2y z0`um-KRiC=ce%(e6V2k&vbtQ(>Sgr|%j&MSJ{Z~HvU>g;kJ@h~%Iev|V~4 z2L|c=*19q|nspxCvU+~MtnTRN8&xyuUx+IqxhTwGi;}cNx{7pV0=kXV#4_-hcvh|w zLFdcr`49eekx*Y#?f>%Dq~A_%lh1OHidr=Z{dzFhVWnPE9J8QPQ)b?%#LekRAq#Ya zM=PcA%fnK+y(iQgKlO0Tas7f_o`91<6x8P$Wt^We3EXR zW!dM?g7e|}Cck>Vds^H*J_t^tR%1ZDgUTU}ld8`msH}UNUmhI+ftB=Ff;A9o1hK3x z`{X6>Xti?E?J@u%?gA9jzzU(G`@w?i49GznY*na1g2w>10_oOWgC=+qETd}CtDJ&$ zpep7kF587f&cF$3tWbK81~_MA5!41j^tLg&Wp#Nd3F0@@Be}vk4%m8lf7TL_sl;}W^ z=GxklEK-TTr2#ZSIeIu{N^H@TY&bn9xj+)o0ef^|>uj)v)zjYL#)7Y`>F^HD z&VR~Iv(xPCBufOg2Z8hiERcSfu1Vw(C6)E(i=TcTi3tZWu~4%lkA*d*Pb3xA7%bRV z!^Ptg3)M*NL27NcSe|46*S)I!CFS9C3Jpr5%o-kZ5D`p@LMFyY?|Wko-9bFX-xL6y zN7&YA_K3{7j|+^+mI_{SLxDH#M0|+J<b{gSm{$X(o!$lwjM;SaImK??0 zxCZGgJhmh8H|q+{AV*S_p;#U8_u4djdKyN3+mjv`L^Bo#g9;FJ@iZ<1T3 zHRYyS8|$!4EprE#^{tPMq;e;2Ua{x`f^V@~td$MLB&b&xhR%(rpyPx3h- z<2JJgyOk>V-V!bXkpos(S%=VLQ)W#TvyGy)-m8^HegKBJ1K+J?J!g*%w0c3-rr0y;nPQkgiKZuAinoHA>6_P_-z3|mA4 z*jHHJtJrJJ0DiX}_Z>=RhFe&(p*n31Ul!;K*%CUA5UHW4CAB1UK8=LQa%ma~Lw;?|i^!wpHapnX@*qIuD|-rc&$9_0Hf5EZbJ^-WW4@W3c0@ z9|050Im_7CCZRMj!%>Hx%;dc!-JD zj~Dm(<}5qsk!yw8ZcnIuSVsfZo&!yRAh`O~YF(q}F^u}Xry|u#C&`7Y9TZku1II`2 zj!wQ0BaGC22V~wT2HY3lM9W~s|BE(RO<)o?lL@syj<;Z;b;<;YRr%>dehUqHS|vs$ zz}RvAA9fm%*LEA(4^)ua+bNCgawCrAgHNt%- z!52=o0c1$_#tdLlbBytL!lSrl+YbALuWhF(C=#CyyGM|wl-ioO1MQcZbR(LA?}{3v ze+Ne|xH}_l=P)G=XY}VBvMf6q`6sS$;$#2+%Ptoex}njCfBdXG!s zkN8;A{c^$6V?V!iQjc`FbKa&^ZIARM89+G^p4L&^hZoND#)F*}u_Qs^}fk@YHiqNh_># z=v!;bH~`b4L!AAN68D+WqA9M1U9;b^_8id`BK}|<1SboGh786&*!yXjM{It5S4K`j zL7TA;Xm);l7V{94mhca|k%fo9NdC-T`18|z@wupER!g^$uFSCP{@VL5zqx_=ef~J% Z2i&iVv=K7puqQG}{0FD~W*Ht6003eD-oF3< diff --git a/src/redisx-cli.c b/src/redisx-cli.c index 344edbf..95caf09 100644 --- a/src/redisx-cli.c +++ b/src/redisx-cli.c @@ -5,6 +5,16 @@ #define _POSIX_C_SOURCE 199309L ///< for nanosleep() +// We'll use gcc major version as a proxy for the glibc library to decide which feature macro to use. +// gcc 5.1 was released 2015-04-22... +#ifndef __GNUC__ +# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08) +#elif __GNUC__ >= 5 +# define _DEFAULT_SOURCE ///< strcasecmp() feature macro starting glibc 2.20 (2014-09-08) +#else +# define _BSD_SOURCE ///< strcasecmp() feature macro for glibc <= 2.19 +#endif + #include #include #include @@ -29,6 +39,9 @@ static char *delim = "\\n"; static char *groupDelim = "\\n"; static int attrib = 0; +static Redis *redis; +static RedisCluster *cluster; + static void printVersion(const char *name) { printf("%s %s\n", name, REDISX_VERSION_STRING); } @@ -56,10 +69,23 @@ static void printRESP(const RESP *resp) { } } -static void process(Redis *redis, const char **cmdargs, int nargs) { +static void process(const char **cmdargs, int nargs) { int status = X_SUCCESS; RESP *reply, *attr = NULL; + if(cluster) { + const char *key = cmdargs[1]; + + if(nargs > 3) if(strcasecmp("EVAL", cmdargs[0]) == 0 || strcasecmp("EVALSHA", cmdargs[0]) || strcasecmp("FCALL", cmdargs[0])) + key = cmdargs[3]; + + redis = redisxClusterGetShard(cluster, key); + if(!redis) { + fprintf(stderr, "ERROR! No suitable cluster node found for transaction."); + return; + } + } + reply = redisxArrayRequest(redis, cmdargs, NULL, nargs, &status); if(!status && attrib) attr = redisxGetAttributes(redis); @@ -103,18 +129,18 @@ static int interactive(Redis *redis) { for(;;) { char *line = readline(prompt); - char **args; + const char **args; int nargs; if(!line) continue; if(strcmp("quit", line) == 0 || strcmp("exit", line) == 0) break; - poptParseArgvString(line, &nargs, (const char ***) &args); + poptParseArgvString(line, &nargs, &args); if(args) { if(nargs > 0) { - process(redis, (const char **) args, nargs); + process(args, nargs); add_history(line); } free(args); @@ -164,15 +190,16 @@ static char *readScript(const char *eval) { * @return The argument list to pass to Redis , of the form: * `EVAL