Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cluster meet reliable under link failures #461

Merged
merged 11 commits into from
Jun 17, 2024
35 changes: 28 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2845,7 +2845,16 @@ int clusterIsValidPacket(clusterLink *link) {
* received from the wrong sender ID). */
int clusterProcessPacket(clusterLink *link) {
/* Validate that the packet is well-formed */
if (!clusterIsValidPacket(link)) return 1;
if (!clusterIsValidPacket(link)) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) {
freeClusterLink(link);
serverLog(LL_WARNING, "Closing link for matching packet type %hu", type);
return 0;
}
return 1;
}

clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
Expand Down Expand Up @@ -2943,6 +2952,13 @@ int clusterProcessPacket(clusterLink *link) {
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");

if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
/* Once we get a response for MEET from the sender, we can stop sending more MEET. */
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
sender->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name,
sender->human_nodename);
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
}
if (!link->inbound) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
Expand Down Expand Up @@ -3408,12 +3424,17 @@ void clusterLinkConnectHandler(connection *conn) {
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~CLUSTER_NODE_MEET;
/* NOTE: Assume the current node is A and is asked to MEET another node B.
* Once A sends MEET to B, it cannot clear the MEET flag for B until it
* gets a response from B. If the MEET packet is not accepted by B due to
* link failure, A must continue sending MEET. If A doesn't continue sending
* MEET, A will know about B, but B will never add A. Every node always
* responds to PINGs from unknown nodes with a PONG, so A will know about B
* and continue sending PINGs. But B won't add A until it sees a MEET (or it
* gets to know about A from a trusted third node C). In this case, clearing
* the MEET flag here leads to asymmetry in the cluster membership. So, we
* clear the MEET flag in clusterProcessPacket.
*/
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport);
}
Expand Down
6 changes: 6 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ void debugCommand(client *c) {
" Show low level info about `key` and associated value.",
"DROP-CLUSTER-PACKET-FILTER <packet-type>",
" Drop all packets that match the filtered type. Set to -1 allow all packets.",
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
" When set to 1, the cluster link is closed after dropping a packet based on the filter."
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
Expand Down Expand Up @@ -593,6 +596,9 @@ void debugCommand(client *c) {
if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK) return;
server.cluster_drop_packet_filter = packet_type;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
PingXie marked this conversation as resolved.
Show resolved Hide resolved
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "object") && c->argc == 3) {
dictEntry *de;
robj *val;
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,8 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
Expand Down
83 changes: 83 additions & 0 deletions tests/unit/cluster/cluster-multiple-meets.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# make sure the test infra won't use SELECT
set old_singledb $::singledb
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
set ::singledb 1

tags {tls:skip external:skip cluster} {
set base_conf [list cluster-enabled yes]
start_multiple_servers 2 [list overrides $base_conf] {
test "Cluster nodes are reachable" {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
for {set id 0} {$id < [llength $::servers]} {incr id} {
# Every node should be reachable.
wait_for_condition 1000 50 {
([catch {R $id ping} ping_reply] == 0) &&
($ping_reply eq {PONG})
} else {
catch {R $id ping} err
fail "Node #$id keeps replying '$err' to PING."
}
}
}

test "Before slots allocation, all nodes report cluster failure" {
wait_for_cluster_state fail
}

set CLUSTER_PACKET_TYPE_PONG 1
set CLUSTER_PACKET_TYPE_NONE -1
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

test "Cluster nodes haven't met each other" {
assert {[llength [get_cluster_nodes 1]] == 1}
assert {[llength [get_cluster_nodes 0]] == 1}
}

test "Allocate slots" {
cluster_allocate_slots 2 0;# primaries replicas
}

test "Multiple MEETs from Node 1 to Node 0 should work" {
# Make 1 drop the PONG responses to MEET
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG
# It is important to close the connection on drop, otherwise a subsequent MEET won't be sent
R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

R 1 CLUSTER MEET 127.0.0.1 [srv 0 port]

# Wait for at least a few MEETs to be sent so that we are sure that 1 is dropping the response to MEET.
wait_for_condition 1000 50 {
[CI 0 cluster_stats_messages_meet_received] > 1 &&
[CI 1 cluster_state] eq {fail} && [CI 0 cluster_state] eq {ok}
} else {
fail "Cluster node 1 never sent multiple MEETs to 0"
}

# 0 will be connected to 1, but 1 won't see that 0 is connected
assert {[llength [get_cluster_nodes 1 connected]] == 1}
assert {[llength [get_cluster_nodes 0 connected]] == 2}

# Drop incoming and outgoing links from/to 1
R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID]

# Wait for 0 to know about 1 again after 1 sends a MEET
wait_for_condition 1000 50 {
[llength [get_cluster_nodes 0 connected]] == 2
} else {
fail "Cluster node 1 never sent multiple MEETs to 0"
}

# Undo packet drop
R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE
R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0

# Both a and b will turn to cluster state ok
wait_for_condition 1000 50 {
[CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
}
}
} ;# stop servers
} ;# tags

set ::singledb $old_singledb

71 changes: 71 additions & 0 deletions tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# make sure the test infra won't use SELECT
set old_singledb $::singledb
set ::singledb 1

tags {tls:skip external:skip cluster} {
set base_conf [list cluster-enabled yes]
start_multiple_servers 2 [list overrides $base_conf] {
test "Cluster nodes are reachable" {
for {set id 0} {$id < [llength $::servers]} {incr id} {
# Every node should be reachable.
wait_for_condition 1000 50 {
([catch {R $id ping} ping_reply] == 0) &&
($ping_reply eq {PONG})
} else {
catch {R $id ping} err
fail "Node #$id keeps replying '$err' to PING."
}
}
}

test "Before slots allocation, all nodes report cluster failure" {
wait_for_cluster_state fail
}

set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1

test "Cluster nodes haven't met each other" {
assert {[llength [get_cluster_nodes 1]] == 1}
assert {[llength [get_cluster_nodes 0]] == 1}
}

test "Allocate slots" {
cluster_allocate_slots 2 0
}

test "MEET is reliable when target drops the initial MEETs" {
# Make 0 drop the initial MEET messages due to link failure
R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET
R 0 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

R 1 CLUSTER MEET 127.0.0.1 [srv 0 port]

# Wait for at least a few MEETs to be sent so that we are sure that 0 is
# dropping them.
wait_for_condition 1000 50 {
[CI 0 cluster_stats_messages_meet_received] >= 3
} else {
fail "Cluster node 1 never sent multiple MEETs to 0"
}

# Make sure the nodes still don't know about each other
assert {[llength [get_cluster_nodes 1 connected]] == 1}
assert {[llength [get_cluster_nodes 0 connected]] == 1}

R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

# If the MEET is reliable, both a and b will turn to cluster state ok
wait_for_condition 1000 50 {
[CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
[CI 0 cluster_stats_messages_meet_received] >= 4 &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
}
}
} ;# stop servers
} ;# tags

set ::singledb $old_singledb

Loading