Skip to content

Commit

Permalink
Refactor function to prepare cluster commands
Browse files Browse the repository at this point in the history
Rename and remove unnecessary checks in internal function:
command_format_by_slot().

Signed-off-by: Björn Svensson <[email protected]>
  • Loading branch information
bjosv committed Jun 22, 2024
1 parent e3431b6 commit c92d39b
Showing 1 changed file with 17 additions and 60 deletions.
77 changes: 17 additions & 60 deletions src/valkeycluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2422,52 +2422,31 @@ static void *valkey_cluster_command_execute(valkeyClusterContext *cc,
return reply;
}

/*
* Split the command into subcommands by slot
*
* Returns slot_num
* If slot_num < 0 or slot_num >= VALKEYCLUSTER_SLOTS means this function runs
* error; Otherwise if the commands > 1 , slot_num is the last subcommand slot
* number.
*/
static int command_format_by_slot(valkeyClusterContext *cc,
struct cmd *command) {
struct keypos *kp;
int key_count;
int slot_num = -1;

if (cc == NULL || command == NULL || command->cmd == NULL ||
command->clen <= 0) {
goto done;
/* Prepare command by parsing the string to find the key and to get the slot. */
static int prepareCommand(valkeyClusterContext *cc, struct cmd *command) {
if (command->cmd == NULL || command->clen <= 0) {
return VALKEY_ERR;
}

valkey_parse_cmd(command);
if (command->result == CMD_PARSE_ENOMEM) {
__valkeyClusterSetError(cc, VALKEY_ERR_OOM, "Out of memory");
goto done;
} else if (command->result != CMD_PARSE_OK) {
return VALKEY_ERR;
}
if (command->result != CMD_PARSE_OK) {
__valkeyClusterSetError(cc, VALKEY_ERR_PROTOCOL, command->errstr);
goto done;
return VALKEY_ERR;
}

key_count = vkarray_n(command->keys);

if (key_count <= 0) {
if (vkarray_n(command->keys) <= 0) {
__valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"No keys in command(must have keys for valkey cluster mode)");
goto done;
} else if (key_count == 1) {
kp = vkarray_get(command->keys, 0);
slot_num = keyHashSlot(kp->start, kp->end - kp->start);
command->slot_num = slot_num;

goto done;
return VALKEY_ERR;
}

done:

return slot_num;
struct keypos *kp = vkarray_get(command->keys, 0);
command->slot_num = keyHashSlot(kp->start, kp->end - kp->start);
return VALKEY_OK;
}

/* Deprecated function, replaced with valkeyClusterSetOptionMaxRetry() */
Expand Down Expand Up @@ -2505,7 +2484,6 @@ int valkeyClusterSetEventCallback(valkeyClusterContext *cc,
void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd,
int len) {
valkeyReply *reply = NULL;
int slot_num;
struct cmd *command = NULL;

if (cc == NULL) {
Expand All @@ -2521,16 +2499,10 @@ void *valkeyClusterFormattedCommand(valkeyClusterContext *cc, char *cmd,
if (command == NULL) {
goto oom;
}

command->cmd = cmd;
command->clen = len;

slot_num = command_format_by_slot(cc, command);
if (slot_num < 0) {
goto error;
} else if (slot_num >= VALKEYCLUSTER_SLOTS) {
__valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"slot_num is out of range");
if (prepareCommand(cc, command) != VALKEY_OK) {
goto error;
}

Expand Down Expand Up @@ -2669,7 +2641,6 @@ void *valkeyClusterCommandArgv(valkeyClusterContext *cc, int argc,

int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd,
int len) {
int slot_num;
struct cmd *command = NULL;

if (cc->requests == NULL) {
Expand All @@ -2684,17 +2655,10 @@ int valkeyClusterAppendFormattedCommand(valkeyClusterContext *cc, char *cmd,
if (command == NULL) {
goto oom;
}

command->cmd = cmd;
command->clen = len;

slot_num = command_format_by_slot(cc, command);

if (slot_num < 0) {
goto error;
} else if (slot_num >= VALKEYCLUSTER_SLOTS) {
__valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"slot_num is out of range");
if (prepareCommand(cc, command) != VALKEY_OK) {
goto error;
}

Expand Down Expand Up @@ -3565,7 +3529,6 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc,

valkeyClusterContext *cc;
int status = VALKEY_OK;
int slot_num;
valkeyClusterNode *node;
valkeyAsyncContext *ac;
struct cmd *command = NULL;
Expand Down Expand Up @@ -3599,18 +3562,12 @@ int valkeyClusterAsyncFormattedCommand(valkeyClusterAsyncContext *acc,
memcpy(command->cmd, cmd, len);
command->clen = len;

slot_num = command_format_by_slot(cc, command);

if (slot_num < 0) {
if (prepareCommand(cc, command) != VALKEY_OK) {
__valkeyClusterAsyncSetError(acc, cc->err, cc->errstr);
goto error;
} else if (slot_num >= VALKEYCLUSTER_SLOTS) {
__valkeyClusterAsyncSetError(acc, VALKEY_ERR_OTHER,
"slot_num is out of range");
goto error;
}

node = node_get_by_table(cc, (uint32_t)slot_num);
node = node_get_by_table(cc, (uint32_t)command->slot_num);
if (node == NULL) {
/* Initiate a slotmap update since the slot is not served. */
throttledUpdateSlotMapAsync(acc, NULL);
Expand Down

0 comments on commit c92d39b

Please sign in to comment.