From 45b79265154950fb9baf67af85124b6467264faf Mon Sep 17 00:00:00 2001 From: Ram Prasad Voleti Date: Fri, 5 Jul 2024 05:02:37 +0000 Subject: [PATCH] Replace Dict with Rax for Cluster Nodes Replace Dict with Rax for Cluster Nodes and construct primaries list on the go, instead of maintaining shards/masters list. Signed-off-by: Ram Prasad Voleti --- src/cluster.c | 4 +- src/cluster_legacy.c | 416 +++++++++------------- src/cluster_legacy.h | 4 +- src/commands.def | 6 +- src/commands/cluster-shards.json | 3 + src/rax.h | 1 + tests/cluster/tests/28-cluster-shards.tcl | 21 -- tests/unit/cluster/cluster-shards.tcl | 22 ++ 8 files changed, 211 insertions(+), 266 deletions(-) create mode 100644 tests/unit/cluster/cluster-shards.tcl diff --git a/src/cluster.c b/src/cluster.c index c61c36d768..45fde52842 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -849,10 +849,10 @@ void clusterCommand(client *c) { } else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) { /* CLUSTER SLOTS */ clusterCommandSlots(c); - } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { + } else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc == 2) { /* CLUSTER SHARDS */ clusterCommandShards(c); - } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { + } else if (!strcasecmp(c->argv[1]->ptr, "info") && c->argc == 2) { /* CLUSTER INFO */ sds info = genClusterInfoString(); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index dd33c172e6..f81b1ac414 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -64,8 +64,7 @@ void clusterUpdateState(void); int clusterNodeCoversSlot(clusterNode *n, int slot); list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica); -int clusterNodeAddToMasters(clusterNode *master); -int clusterNodeRemoveFromMasters(clusterNode *master); +list *clusterGetPrimaries(void); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); @@ -147,17 +146,6 @@ static inline int defaultClientPort(void) { /* Fixed timeout value for cluster operations (milliseconds) */ #define CLUSTER_OPERATION_TIMEOUT 2000 -/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to - * clusterNode structures. */ -dictType clusterNodesDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ -}; - /* Cluster re-addition blacklist. This maps node IDs to the time * we can re-add this node. The goal is to avoid reading a removed * node for some time. */ @@ -170,16 +158,6 @@ dictType clusterNodesBlackListDictType = { NULL /* allow to expand */ }; -/* Cluster shards hash table, mapping shard id to list of nodes */ -dictType clusterSdsToListType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - dictListDestructor, /* val destructor */ - NULL /* allow to expand */ -}; - /* Aux fields were introduced in Redis OSS 7.2 to support the persistence * of various important node properties, such as shard id, in nodes.conf. * Aux fields take an explicit format of name=value pairs and have no @@ -237,9 +215,6 @@ int auxShardIdSetter(clusterNode *n, void *value, int length) { return C_ERR; } } - /* Initially, during the load, add every node as master until the respective - * role is assigned with the persisted shard ID. 
*/ - clusterNodeAddToMasters(n); return C_OK; } @@ -324,19 +299,6 @@ typedef struct { * Initialization * -------------------------------------------------------------------------- */ -/* We initially assign a temporary node name, role, and shardID to the nodes other than `myself`. Once we finish the handshake - * with other nodes or once we finish loading all the information from nodes.conf file, we will know the actual information - * of the respective other nodes. A node can be said persisted if we have the permanent information of the node. We intend - * to maintain masters list only after knowing the permanent information of the node. */ -int isMasterPersisted(clusterNode *node) { - for (int j = 0; j < server.cluster->nummasters; j++) { - if (node == server.cluster->masters[j]) { - return 1; - } - } - return 0; -} - /* Load the cluster config from 'filename'. * * If the file does not exist or is zero-length (this may happen because @@ -579,7 +541,9 @@ int clusterLoadConfig(char *filename) { goto fmterr; } primary = clusterLookupNode(argv[3], sdslen(argv[3])); + bool primary_loaded = true; if (!primary) { + primary_loaded = false; primary = createClusterNode(argv[3], 0); clusterAddNode(primary); } @@ -588,26 +552,14 @@ int clusterLoadConfig(char *filename) { * shard_id in this case */ if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { memcpy(n->shard_id, primary->shard_id, CLUSTER_NAMELEN); - } else if (isMasterPersisted(primary) && - memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) - { - /* If the primary has been added to a shard, make sure this + } else if (primary_loaded && memcmp(primary->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0) { + /* If the primary information has already been loaded from the conf file, make sure this * node has the same persisted shard id as the primary. */ sdsfreesplitres(argv, argc); goto fmterr; } - /* Since the role of node is decided as replica, remove it from - * master list which was added initially during the load and continue - * maintain the persisted master list */ - clusterNodeRemoveFromMasters(n); n->replicaof = primary; clusterNodeAddReplica(primary, n); - } else if (auxFieldHandlers[af_shard_id].isPresent(n) == 0) { - /* n is a primary but it does not have a persisted shard_id. - * This happens if we are loading a nodes.conf generated by - * an older version of the server. We should manually update the - * shard membership in this case */ - clusterNodeAddToMasters(n); } /* Set ping sent / pong received timestamps */ @@ -980,9 +932,7 @@ void clusterInit(void) { server.cluster->state = CLUSTER_FAIL; server.cluster->size = 0; server.cluster->todo_before_sleep = 0; - server.cluster->nodes = dictCreate(&clusterNodesDictType); - server.cluster->masters = NULL; - server.cluster->nummasters = 0; + server.cluster->nodes = raxNew(); server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; @@ -1016,7 +966,6 @@ void clusterInit(void) { myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_PRIMARY); serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name); clusterAddNode(myself); - clusterNodeAddToMasters(myself); saveconf = 1; } if (saveconf) clusterSaveConfigOrDie(1); @@ -1096,8 +1045,7 @@ void clusterInitLast(void) { * 6) The new configuration is saved and the cluster state updated. * 7) If the node was a replica, the whole data set is flushed away. 
*/ void clusterReset(int hard) { - dictIterator *di; - dictEntry *de; + raxIterator ri; int j; /* Turn into primary. */ @@ -1115,22 +1063,22 @@ void clusterReset(int hard) { for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j); /* Forget all the nodes, but myself. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node == myself) continue; clusterDelNode(node); + raxSeek(&ri, ">=", ri.key, ri.key_len); } - dictReleaseIterator(di); + raxStop(&ri); /* Empty the nodes blacklist. */ dictEmpty(server.cluster->nodes_black_list, NULL); /* Hard reset only: set epochs to 0, change node ID. */ if (hard) { - sds oldname; - server.cluster->currentEpoch = 0; server.cluster->lastVoteEpoch = 0; myself->configEpoch = 0; @@ -1138,18 +1086,13 @@ void clusterReset(int hard) { /* To change the Node ID we need to remove the old name from the * nodes table, change the ID, and re-add back with new name. */ - oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN); - dictDelete(server.cluster->nodes, oldname); - sdsfree(oldname); + raxRemove(server.cluster->nodes, (unsigned char *)myself->name, CLUSTER_NAMELEN, NULL); getRandomHexChars(myself->name, CLUSTER_NAMELEN); getRandomHexChars(myself->shard_id, CLUSTER_NAMELEN); clusterAddNode(myself); serverLog(LL_NOTICE, "Node hard reset, now I'm %.40s", myself->name); } - /* Re-populate masters */ - clusterNodeAddToMasters(myself); - /* Make sure to persist the new config and update the state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -1318,7 +1261,7 @@ unsigned long getClusterConnectionsCount(void) { /* We decrement the number of nodes by one, since there is the * "myself" node too in the list. Each node uses two file descriptors, * one incoming and one outgoing, thus the multiplication by 2. */ - return server.cluster_enabled ? ((dictSize(server.cluster->nodes) - 1) * 2) : 0; + return server.cluster_enabled ? ((raxSize(server.cluster->nodes) - 1) * 2) : 0; } /* ----------------------------------------------------------------------------- @@ -1488,22 +1431,6 @@ int clusterNodeRemoveReplica(clusterNode *primary, clusterNode *replica) { return C_ERR; } -int clusterNodeRemoveFromMasters(clusterNode *master) { - for (int j = 0; j < server.cluster->nummasters; j++) { - if (server.cluster->masters[j] == master) { - if ((j+1) < server.cluster->nummasters) { - int remaining_masters = (server.cluster->nummasters - j) - 1; - memmove(server.cluster->masters+j,server.cluster->masters+(j+1), - (sizeof(*server.cluster->masters) * remaining_masters)); - } - server.cluster->nummasters--; - master->flags &= ~(CLUSTER_NODE_PRIMARY|CLUSTER_NODE_MIGRATE_TO); - return C_OK; - } - } - return C_ERR; -} - int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { int j; @@ -1518,19 +1445,6 @@ int clusterNodeAddReplica(clusterNode *primary, clusterNode *replica) { return C_OK; } -int clusterNodeAddToMasters(clusterNode *master) { - /* If it's already a master, don't add it again. 
*/ - for (int j = 0; j < server.cluster->nummasters; j++) - if (server.cluster->masters[j] == master) return C_ERR; - server.cluster->masters = zrealloc(server.cluster->masters, - sizeof(clusterNode*)*((server.cluster->nummasters) + 1)); - server.cluster->masters[server.cluster->nummasters] = master; - server.cluster->nummasters++; - qsort(server.cluster->masters, server.cluster->nummasters, sizeof(clusterNode *), clusterNodeNameComparator); - master->flags |= CLUSTER_NODE_PRIMARY; - return C_OK; -} - int clusterCountNonFailingReplicas(clusterNode *n) { int j, ok_replicas = 0; @@ -1541,7 +1455,6 @@ int clusterCountNonFailingReplicas(clusterNode *n) { /* Low level cleanup of the node structure. Only called by clusterDelNode(). */ void freeClusterNode(clusterNode *n) { - sds nodename; int j; /* If the node has associated replicas, we have to set @@ -1550,12 +1463,9 @@ void freeClusterNode(clusterNode *n) { /* Remove this node from the list of replicas of its primary. */ if (nodeIsReplica(n) && n->replicaof) clusterNodeRemoveReplica(n->replicaof, n); - else clusterNodeRemoveFromMasters(n); /* Unlink from the set of nodes. */ - nodename = sdsnewlen(n->name, CLUSTER_NAMELEN); - serverAssert(dictDelete(server.cluster->nodes, nodename) == DICT_OK); - sdsfree(nodename); + serverAssert(raxRemove(server.cluster->nodes, (unsigned char *)n->name, CLUSTER_NAMELEN, NULL) == RAX_OK); sdsfree(n->hostname); sdsfree(n->human_nodename); @@ -1571,8 +1481,9 @@ void freeClusterNode(clusterNode *n) { void clusterAddNode(clusterNode *node) { int retval; - retval = dictAdd(server.cluster->nodes, sdsnewlen(node->name, CLUSTER_NAMELEN), node); - serverAssert(retval == DICT_OK); + retval = raxInsert(server.cluster->nodes, (unsigned char *)node->name, CLUSTER_NAMELEN, node, NULL); + + serverAssert(retval == RAX_OK); } /* Remove a node from the cluster. The function performs the high level @@ -1588,8 +1499,7 @@ void clusterAddNode(clusterNode *node) { */ void clusterDelNode(clusterNode *delnode) { int j; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* 1) Mark slots as unassigned. */ for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -1599,14 +1509,15 @@ void clusterDelNode(clusterNode *delnode) { } /* 2) Remove failure reports. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node == delnode) continue; clusterNodeDelFailureReport(node, delnode); } - dictReleaseIterator(di); + raxStop(&ri); /* 3) Free the node, unlinking it from the cluster. */ freeClusterNode(delnode); @@ -1615,43 +1526,50 @@ void clusterDelNode(clusterNode *delnode) { /* Node lookup by name */ clusterNode *clusterLookupNode(const char *name, int length) { if (verifyClusterNodeId(name, length) != C_OK) return NULL; - sds s = sdsnewlen(name, length); - dictEntry *de = dictFind(server.cluster->nodes, s); - sdsfree(s); - if (de == NULL) return NULL; - return dictGetVal(de); + void *n = NULL; + raxFind(server.cluster->nodes, (unsigned char *)name, length, &n); + return n; } /* Get all the nodes in my shard. - * We ensure that we maintain the master nodes only after they have been assigned with a persisted - * shard ID. Generate the list of nodes in a shard if they are persisted, else return - * NULL. The caller should release the list */ + * Generate the list of nodes in a shard. 
The caller should release the list */ list *clusterGetNodesInMyShard(clusterNode *node) { - clusterNode *master = clusterNodeGetPrimary(node); + clusterNode *primary = clusterNodeGetPrimary(node); list *l = listCreate(); - listAddNodeTail(l, master); - for (int i = 0; i < master->num_replicas; i++) { - listAddNodeTail(l, master->replicas[i]); + listAddNodeTail(l, primary); + for (int i = 0; i < primary->num_replicas; i++) { + listAddNodeTail(l, primary->replicas[i]); } return l; } +/* Get all the primaries in the cluster. + * Generate the list of primaries in a cluster which have slot coverage. The caller should release the list */ +list *clusterGetPrimaries(void) { + list *l = listCreate(); + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (clusterNodeIsVotingPrimary(node)) { + listAddNodeTail(l, node); + } + } + raxStop(&ri); + return l; +} + /* This is only used after the handshake. When we connect a given IP/PORT * as a result of CLUSTER MEET we don't have the node name yet, so we * pick a random one, and will fix it when we receive the PONG request using * this function. */ void clusterRenameNode(clusterNode *node, char *newname) { - int retval; - sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, node->human_nodename, newname); - retval = dictDelete(server.cluster->nodes, s); - sdsfree(s); - serverAssert(retval == DICT_OK); + serverAssert(raxRemove(server.cluster->nodes, (unsigned char *)node->name, CLUSTER_NAMELEN, NULL) == RAX_OK); memcpy(node->name, newname, CLUSTER_NAMELEN); clusterAddNode(node); - clusterNodeAddToMasters(node); } /* ----------------------------------------------------------------------------- @@ -1662,15 +1580,16 @@ void clusterRenameNode(clusterNode *node, char *newname) { * epoch if greater than any node configEpoch. */ uint64_t clusterGetMaxEpoch(void) { uint64_t max = 0; - dictIterator *di; - dictEntry *de; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (node->configEpoch > max) max = node->configEpoch; } - dictReleaseIterator(di); + raxStop(&ri); if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; return max; } @@ -1940,18 +1859,22 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * specified ip address and port number. This function is used in order to * avoid adding a new handshake node for the same address multiple times. 
*/ int clusterHandshakeInProgress(char *ip, int port, int cport) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + clusterNode *node = NULL; + while (raxNext(&ri)) { + node = ri.data; if (!nodeInHandshake(node)) continue; - if (!strcasecmp(node->ip, ip) && getNodeDefaultClientPort(node) == port && node->cport == cport) break; + if (!strcasecmp(node->ip, ip) && getNodeDefaultClientPort(node) == port && node->cport == cport) { + raxStop(&ri); + return 1; // Return 1 if a matching node is found + } } - dictReleaseIterator(di); - return de != NULL; + raxStop(&ri); + return 0; } /* Start a handshake with the specified address if there is not one @@ -2177,7 +2100,6 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { node->tls_port = msg_tls_port; node->cport = ntohs(g->cport); clusterAddNode(node); - clusterNodeAddToMasters(node); } } @@ -2266,10 +2188,9 @@ void clusterSetNodeAsPrimary(clusterNode *n) { if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO; } n->flags &= ~CLUSTER_NODE_REPLICA; + n->flags |= CLUSTER_NODE_PRIMARY; n->replicaof = NULL; - clusterNodeAddToMasters(n); - /* Update config and state. */ clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -3114,10 +3035,7 @@ int clusterProcessPacket(clusterLink *link) { /* Primary node changed for this replica? */ if (primary && sender->replicaof != primary) { - if (sender->replicaof) - clusterNodeRemoveReplica(sender->replicaof, sender); - else - clusterNodeRemoveFromMasters(sender); + if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender); serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", sender->name, sender->human_nodename, primary->name, primary->human_nodename, sender->shard_id); @@ -3543,17 +3461,17 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) { * some node->link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with node links later. */ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; clusterSendMessage(node->link, msgblock); } - dictReleaseIterator(di); + raxStop(&ri); } /* Build the message header. hdr must point to a buffer at least @@ -3652,7 +3570,7 @@ void clusterSendPing(clusterLink *link, int type) { * nodes available minus two (ourself and the node we are sending the * message to). However practically there may be less valid nodes since * nodes in handshake state, disconnected, are not considered. */ - int freshnodes = dictSize(server.cluster->nodes) - 2; + int freshnodes = (int)raxSize(server.cluster->nodes) - 2; /* How many gossip sections we want to add? 1/10 of the number of nodes * and anyway at least 3. Why 1/10? 
@@ -3680,7 +3598,7 @@ void clusterSendPing(clusterLink *link, int type) { * Since we have non-voting replicas that lower the probability of an entry * to feature our node, we set the number of entries per packet as * 10% of the total nodes we have. */ - wanted = floor(dictSize(server.cluster->nodes) / 10); + wanted = floor(raxSize(server.cluster->nodes) / 10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; @@ -3706,9 +3624,13 @@ void clusterSendPing(clusterLink *link, int type) { /* Populate the gossip fields */ int maxiterations = wanted * 3; + raxIterator ri; + raxStart(&ri, server.cluster->nodes); while (freshnodes > 0 && gossipcount < wanted && maxiterations--) { - dictEntry *de = dictGetRandomKey(server.cluster->nodes); - clusterNode *this = dictGetVal(de); + raxSeek(&ri, "^", NULL, 0); + raxRandomWalk(&ri, 0); + if (raxEOF(&ri)) break; + clusterNode *this = ri.data; /* Don't include this node: the whole packet header is about us * already, so we just gossip about other nodes. @@ -3739,15 +3661,14 @@ void clusterSendPing(clusterLink *link, int type) { freshnodes--; gossipcount++; } + raxStop(&ri); /* If there are PFAIL nodes, add them at the end. */ if (pfail_wanted) { - dictIterator *di; - dictEntry *de; - - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL && pfail_wanted > 0) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri) && pfail_wanted > 0) { + clusterNode *node = ri.data; if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; if (node->flags & CLUSTER_NODE_NOADDR) continue; if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; @@ -3758,7 +3679,7 @@ void clusterSendPing(clusterLink *link, int type) { * of PFAIL nodes. */ pfail_wanted--; } - dictReleaseIterator(di); + raxStop(&ri); } /* Compute the actual total length and send! */ @@ -3797,12 +3718,12 @@ void clusterSendPing(clusterLink *link, int type) { #define CLUSTER_BROADCAST_ALL 0 #define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 void clusterBroadcastPong(int target) { - dictIterator *di; - dictEntry *de; + raxIterator ri; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (!node->link) continue; if (node == myself || nodeInHandshake(node)) continue; @@ -3813,7 +3734,7 @@ void clusterBroadcastPong(int target) { } clusterSendPing(node->link, CLUSTERMSG_TYPE_PONG); } - dictReleaseIterator(di); + raxStop(&ri); } /* Create a PUBLISH message block. @@ -4418,8 +4339,7 @@ void clusterHandleReplicaFailover(void) { void clusterHandleReplicaMigration(int max_replicas) { int j, ok_replicas = 0; clusterNode *my_primary = myself->replicaof, *target = NULL, *candidate = NULL; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* Step 1: Don't migrate if the cluster state is not ok. */ if (server.cluster->state != CLUSTER_OK) return; @@ -4442,9 +4362,11 @@ void clusterHandleReplicaMigration(int max_replicas) { * replicas to migrate at the same time, but this is unlikely to * happen and relatively harmless when it does. 
*/ candidate = myself; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + int ok_replicas = 0, is_orphaned = 1; /* We want to migrate only if this primary is working, orphaned, and @@ -4479,7 +4401,7 @@ void clusterHandleReplicaMigration(int max_replicas) { } } } - dictReleaseIterator(di); + raxStop(&ri); /* Step 4: perform the migration if there is a target, and if I'm the * candidate, but only if the primary is continuously orphaned for a @@ -4641,8 +4563,7 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) { /* This is executed 10 times every second */ void clusterCron(void) { - dictIterator *di; - dictEntry *de; + raxIterator ri; int update_state = 0; int orphaned_primaries; /* How many primaries there are without ok replicas. */ int max_replicas; /* Max number of ok replicas for a single primary. */ @@ -4666,9 +4587,11 @@ void clusterCron(void) { /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */ server.cluster->stats_pfail_nodes = 0; /* Run through some of the operations we want to do on each cluster node. */ - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + /* We free the inbound or outboud link to the node if the link has an * oversized message send queue and immediately try reconnecting. */ clusterNodeCronFreeLinkOnBufferLimitReached(node); @@ -4677,7 +4600,7 @@ void clusterCron(void) { */ if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue; } - dictReleaseIterator(di); + raxStop(&ri); /* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */ @@ -4686,10 +4609,12 @@ void clusterCron(void) { /* Check a few random nodes and ping the one with the oldest * pong_received time. */ + raxStart(&ri, server.cluster->nodes); for (j = 0; j < 5; j++) { - de = dictGetRandomKey(server.cluster->nodes); - clusterNode *this = dictGetVal(de); - + raxSeek(&ri, "^", NULL, 0); + raxRandomWalk(&ri, 0); + if (raxEOF(&ri)) break; + clusterNode *this = ri.data; /* Don't ping nodes disconnected or with a ping currently active. */ if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; @@ -4698,6 +4623,7 @@ void clusterCron(void) { min_pong = this->pong_received; } } + raxStop(&ri); if (min_pong_node) { serverLog(LL_DEBUG, "Pinging node %.40s", min_pong_node->name); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); @@ -4713,9 +4639,10 @@ void clusterCron(void) { orphaned_primaries = 0; max_replicas = 0; this_replicas = 0; - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; now = mstime(); /* Use an updated time at every iteration. 
*/ if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR | CLUSTER_NODE_HANDSHAKE)) continue; @@ -4797,7 +4724,7 @@ void clusterCron(void) { } } } - dictReleaseIterator(di); + raxStop(&ri); /* If we are a replica node but the replication is still turned off, * enable it if we know the address of our primary and it appears to @@ -4897,16 +4824,17 @@ void bitmapClearBit(unsigned char *bitmap, int pos) { * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the * MIGRATE_TO flag the when a primary gets the first slot. */ int clusterPrimariesHaveReplicas(void) { - dictIterator di; - dictInitIterator(&di, server.cluster->nodes); - dictEntry *de; + raxIterator ri; int replicas = 0; - while ((de = dictNext(&di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (nodeIsReplica(node)) continue; replicas += node->num_replicas; } + raxStop(&ri); return replicas != 0; } @@ -5053,13 +4981,17 @@ void clusterUpdateState(void) { { server.cluster->size = 0; - for (int i = 0; i < server.cluster->nummasters; i++) { - if (server.cluster->masters[i] && server.cluster->masters[i]->numslots) { + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; + if (clusterNodeIsVotingPrimary(node)) { server.cluster->size++; - if ((server.cluster->masters[i]->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0) - reachable_primaries++; + if ((node->flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL)) == 0) reachable_primaries++; } } + raxStop(&ri); } /* If we are in a minority partition, change the cluster state @@ -5194,9 +5126,8 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { serverAssert(myself->numslots == 0); if (clusterNodeIsPrimary(myself)) { - clusterNodeRemoveFromMasters(myself); + myself->flags &= ~(CLUSTER_NODE_PRIMARY | CLUSTER_NODE_MIGRATE_TO); myself->flags |= CLUSTER_NODE_REPLICA; - clusterCloseAllSlots(); } else { if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself); } @@ -5400,15 +5331,15 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) { * configuration file (nodes.conf) for a given node. */ sds clusterGenNodesDescription(client *c, int filter, int tls_primary) { sds ci = sdsempty(), ni; - dictIterator *di; - dictEntry *de; + raxIterator ri; /* Generate all nodes slots info firstly. */ clusterGenNodesSlotsInfo(filter); - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & filter) continue; ni = clusterGenNodeDescription(c, node, tls_primary); @@ -5419,7 +5350,7 @@ sds clusterGenNodesDescription(client *c, int filter, int tls_primary) { /* Release slots info. */ clusterFreeNodesSlotsInfo(node); } - dictReleaseIterator(di); + raxStop(&ri); return ci; } @@ -5463,16 +5394,16 @@ void addReplyClusterLinkDescription(client *c, clusterLink *link) { /* Add to the output buffer of the given client an array of cluster link descriptions, * with array entry being a description of a single current cluster link. 
*/ void addReplyClusterLinksDescription(client *c) { - dictIterator *di; - dictEntry *de; + raxIterator ri; void *arraylen_ptr = NULL; int num_links = 0; arraylen_ptr = addReplyDeferredLen(c); - di = dictGetSafeIterator(server.cluster->nodes); - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->link) { num_links++; addReplyClusterLinkDescription(c, node->link); @@ -5482,7 +5413,7 @@ void addReplyClusterLinksDescription(client *c) { addReplyClusterLinkDescription(c, node->inbound_link); } } - dictReleaseIterator(di); + raxStop(&ri); setDeferredArrayLen(c, arraylen_ptr, num_links); } @@ -5620,14 +5551,15 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { } /* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, clusterNode* primary) { +void addShardReplyForClusterShards(client *c, clusterNode *primary) { addReplyMapLen(c, 2); addReplyBulkCString(c, "slots"); if (primary->slot_info_pairs != NULL) { serverAssert((primary->slot_info_pairs_count % 2) == 0); addReplyArrayLen(c, primary->slot_info_pairs_count); - for (int i = 0; i < primary->slot_info_pairs_count; i++) addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]); + for (int i = 0; i < primary->slot_info_pairs_count; i++) + addReplyLongLong(c, (unsigned long)primary->slot_info_pairs[i]); } else { /* If no slot info pair is provided, the node owns no slots */ addReplyArrayLen(c, 0); @@ -5651,12 +5583,17 @@ void addShardReplyForClusterShards(client *c, clusterNode* primary) { * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ void clusterCommandShards(client *c) { - addReplyArrayLen(c, server.cluster->nummasters); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); - for (int i = 0; i < server.cluster->nummasters; i++) { - addShardReplyForClusterShards(c, server.cluster->masters[i]); + list *primaries = clusterGetPrimaries(); + addReplyArrayLen(c, listLength(primaries)); + listIter li; + listRewind(primaries, &li); + for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { + clusterNode *n = listNodeValue(ln); + addShardReplyForClusterShards(c, n); } + listRelease(primaries); } sds genClusterInfoString(void) { @@ -5690,7 +5627,7 @@ sds genClusterInfoString(void) { "cluster_current_epoch:%llu\r\n" "cluster_my_epoch:%llu\r\n", statestr[server.cluster->state], slots_assigned, slots_ok, slots_pfail, slots_fail, - dictSize(server.cluster->nodes), server.cluster->size, + raxSize(server.cluster->nodes), server.cluster->size, (unsigned long long)server.cluster->currentEpoch, (unsigned long long)nodeEpoch(myself)); /* Show stats about messages sent and received. 
*/ @@ -5772,7 +5709,7 @@ int clusterManualFailoverTimeLimit(void) { } int getClusterSize(void) { - return dictSize(server.cluster->nodes); + return (int)raxSize(server.cluster->nodes); } int getMyShardSlotCount(void) { @@ -5786,13 +5723,15 @@ int getMyShardSlotCount(void) { } char **getClusterNodesList(size_t *numnodes) { - size_t count = dictSize(server.cluster->nodes); + size_t count = raxSize(server.cluster->nodes); char **ids = zmalloc((count + 1) * CLUSTER_NAMELEN); - dictIterator *di = dictGetIterator(server.cluster->nodes); - dictEntry *de; + raxIterator ri; int j = 0; - while ((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); + + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); + while (raxNext(&ri)) { + clusterNode *node = ri.data; if (node->flags & (CLUSTER_NODE_NOADDR | CLUSTER_NODE_HANDSHAKE)) continue; ids[j] = zmalloc(CLUSTER_NAMELEN); memcpy(ids[j], node->name, CLUSTER_NAMELEN); @@ -5801,7 +5740,7 @@ char **getClusterNodesList(size_t *numnodes) { *numnodes = j; ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need * to also get the count argument. */ - dictReleaseIterator(di); + raxStop(&ri); return ids; } @@ -6391,7 +6330,7 @@ int clusterCommandSpecial(client *c) { if (epoch < 0) { addReplyErrorFormat(c, "Invalid config epoch specified: %lld", epoch); - } else if (dictSize(server.cluster->nodes) > 1) { + } else if (raxSize(server.cluster->nodes) > 1) { addReplyError(c, "The user can assign a config epoch only when the " "node does not know any other node."); } else if (myself->configEpoch != 0) { @@ -6539,19 +6478,20 @@ void clusterPromoteSelfToPrimary(void) { } int detectAndUpdateCachedNodeHealth(void) { - dictIterator di; - dictInitIterator(&di, server.cluster->nodes); - dictEntry *de; + raxIterator ri; + raxStart(&ri, server.cluster->nodes); + raxSeek(&ri, "^", NULL, 0); clusterNode *node; int overall_health_changed = 0; - while ((de = dictNext(&di)) != NULL) { - node = dictGetVal(de); + while (raxNext(&ri)) { + node = ri.data; int present_is_node_healthy = isNodeAvailable(node); if (present_is_node_healthy != node->is_node_healthy) { overall_health_changed = 1; node->is_node_healthy = present_is_node_healthy; } } + raxStop(&ri); return overall_health_changed; } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 7118c064b6..a7cdbe13f8 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -320,10 +320,8 @@ struct clusterState { uint64_t currentEpoch; int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ int size; /* Num of primary nodes with at least one slot */ - dict *nodes; /* Hash table of name -> clusterNode structures */ + rax *nodes; /* Table mapping of name -> clusterNode structures */ dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. 
*/ - clusterNode **masters; /* pointers to master nodes */ - int nummasters; /* Number of master nodes */ clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; diff --git a/src/commands.def b/src/commands.def index c069393fe3..4559c0aefe 100644 --- a/src/commands.def +++ b/src/commands.def @@ -896,7 +896,9 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = { #ifndef SKIP_CMD_TIPS_TABLE /* CLUSTER SHARDS tips */ -#define CLUSTER_SHARDS_Tips NULL +const char *CLUSTER_SHARDS_Tips[] = { +"nondeterministic_output", +}; #endif #ifndef SKIP_CMD_KEY_SPECS_TABLE @@ -1027,7 +1029,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, -{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,0,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, +{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, {MAKE_CMD("slaves","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. 
O(N*log(N)) with ORDERBY subcommand.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, diff --git a/src/commands/cluster-shards.json b/src/commands/cluster-shards.json index ec782183fc..e63c129ea9 100644 --- a/src/commands/cluster-shards.json +++ b/src/commands/cluster-shards.json @@ -11,6 +11,9 @@ "LOADING", "STALE" ], + "command_tips": [ + "NONDETERMINISTIC_OUTPUT" + ], "reply_schema": { "description": "A nested list of a map of hash ranges and shard nodes describing individual shards.", "type": "array", diff --git a/src/rax.h b/src/rax.h index c03e0303a0..0ec9031cea 100644 --- a/src/rax.h +++ b/src/rax.h @@ -95,6 +95,7 @@ */ #define RAX_NODE_MAX_SIZE ((1 << 29) - 1) +#define RAX_OK 1 typedef struct raxNode { uint32_t iskey : 1; /* Does this node contain a key? */ uint32_t isnull : 1; /* Associated value is NULL (don't store it). */ diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index 0b607566b0..d6534c816b 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -285,24 +285,3 @@ test "CLUSTER MYSHARDID reports same shard id after cluster restart" { assert_equal [dict get $node_ids $i] [R $i cluster myshardid] } } - -test "Deterministic order of CLUSTER SHARDS response" { - set node_ids {} - for {set j 0} {$j < 8} {incr j} { - set shards_cfg [R $j CLUSTER SHARDS] - set i 0 - foreach shard_cfg $shards_cfg { - set nodes [dict get $shard_cfg nodes] - foreach node $nodes { - if {$j == 0} { - # Save the node ids from the first node response - dict set node_ids $i [dict get $node id] - } else { - # Verify the order of the node ids is the same as the first node response - assert_equal [dict get $node id] [dict get $node_ids $i] - } - incr i - } - } - } -} diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl new file mode 100644 index 0000000000..ae6ade812a --- /dev/null +++ b/tests/unit/cluster/cluster-shards.tcl @@ -0,0 +1,22 @@ +start_cluster 4 4 {tags {external:skip cluster}} { + test "Deterministic order of CLUSTER SHARDS response" { + set node_ids {} + for {set j 0} {$j < 8} {incr j} { + set shards_cfg [R $j CLUSTER SHARDS] + set i 0 + foreach shard_cfg $shards_cfg { + set nodes [dict get $shard_cfg nodes] + foreach node $nodes { + if {$j == 0} { + # Save the node ids from the first node response + dict set node_ids $i [dict get $node id] + } else { + # Verify the order of the node ids is the same as the first node response + assert_equal [dict get $node id] [dict get $node_ids $i] + } + incr i + } + } + } + } +}
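
---

The core pattern this patch applies throughout cluster_legacy.c is: store nodes in a rax keyed by the fixed-length 40-byte node name, replace `dictGetSafeIterator()` loops with `raxStart`/`raxSeek("^")`/`raxNext`/`raxStop`, replace `dictGetRandomKey()` with `raxRandomWalk`, and derive the primaries view on the fly instead of maintaining a `masters` array. Below is a standalone sketch of that pattern, not part of the patch itself: `demoNode`, the sample names, and the voting-primary check (`is_primary && numslots > 0`, approximating `clusterNodeIsVotingPrimary()`) are illustrative stand-ins, and it assumes the repository's rax implementation (rax.h/rax.c and its allocator shim) is available on the include/link path.

```c
#include <stdio.h>
#include <string.h>
#include "rax.h"

#define NAMELEN 40 /* same fixed key length as CLUSTER_NAMELEN */

/* Simplified stand-in for clusterNode; the real code stores clusterNode *
 * values keyed by the 40-byte node name. */
typedef struct demoNode {
    char name[NAMELEN];
    int is_primary;
    int numslots;
} demoNode;

int main(void) {
    rax *nodes = raxNew();

    demoNode n[3];
    memset(n, 0, sizeof(n));
    memset(n[0].name, 'a', NAMELEN); n[0].is_primary = 1; n[0].numslots = 8192;
    memset(n[1].name, 'b', NAMELEN); n[1].is_primary = 1; n[1].numslots = 8192;
    memset(n[2].name, 'c', NAMELEN); /* replica-like node, no slots */

    /* Insert nodes keyed by their fixed-length name, as clusterAddNode()
     * now does with raxInsert(). */
    for (int i = 0; i < 3; i++)
        raxInsert(nodes, (unsigned char *)n[i].name, NAMELEN, &n[i], NULL);

    /* Full scan: seek to the smallest key ("^") and walk forward. This is
     * the replacement for the old dictGetSafeIterator() loops, and it also
     * builds the "primaries with slots" view on the fly instead of keeping
     * a separate masters array. */
    raxIterator ri;
    raxStart(&ri, nodes);
    raxSeek(&ri, "^", NULL, 0);
    while (raxNext(&ri)) {
        demoNode *node = ri.data;
        if (node->is_primary && node->numslots > 0)
            printf("voting primary: %.40s\n", node->name);
    }
    raxStop(&ri);

    /* Random pick: the replacement for dictGetRandomKey() in the gossip
     * and ping-target selection paths. */
    raxStart(&ri, nodes);
    raxSeek(&ri, "^", NULL, 0);
    raxRandomWalk(&ri, 0);
    if (!raxEOF(&ri)) {
        demoNode *pick = ri.data;
        printf("random node: %.40s\n", pick->name);
    }
    raxStop(&ri);

    /* Removal by the same fixed-length key, as freeClusterNode() now does;
     * raxRemove() returns 1 (RAX_OK in the patch) when the key existed. */
    raxRemove(nodes, (unsigned char *)n[2].name, NAMELEN, NULL);

    raxFree(nodes);
    return 0;
}
```

Because the rax iterator visits keys in lexicographic order of node name, every node in the cluster enumerates the same primaries in the same order, which is what the relocated "Deterministic order of CLUSTER SHARDS response" test relies on when comparing CLUSTER SHARDS replies across nodes.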