Provide ability for local cluster topology query #808

Status: Open. Wants to merge 9 commits into base: unstable.

Changes from 4 commits
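
Judging from the handlers in this diff, the PR extends CLUSTER NODES, CLUSTER SLOTS, and CLUSTER SHARDS with an optional argument that scopes the reply to a single node: either a node ID, or the literal MYSELF for the node serving the connection. A minimal hiredis sketch of the intended usage; the address is a placeholder and error handling is elided:

```c
#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    /* Placeholder address; point this at any cluster node. */
    redisContext *ctx = redisConnect("127.0.0.1", 6379);
    if (ctx == NULL || ctx->err) return 1;

    /* Slot ranges owned by the shard of the node we are connected to,
     * rather than the full cluster map. */
    redisReply *slots = redisCommand(ctx, "CLUSTER SLOTS MYSELF");
    if (slots) {
        printf("my shard serves %zu slot range(s)\n", slots->elements);
        freeReplyObject(slots);
    }

    redisFree(ctx);
    return 0;
}
```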
92 changes: 74 additions & 18 deletions src/cluster.c
@@ -834,22 +834,19 @@ void clusterCommand(client *c) {

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
clusterCommandHelp(c);
} else if (!strcasecmp(c->argv[1]->ptr, "nodes") && c->argc == 2) {
} else if (!strcasecmp(c->argv[1]->ptr, "nodes") && c->argc <= 3) {
Review comment (Member Author): NOTE to self: are all these checks really needed, as we have command arity and schema validations?

/* CLUSTER NODES */
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
addReplyVerbatim(c, nodes, sdslen(nodes), "txt");
sdsfree(nodes);
clusterCommandNodes(c);
} else if (!strcasecmp(c->argv[1]->ptr, "myid") && c->argc == 2) {
/* CLUSTER MYID */
clusterCommandMyId(c);
} else if (!strcasecmp(c->argv[1]->ptr, "myshardid") && c->argc == 2) {
/* CLUSTER MYSHARDID */
clusterCommandMyShardId(c);
} else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc == 2) {
} else if (!strcasecmp(c->argv[1]->ptr, "slots") && c->argc <= 3) {
/* CLUSTER SLOTS */
clusterCommandSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc == 2) {
} else if (!strcasecmp(c->argv[1]->ptr, "shards") && c->argc <= 3) {
/* CLUSTER SHARDS */
clusterCommandShards(c);
} else if (!strcasecmp(c->argv[1]->ptr, "info") && c->argc == 2) {
@@ -1358,10 +1355,14 @@ void clearCachedClusterSlotsResponse(void) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
}
if (server.cached_cluster_my_slot_info[conn_type]) {
sdsfree(server.cached_cluster_my_slot_info[conn_type]);
server.cached_cluster_my_slot_info[conn_type] = NULL;
}
}
}

sds generateClusterSlotResponse(int resp) {
sds generateClusterSlotResponse(int resp, clusterNode *query_node) {
client *recording_client = createCachedResponseClient(resp);
clusterNode *n = NULL;
int num_primaries = 0, start = -1;
@@ -1379,8 +1380,10 @@ sds generateClusterSlotResponse(int resp) {
/* Add cluster slots info when we encounter a different node or reach
* the end of the slot space. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(recording_client, n, start, i - 1);
num_primaries++;
if (!query_node || n == query_node) {
addNodeReplyForClusterSlot(recording_client, n, start, i - 1);
num_primaries++;
}
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
@@ -1392,8 +1395,8 @@ sds generateClusterSlotResponse(int resp) {
return cluster_slot_response;
}

int verifyCachedClusterSlotsResponse(sds cached_response, int resp) {
sds generated_response = generateClusterSlotResponse(resp);
int verifyCachedClusterSlotsResponse(sds cached_response, int resp, clusterNode *query_node) {
sds generated_response = generateClusterSlotResponse(resp, query_node);
int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal)
@@ -1402,6 +1405,28 @@ int verifyCachedClusterSlotsResponse(sds cached_response, int resp) {
return is_equal;
}

void clusterCommandNodes(client *c) {
clusterNode *n = NULL;
if (c->argc == 3) {
/* In case we are provided with a specific node to display, we fetch that node's data. */
if (!strcasecmp(c->argv[2]->ptr, "myself")) {
n = getMyClusterNode();
} else {
n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (n == NULL) {
addReplyErrorFormat(c, "Unknown node id: %s", (char *)c->argv[2]->ptr);
return;
}
}
}
/* Report TLS ports to TLS clients, and non-TLS ports to non-TLS clients.
* When data is requested for a specific node only, we skip filling in
* data for the other nodes. */
sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo(), n);
addReplyVerbatim(c, nodes, sdslen(nodes), "txt");
sdsfree(nodes);
}

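With a node filter, the verbatim reply shrinks from one line per cluster node to a single node-description line. A short hiredis continuation of the sketch above, assuming RESP2; the 40-character node ID is invented for illustration:

```c
/* Fetch the CLUSTER NODES line for one specific node. The ID below is a
 * placeholder; a real one would come from a prior unfiltered call. */
redisReply *r = redisCommand(ctx, "CLUSTER NODES %s",
                             "a1b2c3d4e5f60718293a4b5c6d7e8f9a0b1c2d3e");
if (r && r->type == REDIS_REPLY_STRING) {
    printf("%s", r->str); /* exactly one node description line */
}
freeReplyObject(r);
```
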
void clusterCommandSlots(client *c) {
/* Format: 1) 1) start slot
* 2) end slot
@@ -1414,21 +1439,52 @@ void clusterCommandSlots(client *c) {
* ... continued until done
*/
int conn_type = 0;
clusterNode *query_node = NULL;
sds reply = NULL;
sds *cache = NULL;
int myself = 0;
if (connIsTLS(c->conn)) conn_type |= CACHE_CONN_TYPE_TLS;
if (isClientConnIpV6(c)) conn_type |= CACHE_CONN_TYPE_IPv6;
if (c->resp == 3) conn_type |= CACHE_CONN_TYPE_RESP3;

if (c->argc == 3) {
/* Fetch slot ownership data for the specified node only. */
if (!strcasecmp(c->argv[2]->ptr, "myself")) {
query_node = getMyClusterNode();
myself = 1;
} else {
query_node = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (query_node == NULL) {
addReplyErrorFormat(c, "Unknown node id: %s", (char *)c->argv[2]->ptr);
return;
}
}
if (clusterNodeIsReplica(query_node)) {
query_node = clusterNodeGetPrimary(query_node);
if (query_node == NULL) {
addReplyErrorFormat(c, "Node %s is a replica serving no primary", (char *)c->argv[2]->ptr);
return;
}
}
cache = &server.cached_cluster_my_slot_info[conn_type];
} else {
cache = &server.cached_cluster_slot_info[conn_type];
}
if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse(c->resp);
server.cached_cluster_slot_info[conn_type] = cached_reply;
if (query_node && !myself) {
reply = generateClusterSlotResponse(c->resp, query_node);
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply, c->resp) == 1);
if (!(*cache)) {
reply = generateClusterSlotResponse(c->resp, query_node);
(*cache) = reply;
} else {
reply = (*cache);
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(reply, c->resp, query_node) == 1);
}
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
addReplyProto(c, reply, sdslen(reply));
}

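The reply cache is keyed by connection attributes packed into bit flags, so TLS/plaintext, IPv4/IPv6, and RESP2/RESP3 clients each get their own cached serialization, and this change adds a parallel cache for the MYSELF-scoped reply. A sketch of the assumed layout; the CACHE_CONN_TYPE_* values shown are illustrative, the real constants live in the server headers:

```c
/* Assumed flag layout for the cache index. */
#define CACHE_CONN_TYPE_TLS   (1 << 0) /* TLS vs plaintext connection */
#define CACHE_CONN_TYPE_IPv6  (1 << 1) /* IPv6 vs IPv4 client address */
#define CACHE_CONN_TYPE_RESP3 (1 << 2) /* RESP3 vs RESP2 protocol */
#define CACHE_CONN_TYPE_MAX   (1 << 3) /* 8 distinct connection types */

/* One cached CLUSTER SLOTS reply per connection type, plus, with this PR,
 * a parallel cache for the MYSELF-scoped variant. */
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
sds cached_cluster_my_slot_info[CACHE_CONN_TYPE_MAX];
```

Note that only the MYSELF form is cached: a query for an arbitrary node ID takes the query_node && !myself branch above and regenerates the reply on every call, which keeps the cache footprint bounded.
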
/* -----------------------------------------------------------------------------
3 changes: 2 additions & 1 deletion src/cluster.h
@@ -59,7 +59,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded);
unsigned long getClusterConnectionsCount(void);
int isClusterHealthy(void);

sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
sds clusterGenNodesDescription(client *c, int filter, int tls_primary, clusterNode *node);
sds genClusterInfoString(void);
/* handle implementation specific debug cluster commands. Return 1 if handled, 0 otherwise. */
int handleDebugClusterCommand(client *c);
@@ -72,6 +72,7 @@ int clusterAllowFailoverCmd(client *c);
void clusterPromoteSelfToPrimary(void);
int clusterManualFailoverTimeLimit(void);

void clusterCommandNodes(client *c);
void clusterCommandSlots(client *c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
152 changes: 96 additions & 56 deletions src/cluster_legacy.c
@@ -755,7 +755,7 @@ int clusterSaveConfig(int do_fsync) {

/* Get the nodes description and concatenate our "vars" directive to
* save currentEpoch and lastVoteEpoch. */
ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0, NULL);
ci = sdscatprintf(ci, "vars currentEpoch %llu lastVoteEpoch %llu\n",
(unsigned long long)server.cluster->currentEpoch,
(unsigned long long)server.cluster->lastVoteEpoch);
@@ -5396,7 +5396,7 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
* in the slots_info struct on the node. This is used to improve the efficiency
* of clusterGenNodesDescription() because it avoids looping over the slot space
* to generate the slot info for each node individually. */
void clusterGenNodesSlotsInfo(int filter) {
void clusterGenNodesSlotsInfo(int filter, clusterNode *query_node) {
clusterNode *n = NULL;
int start = -1;

@@ -5413,12 +5413,14 @@
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
if (!(n->flags & filter)) {
if (!n->slot_info_pairs) {
n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
if (!query_node || n == query_node) {
if (!n->slot_info_pairs) {
n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
}
serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
n->slot_info_pairs[n->slot_info_pairs_count++] = start;
n->slot_info_pairs[n->slot_info_pairs_count++] = i - 1;
}
serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
n->slot_info_pairs[n->slot_info_pairs_count++] = start;
n->slot_info_pairs[n->slot_info_pairs_count++] = i - 1;
}
if (i == CLUSTER_SLOTS) break;
n = server.cluster->slots[i];
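
Each node's owned slots end up in slot_info_pairs as consecutive (start, end) pairs, which is what both the filtered and unfiltered paths consume. A self-contained toy program demonstrating the same range-compression idea over a miniature slot space; names and sizes are illustrative, not the server's own:

```c
#include <stdint.h>
#include <stdio.h>

#define NUM_SLOTS 16 /* toy slot space; a real cluster has 16384 slots */

/* Compress a slot->owner map into (start, end) pairs for one owner, the
 * same range encoding clusterGenNodesSlotsInfo() builds in slot_info_pairs. */
static int compress_ranges(const int *owner, int target, uint16_t *pairs) {
    int count = 0, start = -1;
    for (int i = 0; i <= NUM_SLOTS; i++) {
        int cur = (i < NUM_SLOTS) ? owner[i] : -1; /* -1 closes a trailing range */
        if (start == -1 && cur == target) start = i;
        if (start != -1 && cur != target) {
            pairs[count++] = (uint16_t)start;
            pairs[count++] = (uint16_t)(i - 1);
            start = -1;
        }
    }
    return count; /* entries written, i.e. 2 * number of ranges */
}

int main(void) {
    int owner[NUM_SLOTS] = {1, 1, 1, 2, 2, 1, 1, 3, 3, 3, 1, 1, 2, 2, 2, 1};
    uint16_t pairs[2 * NUM_SLOTS];
    int n = compress_ranges(owner, 1, pairs);
    for (int i = 0; i < n; i += 2) printf("[%d-%d] ", pairs[i], pairs[i + 1]);
    printf("\n"); /* prints: [0-2] [5-6] [10-11] [15-15] */
    return 0;
}
```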
@@ -5448,28 +5450,35 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) {
* The representation obtained using this function is used for the output
* of the CLUSTER NODES function, and as format for the cluster
* configuration file (nodes.conf) for a given node. */
sds clusterGenNodesDescription(client *c, int filter, int tls_primary) {
sds clusterGenNodesDescription(client *c, int filter, int tls_primary, clusterNode *query_node) {
sds ci = sdsempty(), ni;
dictIterator *di;
dictEntry *de;

/* Generate all nodes slots info firstly. */
clusterGenNodesSlotsInfo(filter);
clusterGenNodesSlotsInfo(filter, query_node);
if (!query_node) {
/* If we need to fetch data for all nodes, iterate over all of them */
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->flags & filter) continue;
ni = clusterGenNodeDescription(c, node, tls_primary);
ci = sdscatsds(ci, ni);
sdsfree(ni);
ci = sdscatlen(ci, "\n", 1);

if (node->flags & filter) continue;
ni = clusterGenNodeDescription(c, node, tls_primary);
/* Release slots info. */
clusterFreeNodesSlotsInfo(node);
}
dictReleaseIterator(di);
} else if (!(query_node->flags & filter)) {
/* Query only a single node; no need to iterate over all cluster nodes. */
ni = clusterGenNodeDescription(c, query_node, tls_primary);
ci = sdscatsds(ci, ni);
sdsfree(ni);
ci = sdscatlen(ci, "\n", 1);

/* Release slots info. */
clusterFreeNodesSlotsInfo(node);
}
dictReleaseIterator(di);
return ci;
}

@@ -5669,52 +5678,83 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
setDeferredMapLen(c, node_replylen, reply_count);
}

/* Helper function to generate a single shard reply to client c. The dict
* entry is the server->cluster.shards entry holding the specific shard. */
static void addShardReplyForClusterShards(client *c, dictEntry *de) {
list *nodes = dictGetVal(de);
serverAssert(listLength(nodes) > 0);
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

/* Find a node which has the slot information served by this shard. */
clusterNode *n = NULL;
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
n = listNodeValue(ln);
if (n->slot_info_pairs) {
break;
}
}

if (n && n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++) {
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
}
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
}
}

/* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */
void clusterCommandShards(client *c) {
addReplyArrayLen(c, dictSize(server.cluster->shards));
/* This call will add slot_info_pairs to all nodes */
clusterGenNodesSlotsInfo(0);
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
for (dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
list *nodes = dictGetVal(de);
serverAssert(listLength(nodes) > 0);
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");

/* Find a node which has the slot information served by this shard. */
clusterNode *n = NULL;
listIter li;
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
n = listNodeValue(ln);
if (n->slot_info_pairs) {
break;
clusterNode *query_node = NULL;
if (c->argc == 3) {
/* Currently the only extra argument is 'myself' */
query_node = getMyClusterNode();
if (clusterNodeIsReplica(query_node)) {
query_node = clusterNodeGetPrimary(query_node);
if (query_node == NULL) {
addReplyErrorFormat(c, "Node is a replica serving no primary");
return;
}
}
}
/* This call will add slot_info_pairs to all nodes (or only to query_node when one was specified). */
clusterGenNodesSlotsInfo(0, query_node);

if (n && n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0);
addReplyArrayLen(c, n->slot_info_pairs_count);
for (int i = 0; i < n->slot_info_pairs_count; i++) {
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
}
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
}

addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes));
listRewind(nodes, &li);
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n);
if (!query_node) {
/* Query is for all shards */
addReplyArrayLen(c, dictSize(server.cluster->shards));
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
for (dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
addShardReplyForClusterShards(c, de);
}
dictReleaseIterator(di);
} else {
/* Query is just for my shard */
addReplyArrayLen(c, 1);
sds sid = sdsnewlen(query_node->shard_id, CLUSTER_NAMELEN);
dictEntry *de = dictFind(server.cluster->shards, sid);
serverAssert(de);
addShardReplyForClusterShards(c, de);
sdsfree(sid);
}
dictReleaseIterator(di);
}

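With MYSELF, CLUSTER SHARDS returns a one-element array holding only the caller's shard. A hedged hiredis traversal sketch, assuming RESP2 (where maps arrive as flat key/value arrays) and a connected ctx as in the earlier sketches; <string.h> is needed for strcmp:

```c
/* Walk the single shard returned by CLUSTER SHARDS MYSELF. */
redisReply *shards = redisCommand(ctx, "CLUSTER SHARDS MYSELF");
if (shards && shards->type == REDIS_REPLY_ARRAY && shards->elements == 1) {
    redisReply *shard = shards->element[0];
    /* Under RESP2 the shard map is a flat array: key, value, key, value. */
    for (size_t i = 0; i + 1 < shard->elements; i += 2) {
        redisReply *key = shard->element[i];
        redisReply *val = shard->element[i + 1];
        if (!strcmp(key->str, "slots")) {
            /* "slots" holds start/end slot numbers, pairwise. */
            for (size_t j = 0; j + 1 < val->elements; j += 2)
                printf("slots %lld-%lld\n", val->element[j]->integer,
                       val->element[j + 1]->integer);
        } else if (!strcmp(key->str, "nodes")) {
            printf("%zu node(s) in this shard\n", val->elements);
        }
    }
}
freeReplyObject(shards);
```
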
sds genClusterInfoString(void) {