Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only (re-)send MEET packet once every handshake timeout period #1441

Merged
merged 9 commits into from
Dec 30, 2024
Merged
107 changes: 60 additions & 47 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
Expand Down Expand Up @@ -1336,9 +1337,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s (%s)",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");
link->inbound ? "inbound" : "outbound",
link->node ? link->node->human_nodename : "<unknown>");

if (link->conn) {
connClose(link->conn);
Expand Down Expand Up @@ -1492,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->last_in_ping_gossip = 0;
node->ping_sent = node->pong_received = 0;
node->data_received = 0;
node->meet_sent = 0;
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
Expand Down Expand Up @@ -1702,7 +1705,7 @@ void clusterAddNode(clusterNode *node) {
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, delnode->human_nodename);

int j;
dictIterator *di;
Expand Down Expand Up @@ -3121,27 +3124,6 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}

if (type == CLUSTERMSG_TYPE_MEET && link->node && nodeInHandshake(link->node)) {
/* If the link is bound to a node and the node is in the handshake state, and we receive
* a MEET packet, it may be that the sender sent multiple MEET packets so in here we are
* dropping the MEET to avoid the assert in setClusterNodeToInboundClusterLink. The assert
* will happen if the other sends a MEET packet because it detects that there is no inbound
* link, this node creates a new node in HANDSHAKE state (with a random node name), and
* respond with a PONG. The other node receives the PONG and removes the CLUSTER_NODE_MEET
* flag. This node is supposed to open an outbound connection to the other node in the next
* cron cycle, but before this happens, the other node re-sends a MEET on the same link
* because it still detects no inbound connection. We improved the re-send logic of MEET in
* #1441, now we will only re-send MEET packet once every handshake timeout period.
*
* Note that in getNodeFromLinkAndMsg, the node in the handshake state has a random name
* and not truly "known", so we don't know the sender. Dropping the MEET packet can prevent
* us from creating a random node, avoid incorrect link binding, and avoid duplicate MEET
* packet eliminate the handshake state. */
serverLog(LL_NOTICE, "Dropping MEET packet from node %.40s because the node is already in handshake state",
link->node->name);
return 1;
}

uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
Expand Down Expand Up @@ -3238,7 +3220,26 @@ int clusterProcessPacket(clusterLink *link) {
}

if (type == CLUSTERMSG_TYPE_MEET) {
if (!sender) {
if (!sender && link->node) {
/* We received a MEET packet on an existing link.
* It means we received a second MEET packet from a node during the handshake
* process before we were able to send a PING packet to that node from our outbound
* connection.
* Here we are avoiding going into the next "else if" branch so as to not assert in
* setClusterNodeToInboundClusterLink() because of link->node not being NULL.
*
* The other sends a MEET packet because it detects that there is no inbound link,
* this node creates a new node in HANDSHAKE state (with a random node name), and
* respond with a PONG. The other node receives the PONG and removes the
* CLUSTER_NODE_MEET flag.
* This node is supposed to open an outbound connection to the other node in the
* next cron cycle, but before this happens, the other node might re-send a MEET on
* the same link because it still detects no inbound connection.
*
* Note that in getNodeFromLinkAndMsg, the node in the handshake state has a random name
* and not truly "known", so we don't know the sender. */
pieturin marked this conversation as resolved.
Show resolved Hide resolved
debugServerAssert(link->inbound && nodeInHandshake(link->node));
pieturin marked this conversation as resolved.
Show resolved Hide resolved
} else if (!sender) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
Expand All @@ -3263,18 +3264,18 @@ int clusterProcessPacket(clusterLink *link) {
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && now - sender->ctime > server.cluster_node_timeout) {
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
/* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
* know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
* Free my outbound link to that node, triggering a reconnect and a PING over the new link.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */

/* We should always receive a MEET packet on an inbound link. */
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
serverAssert(link != sender->link);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node",
sender->name);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node",
sender->name, sender->human_nodename);
freeClusterLink(sender->link);
}
}
Expand Down Expand Up @@ -4040,7 +4041,12 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
clusterMsg *hdr = &msgblock->msg;

if (!link->inbound && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
if (!link->inbound) {
if (type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
else if (type == CLUSTERMSG_TYPE_MEET)
link->node->meet_sent = mstime();
}

/* Populate the gossip fields */
int maxiterations = wanted * 3;
Expand Down Expand Up @@ -4960,10 +4966,22 @@ void clusterHandleManualFailover(void) {
* CLUSTER cron job
* -------------------------------------------------------------------------- */

static mstime_t getHandshakeTimeout(void) {
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the cluster_node_timeout value, but when cluster_node_timeout is
* too small we use the value of 1 second. */
return max(server.cluster_node_timeout, 1000);
}

static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
return now - node->ctime > getHandshakeTimeout() ? 1 : 0;
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
}

/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
Expand All @@ -4972,19 +4990,22 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d after %lldms", node->ip,
node->cport, handshake_timeout);
if (nodeInHandshake(node) && nodeExceedsHandshakeTimeout(node, now)) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d", node->ip, node->cport);
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
now - node->inbound_link_freed_time > handshake_timeout) {
if (nodeInNormalState(node) && node->link != NULL && node->inbound_link == NULL &&
now - node->inbound_link_freed_time > getHandshakeTimeout() &&
now - node->meet_sent > getHandshakeTimeout()) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view.
* We make sure to not re-send a MEET packet more than once every handshake timeout period, so as to
* leave the other node time to complete the handshake. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s (%s) because there is no inbound link for it",
node->name, node->human_nodename);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

Expand Down Expand Up @@ -5045,19 +5066,11 @@ void clusterCron(void) {
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

iteration++; /* Number of times this function was called so far. */

clusterUpdateMyselfHostname();

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Run through some of the operations we want to do on each cluster node. */
Expand All @@ -5070,7 +5083,7 @@ void clusterCron(void) {
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
if (clusterNodeCronHandleReconnect(node, now)) continue;
}
dictReleaseIterator(di);

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ struct _clusterNode {
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t meet_sent; /* Unix time we sent latest meet packet */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ tags {tls:skip external:skip cluster} {
[CI 0 cluster_stats_messages_meet_received] >= 4 &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
fail "Unexpected cluster state: node 1 cluster_state:[CI 1 cluster_state], node 0 cluster_state: [CI 0 cluster_state]"
}
}
} ;# stop servers
Expand Down
Loading