Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cluster-bus] Send a MEET packet to a node if there is no inbound link #1307

Open
wants to merge 3 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");

if (link->conn) {
connClose(link->conn);
link->conn = NULL;
Expand All @@ -1350,6 +1354,7 @@ void freeClusterLink(clusterLink *link) {
} else if (link->node->inbound_link == link) {
serverAssert(link->inbound);
link->node->inbound_link = NULL;
link->node->inbound_link_freed_time = mstime();
}
}
zfree(link);
Expand Down Expand Up @@ -1489,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
node->inbound_link_freed_time = node->ctime;
memset(node->ip, 0, sizeof(node->ip));
node->announce_client_ipv4 = sdsempty();
node->announce_client_ipv6 = sdsempty();
Expand Down Expand Up @@ -1695,6 +1701,9 @@ void clusterAddNode(clusterNode *node) {
* it is a replica node.
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %s from cluster view", delnode->name);

int j;
dictIterator *di;
dictEntry *de;
Expand Down Expand Up @@ -2077,7 +2086,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
/* Return 1 if we already have a node in HANDSHAKE state matching the
* specified ip address and port number. This function is used in order to
* avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port, int cport) {
static int clusterHandshakeInProgress(char *ip, int port, int cport) {
dictIterator *di;
dictEntry *de;

Expand All @@ -2099,7 +2108,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) {
*
* EAGAIN - There is already a handshake in progress for this address.
* EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port, int cport) {
static int clusterStartHandshake(char *ip, int port, int cport) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;
Expand Down Expand Up @@ -3224,12 +3233,12 @@ int clusterProcessPacket(clusterLink *link) {
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) clusterProcessGossipSection(hdr, link);
/* If this is a MEET packet from an unknown node, we still process
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
}
Comment on lines -3227 to +3241
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but this double if with the same condition was driving me crazy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind this. But in general we avoid changes to the lines of code not related to the PR.


/* Anyway reply with a PONG */
clusterSendPing(link, CLUSTERMSG_TYPE_PONG);
Expand All @@ -3240,7 +3249,7 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");

if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
if (sender && nodeInMeetState(sender)) {
/* Once we get a response for MEET from the sender, we can stop sending more MEET. */
sender->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name,
Expand Down Expand Up @@ -3665,7 +3674,7 @@ void clusterLinkConnectHandler(connection *conn) {
* of a PING one, to force the receiver to add us in its node
* table. */
mstime_t old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
clusterSendPing(link, nodeInMeetState(node) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
Expand All @@ -3685,6 +3694,9 @@ void clusterLinkConnectHandler(connection *conn) {
*/

serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport);
if (nodeInMeetState(node)) {
serverLog(LL_DEBUG, "Sending MEET packet on connection to node %.40s", node->name);
}
}

/* Performs sanity check on the message signature and length depending on the type. */
Expand Down Expand Up @@ -3744,7 +3756,9 @@ void clusterReadHandler(connection *conn) {

if (nread <= 0) {
/* I/O error... */
serverLog(LL_DEBUG, "I/O error reading from node link: %s",
serverLog(LL_DEBUG, "I/O error reading from node link (%.40s:%s): %s",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound",
(nread == 0) ? "connection closed" : connGetLastError(conn));
handleLinkIOError(link);
return;
Expand Down Expand Up @@ -4909,9 +4923,20 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_
/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
serverLog(LL_NOTICE, "Clusterbus handshake timeout [%s]:%d after a timeout of %lld",
node->ip, node->cport, handshake_timeout);
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
now - node->inbound_link_freed_time > handshake_timeout) {
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

if (node->link == NULL) {
clusterLink *link = createClusterLink(node);
Expand Down
4 changes: 4 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ typedef struct clusterLink {
#define nodeIsPrimary(n) ((n)->flags & CLUSTER_NODE_PRIMARY)
#define nodeIsReplica(n) ((n)->flags & CLUSTER_NODE_REPLICA)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeInMeetState(n) ((n)->flags & CLUSTER_NODE_MEET)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL)
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down Expand Up @@ -341,6 +343,8 @@ struct _clusterNode {
mstime_t voted_time; /* Last time we voted for a replica of this primary */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
mstime_t inbound_link_freed_time; /* Last time we freed the inbound link for this node.
If it was never freed, it is the same as ctime */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds announce_client_ipv4; /* IPv4 for clients only. */
Expand Down
9 changes: 9 additions & 0 deletions tests/support/cluster_util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ proc get_cluster_nodes {id {status "*"}} {
return $nodes
}

# Returns the parsed myself node entry as a dictionary.
proc get_myself id {
set nodes [get_cluster_nodes $id]
foreach n $nodes {
if {[cluster_has_flag $n myself]} {return $n}
}
return {}
}

# Returns 1 if no node knows node_id, 0 if any node knows it.
proc node_is_forgotten {node_id} {
for {set j 0} {$j < [llength $::servers]} {incr j} {
Expand Down
205 changes: 202 additions & 3 deletions tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ set old_singledb $::singledb
set ::singledb 1

tags {tls:skip external:skip cluster} {
set CLUSTER_PACKET_TYPE_PING 0
set CLUSTER_PACKET_TYPE_PONG 1
set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1
set CLUSTER_PACKET_TYPE_ALL -2

set base_conf [list cluster-enabled yes]
start_multiple_servers 2 [list overrides $base_conf] {
test "Cluster nodes are reachable" {
Expand All @@ -22,9 +28,6 @@ tags {tls:skip external:skip cluster} {
wait_for_cluster_state fail
}

set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1

test "Cluster nodes haven't met each other" {
assert {[llength [get_cluster_nodes 1]] == 1}
assert {[llength [get_cluster_nodes 0]] == 1}
Expand Down Expand Up @@ -75,3 +78,199 @@ tags {tls:skip external:skip cluster} {

set ::singledb $old_singledb

proc cluster_get_first_node_in_handshake id {
pieturin marked this conversation as resolved.
Show resolved Hide resolved
set nodes [get_cluster_nodes $id]
foreach n $nodes {
if {[cluster_has_flag $n handshake]} {
return [dict get $n id]
}
}
return {}
}

proc cluster_3_nodes_all_know_each_other {} {
set node0_id [dict get [get_myself 0] id]
set node1_id [dict get [get_myself 1] id]
set node2_id [dict get [get_myself 2] id]

if {
[cluster_get_node_by_id 0 $node0_id] != {} &&
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {} &&
[cluster_get_node_by_id 1 $node0_id] != {} &&
[cluster_get_node_by_id 1 $node1_id] != {} &&
[cluster_get_node_by_id 1 $node2_id] != {} &&
[cluster_get_node_by_id 2 $node0_id] != {} &&
[cluster_get_node_by_id 2 $node1_id] != {} &&
[cluster_get_node_by_id 2 $node2_id] != {} &&
[llength [R 0 CLUSTER LINKS]] == 4 &&
[llength [R 1 CLUSTER LINKS]] == 4 &&
[llength [R 2 CLUSTER LINKS]] == 4
} {
return 1
} else {
return 0
}
}
Comment on lines +91 to +114
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From ChatGPT:

Suggested change
proc cluster_3_nodes_all_know_each_other {} {
set node0_id [dict get [get_myself 0] id]
set node1_id [dict get [get_myself 1] id]
set node2_id [dict get [get_myself 2] id]
if {
[cluster_get_node_by_id 0 $node0_id] != {} &&
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {} &&
[cluster_get_node_by_id 1 $node0_id] != {} &&
[cluster_get_node_by_id 1 $node1_id] != {} &&
[cluster_get_node_by_id 1 $node2_id] != {} &&
[cluster_get_node_by_id 2 $node0_id] != {} &&
[cluster_get_node_by_id 2 $node1_id] != {} &&
[cluster_get_node_by_id 2 $node2_id] != {} &&
[llength [R 0 CLUSTER LINKS]] == 4 &&
[llength [R 1 CLUSTER LINKS]] == 4 &&
[llength [R 2 CLUSTER LINKS]] == 4
} {
return 1
} else {
return 0
}
}
proc cluster_nodes_all_know_each_other {num_nodes} {
# Collect node IDs dynamically
set node_ids {}
for {set i 0} {$i < $num_nodes} {incr i} {
lappend node_ids [dict get [get_myself $i] id]
}
# Check if all nodes know each other
foreach node_id $node_ids {
foreach check_node_id $node_ids {
for {set node_index 0} {$node_index < $num_nodes} {incr node_index} {
if {[cluster_get_node_by_id $node_index $check_node_id] == {}} {
return 0
}
}
}
}
# Verify cluster link counts for each node
set expected_links [expr {2 * ($num_nodes - 1)}]
for {set i 0} {$i < $num_nodes} {incr i} {
if {[llength [R $i CLUSTER LINKS]] != $expected_links} {
return 0
}
}
return 1
}


start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} {
set CLUSTER_PACKET_TYPE_PING 0
set CLUSTER_PACKET_TYPE_PONG 1
set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1
set CLUSTER_PACKET_TYPE_ALL -2

test "Handshake eventually succeeds after node handshake timeout on both sides with inconsistent view of the cluster" {
set cluster_port [find_available_port $::baseport $::portcount]
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] {
# In this test we will trigger a handshake timeout on both sides of the handshake.
# Node 1 and 2 already know each other, then we make node 1 meet node 0:
#
# Node 1 -- MEET -> Node 0 [Node 0 might learn about Node 2 from the gossip section of the msg]
# Node 1 <- PONG -- Node 0 [we drop this message, so Node 1 will eventually mark the handshake as timed out]
# Node 1 <- PING -- Node 0 [we drop this message, so Node 1 will never send a PONG and Node 0 will eventually mark the handshake as timed out]
#
# After the handshake is timed out, we allow all cluster bus messages to go through.
# Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake.

set node0_id [dict get [get_myself 0] id]
set node1_id [dict get [get_myself 1] id]
set node2_id [dict get [get_myself 2] id]

# Drop all cluster bus messages
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_ALL
# Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2.
R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET

R 1 CLUSTER MEET [srv 0 host] [srv 0 port] $cluster_port

# Wait for Node 0 to be in handshake
wait_for_condition 10 400 {
[cluster_get_first_node_in_handshake 0] != {}
} else {
fail "Node 0 never entered handshake state"
}

# We want Node 0 to learn about Node 2 through the gossip section of the MEET message
set meet_retry 0
while {[cluster_get_node_by_id 0 $node2_id] eq {}} {
if {$meet_retry == 10} {
error "assertion: Retried to meet Node 0 too many times"
}
# If Node 0 doesn't know about Node 1 & 2, it means Node 1 did not gossip about node 2 in its MEET message.
# So we kill the outbound link from Node 1 to Node 0, to force a reconnect and a re-send of the MEET message.
after 100
# Since we are in handshake, we use a randomly generated ID we have to find
R 1 DEBUG CLUSTERLINK KILL ALL [cluster_get_first_node_in_handshake 1]
incr meet_retry 1
}

# Wait for Node 1's handshake to timeout
wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 1] eq {}
} else {
fail "Node 1 never exited handshake state"
}

# Wait for Node 0's handshake to timeout
wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 1] eq {}
} else {
fail "Node 0 never exited handshake state"
}

# At this point Node 0 knows Node 1 & 2 through the gossip, but they don't know Node 0.
wait_for_condition 50 100 {
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {} &&
[cluster_get_node_by_id 1 $node0_id] eq {} &&
[cluster_get_node_by_id 2 $node0_id] eq {}
} else {
fail "Unexpected CLUSTER NODES output, nodes 1 & 2 should not know node 0."
}

# Allow all messages to go through again
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE
R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

# Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inbound link.
# Handshake should now complete successfully.
wait_for_condition 50 200 {
[cluster_3_nodes_all_know_each_other]
} else {
fail "Unexpected CLUSTER NODES output, all nodes should know each other."
}
} ;# stop Node 0
} ;# test
} ;# stop cluster

start_cluster 2 0 {tags {external:skip cluster} overrides {cluster-node-timeout 4000 cluster-replica-no-failover yes}} {
set CLUSTER_PACKET_TYPE_PING 0
set CLUSTER_PACKET_TYPE_PONG 1
set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1
set CLUSTER_PACKET_TYPE_ALL -2

test "Handshake eventually succeeds after node handshake timeout on one side with inconsistent view of the cluster" {
set cluster_port [find_available_port $::baseport $::portcount]
start_server [list overrides [list cluster-enabled yes cluster-node-timeout 4000 cluster-port $cluster_port]] {
# In this test we will trigger a handshake timeout on one side of the handshake.
# Node 1 and 2 already know each other, then we make node 0 meet node 1:
#
# Node 0 -- MEET -> Node 1
# Node 0 <- PONG -- Node 1
# Node 0 <- PING -- Node 1 [Node 0 will mark the handshake as successful]
# Node 0 -- PONG -> Node 1 [we drop this message, so node 1 will eventually mark the handshake as timed out]
#
# After the handshake is timed out, we allow all cluster bus messages to go through.
# Eventually Node 0 should send a MEET packet to the other nodes to complete the handshake.

set node0_id [dict get [get_myself 0] id]
set node1_id [dict get [get_myself 1] id]
set node2_id [dict get [get_myself 2] id]

# Drop PONG messages
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG
# Drop MEET cluster bus messages, so that Node 0 cannot start a handshake with Node 2.
R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET

# Node 0 meets node 1
R 0 CLUSTER MEET [srv -1 host] [srv -1 port]

# Wait for node 0 to know about the other nodes in the cluster
wait_for_condition 50 100 {
[cluster_get_node_by_id 0 $node1_id] != {} &&
[cluster_get_node_by_id 0 $node2_id] != {}
} else {
fail "Node 0 never learned about node 1 and 2"
}
# At this point, node 0 learned about the other nodes in the cluster from meeting node 1.
wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 0] eq {}
} else {
fail "Node 1 never exited handshake state"
}
# At this point, from node 0 point of view, the handshake with node 1 succeeded.

wait_for_condition 50 100 {
[cluster_get_first_node_in_handshake 1] eq {}
} else {
fail "Node 1 never exited handshake state"
}
assert {[cluster_get_node_by_id 1 $node0_id] eq {}}
# At this point, from node 1 point of view, the handshake with node 0 timed out.

# Allow all messages
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE
R 2 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

# Now Node 0 will send a MEET packet to Node 1 & 2 since it has an outbound link to these nodes but no inblound link.
# Handshake should now complete successfully.
pieturin marked this conversation as resolved.
Show resolved Hide resolved
wait_for_condition 50 200 {
[cluster_3_nodes_all_know_each_other]
} else {
fail "Unexpected CLUSTER NODES output, all nodes should know each other."
}
} ;# stop Node 0
} ;# test
} ;# stop cluster
Loading