From e873375fcfdfe70df79494461d16c5a33dadd0f5 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Thu, 3 Oct 2024 10:44:24 +0530
Subject: [PATCH 01/13] Added a new SWIM protocol message type: `ProbePing`

A `ProbePing` message is sent to another supervisor when we receive a
`Confirmed` rumor about that supervisor. This message makes sure we do
not blindly accept a supervisor as `Confirmed` based only on what some
other node has to say. This is particularly important when a (set of)
node(s) enters a partition and later comes out of it: such nodes would
believe the other supervisors are `Confirmed` and spread that `rumor`,
causing otherwise `Alive` supervisors to be construed as `Confirmed` by
some supervisors. We therefore check a supposedly `Confirmed`
supervisor with a `ProbePing` message; if there is a response to that
`ProbePing`, the supervisor is not considered `Confirmed`.

The `ProbePing` message is sent in the `outbound` loop. To do this, we
maintain a set of members to be `ProbePing`ed in a `probe_list` inside
the `Server` struct.

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/protocols/swim.proto   |  8 ++-
 components/butterfly/src/lib.rs             |  2 +
 components/butterfly/src/member.rs          | 14 +++--
 components/butterfly/src/probe_list.rs      | 22 +++++++
 components/butterfly/src/server.rs          | 50 ++++++++++++---
 components/butterfly/src/server/inbound.rs  | 46 +++++++++++++-
 components/butterfly/src/server/outbound.rs | 68 +++++++++++++++++----
 components/butterfly/src/swim.rs            | 44 +++++++++++++
 components/butterfly/tests/common/mod.rs    |  2 +
 9 files changed, 226 insertions(+), 30 deletions(-)
 create mode 100644 components/butterfly/src/probe_list.rs

diff --git a/components/butterfly/protocols/swim.proto b/components/butterfly/protocols/swim.proto
index abdb9d5054..3a74c9f170 100644
--- a/components/butterfly/protocols/swim.proto
+++ b/components/butterfly/protocols/swim.proto
@@ -10,6 +10,7 @@ message Member {
   optional int32 gossip_port = 5;
   optional bool persistent = 6 [default = false];
   optional bool departed = 7 [default = false];
+  optional bool probe_ping = 8 [default = false];
 }
 
 message Ping {
@@ -22,6 +23,10 @@ message Ack {
   optional Member forward_to = 2;
 }
 
+message ProbePing {
+  optional Member from = 1;
+}
+
 message PingReq {
   optional Member from = 1;
   optional Member target = 2;
@@ -35,7 +40,7 @@ message Membership {
 }
 
 message Swim {
-  enum Type { PING = 1; ACK = 2; PINGREQ = 3; };
+  enum Type { PING = 1; ACK = 2; PINGREQ = 3; PROBEPING = 4; };
 
   // Identifies which field is filled in.
   required Type type = 1;
@@ -43,6 +48,7 @@ message Swim {
     Ping ping = 2;
     Ack ack = 3;
     PingReq pingreq = 4;
+    ProbePing probeping = 6;
   }
   repeated Membership membership = 5;
 }
diff --git a/components/butterfly/src/lib.rs b/components/butterfly/src/lib.rs
index 13c38a6684..f948cf137e 100644
--- a/components/butterfly/src/lib.rs
+++ b/components/butterfly/src/lib.rs
@@ -35,6 +35,8 @@ pub mod rumor;
 pub mod server;
 pub mod swim;
 
+pub mod probe_list;
+
 pub use crate::server::Server;
 use lazy_static::lazy_static;
 use std::cell::UnsafeCell;
diff --git a/components/butterfly/src/member.rs b/components/butterfly/src/member.rs
index 1e12092e2d..19a7d8859e 100644
--- a/components/butterfly/src/member.rs
+++ b/components/butterfly/src/member.rs
@@ -59,7 +59,7 @@ lazy_static! {
 ///
 /// Note: we're intentionally deriving `Copy` to be able to treat this
 /// like a "normal" numeric type.
-#[derive(Clone, Debug, Ord, PartialEq, PartialOrd, Eq, Copy, Default)]
+#[derive(Clone, Debug, Ord, PartialEq, PartialOrd, Eq, Copy, Default, Hash)]
 pub struct Incarnation(u64);
 
 impl From<u64> for Incarnation {
@@ -128,7 +128,7 @@ pub type UuidSimple = String;
 
 /// A member in the swim group. Passes most of its functionality along to the internal protobuf
 /// representation.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct Member {
     pub id: String,
     pub incarnation: Incarnation,
     pub gossip_port: u16,
     pub persistent: bool,
     pub departed: bool,
+    pub probe_ping: bool,
 }
 
 impl Member {
@@ -170,7 +171,8 @@ impl Default for Member {
                  swim_port: 0,
                  gossip_port: 0,
                  persistent: false,
-                 departed: false, }
+                 departed: false,
+                 probe_ping: false, }
     }
 }
@@ -194,7 +196,8 @@ impl From<Member> for proto::Member {
               swim_port: Some(value.swim_port.into()),
               gossip_port: Some(value.gossip_port.into()),
               persistent: Some(value.persistent),
-              departed: Some(value.departed), }
+              departed: Some(value.departed),
+              probe_ping: Some(value.probe_ping), }
     }
 }
@@ -305,7 +308,8 @@ impl FromProto<proto::Member> for Member {
                              .and_then(as_port)
                              .ok_or(Error::ProtocolMismatch("gossip-port"))?,
              persistent: proto.persistent.unwrap_or(false),
-             departed: proto.departed.unwrap_or(false), })
+             departed: proto.departed.unwrap_or(false),
+             probe_ping: proto.probe_ping.unwrap_or(false), })
     }
 }
diff --git a/components/butterfly/src/probe_list.rs b/components/butterfly/src/probe_list.rs
new file mode 100644
index 0000000000..06c81accfd
--- /dev/null
+++ b/components/butterfly/src/probe_list.rs
@@ -0,0 +1,22 @@
+//! Implementation of a probe list: a list of `Member`s to be probed when reported as `Confirmed`.
+
+use std::collections::HashSet;
+
+use habitat_common::sync::{Lock,
+                           ReadGuard,
+                           WriteGuard};
+
+use crate::member::Member;
+
+#[derive(Debug)]
+pub struct ProbeList {
+    members: Lock<HashSet<Member>>,
+}
+
+impl ProbeList {
+    pub fn new() -> Self { Self { members: Lock::new(HashSet::new()), } }
+
+    pub fn members_read(&self) -> ReadGuard<'_, HashSet<Member>> { self.members.read() }
+
+    pub fn members_write(&self) -> WriteGuard<'_, HashSet<Member>> { self.members.write() }
+}
diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs
index bc7312cd16..54d75305e0 100644
--- a/components/butterfly/src/server.rs
+++ b/components/butterfly/src/server.rs
@@ -23,6 +23,7 @@ use crate::{error::{Error,
                     MemberList,
                     MemberListProxy},
             message,
+            probe_list::ProbeList,
             rumor::{dat_file::{DatFileReader,
                                DatFileWriter},
                     departure::Departure,
@@ -40,6 +41,7 @@ use crate::{error::{Error,
                     RumorStoreProxy,
                     RumorType},
             swim::Ack};
+
 use habitat_common::{liveliness_checker,
                      sync::Lock,
                      FeatureFlag};
@@ -286,6 +288,7 @@ pub struct Server {
     // depends on it being so. Refactor so it can be private.
     myself: Arc<Myself>,
     pub member_list: Arc<MemberList>,
+    pub probe_list: Arc<ProbeList>,
     ring_key: Arc<Option<RingKey>>,
     rumor_heat: Arc<RumorHeat>,
     pub service_store: RumorStore<Service>,
@@ -315,6 +318,7 @@ impl Clone for Server {
                  member_id: self.member_id.clone(),
                  myself: self.myself.clone(),
                  member_list: self.member_list.clone(),
+                 probe_list: self.probe_list.clone(),
                  ring_key: self.ring_key.clone(),
                  rumor_heat: self.rumor_heat.clone(),
                  service_store: self.service_store.clone(),
@@ -377,6 +381,7 @@ impl Server {
                      member_id: Arc::new(member_id),
                      myself: Arc::new(myself),
                      member_list: Arc::new(MemberList::new()),
+                     probe_list: Arc::new(ProbeList::new()),
                      ring_key: Arc::new(ring_key),
                      rumor_heat: Arc::default(),
                      service_store: RumorStore::default(),
@@ -669,14 +674,43 @@ impl Server {
     fn insert_member_from_rumor_mlw_smw_rhw(&self, member: Member, mut health: Health) {
         let rk: RumorKey = RumorKey::from(&member);
 
-        if member.id == self.member_id()
-           && health != Health::Alive
-           && member.incarnation >= self.myself.lock_smr().incarnation()
-        {
-            self.myself
-                .lock_smw()
-                .refute_incarnation(member.incarnation);
-            health = Health::Alive;
+        if member.id != self.member_id() {
+            // Incoming Health is Confirmed
+            if health == Health::Confirmed {
+                let membership = self.member_list.membership_for_mlr(&member.id);
+                if let Some(mship) = membership {
+                    match mship.health {
+                        Health::Alive | Health::Suspect => {
+                            if mship.member.incarnation > member.incarnation {
+                                debug!("Member: {}, Incoming Incarnation {} is older, current incarnation is \
+                                        {}. No-OP.",
+                                       member.id, member.incarnation, mship.member.incarnation);
+                            } else {
+                                warn!("Member: {}, Our Information about the member is '{}', Incoming \
+                                       information is '{}'. Will Send a `ProbePing` to the \
+                                       member.",
+                                      member.id, mship.health, health);
+                                self.probe_list.members_write().insert(member.clone());
+                                return;
+                            }
+                        }
+                        _ => {
+                            debug!("Member: {}, Incoming Health {}, Member Health: {}. No-op",
+                                   member.id, health, mship.health);
+                        }
+                    }
+                } else {
+                    debug!("Member: {} Does not exist in the Member List.", member.id);
+                }
+            }
+        } else {
+            if health != Health::Alive && member.incarnation >= self.myself.lock_smr().incarnation()
+            {
+                self.myself
+                    .lock_smw()
+                    .refute_incarnation(member.incarnation);
+                health = Health::Alive;
+            }
         }
 
         let member_id = member.id.clone();
diff --git a/components/butterfly/src/server/inbound.rs b/components/butterfly/src/server/inbound.rs
index 668f963597..527219eecb 100644
--- a/components/butterfly/src/server/inbound.rs
+++ b/components/butterfly/src/server/inbound.rs
@@ -9,6 +9,7 @@ use crate::{member::Health,
             swim::{Ack,
                    Ping,
                    PingReq,
+                   ProbePing,
                    Swim,
                    SwimKind}};
 use habitat_common::liveliness_checker;
@@ -123,7 +124,15 @@ pub fn run_loop(server: &Server, socket: &UdpSocket, tx_outbound: &AckSender) ->
                                pingreq.from.id);
                         continue;
                     }
-                    process_pingreq_mlr_smr_rhw(server, socket, addr, pingreq);
+                    process_pingreq_mlr_smw_rhw(server, socket, addr, pingreq);
+                }
+                SwimKind::ProbePing(probe_ping) => {
+                    if server.is_member_blocked_sblr(&probe_ping.from.id) {
+                        debug!("Not processing message from {} - it is blocked",
+                               probe_ping.from.id);
+                        continue;
+                    }
+                    process_probeping_mlw_smw_rhw(server, socket, addr, probe_ping);
                 }
             }
         }
@@ -154,7 +163,7 @@ pub fn run_loop(server: &Server, socket: &UdpSocket, tx_outbound: &AckSender) ->
 /// * `MemberList::entries` (read)
 /// * `Server::member` (read)
 /// * `RumorHeat::inner` (write)
-fn process_pingreq_mlr_smr_rhw(server: &Server,
+fn process_pingreq_mlr_smw_rhw(server: &Server,
                                socket: &UdpSocket,
                                addr: SocketAddr,
                                mut msg: PingReq) {
@@ -227,10 +236,12 @@ fn process_ack_mlw_smw_rhw(server: &Server,
 /// * `Server::member` (write)
 /// * `RumorHeat::inner` (write)
 fn process_ping_mlw_smw_rhw(server: &Server, socket: &UdpSocket, addr: SocketAddr, mut msg: Ping) {
+    trace!("Ping from {}@{}", msg.from.id, addr);
+
     outbound::ack_mlr_smr_rhw(server, socket, &msg.from, addr, msg.forward_to);
     // Populate the member for this sender with its remote address
     msg.from.address = addr.ip().to_string();
-    trace!("Ping from {}@{}", msg.from.id, addr);
+
     if msg.from.departed {
         server.insert_member_mlw_rhw(msg.from, Health::Departed);
     } else {
@@ -240,3 +251,32 @@ fn process_ping_mlw_smw_rhw(server: &Server, socket: &UdpSocket, addr: SocketAdd
         server.insert_member_from_rumor_mlw_smw_rhw(membership.member, membership.health);
     }
 }
+
+/// # Locking (see locking.md)
+/// * `MemberList::entries` (write)
+/// * `Server::member` (write)
+/// * `RumorHeat::inner` (write)
+fn process_probeping_mlw_smw_rhw(server: &Server,
+                                 socket: &UdpSocket,
+                                 addr: SocketAddr,
+                                 mut msg: ProbePing) {
+    trace!("ProbePing from {}@{}", msg.from.id, addr);
+
+    // We received this message because someone believed we are `Confirmed`. For that member to
+    // know we are `Alive` quickly, we will increment our `Incarnation`. This will create a `hot`
+    // rumor in the network (through a 'successful' `insert_mlw`) when our `Ack` is processed.
+    server.myself.lock_smw().increment_incarnation();
+
+    // Ack now.
+    outbound::ack_mlr_smr_rhw(server, socket, &msg.from, addr, None);
+
+    // Populate the member for this sender with its remote address
+    msg.from.address = addr.ip().to_string();
+
+    // Mark Sender as `Alive`
+    if msg.from.departed {
+        server.insert_member_mlw_rhw(msg.from, Health::Departed);
+    } else {
+        server.insert_member_mlw_rhw(msg.from, Health::Alive);
+    }
+}
diff --git a/components/butterfly/src/server/outbound.rs b/components/butterfly/src/server/outbound.rs
index 14ebcc9896..c04da627c7 100644
--- a/components/butterfly/src/server/outbound.rs
+++ b/components/butterfly/src/server/outbound.rs
@@ -12,11 +12,13 @@ use crate::{member::{Health,
             swim::{Ack,
                    Ping,
                    PingReq,
+                   ProbePing,
                    Swim}};
 use habitat_common::liveliness_checker;
 use habitat_core::util::ToI64;
 use lazy_static::lazy_static;
-use log::{error,
+use log::{debug,
+          error,
           trace,
           warn};
 use prometheus::{register_histogram_vec,
@@ -108,7 +110,8 @@ fn run_loop(server: &Server, socket: &UdpSocket, rx_inbound: &AckReceiver, timin
                              socket,
                              member,
                              member.swim_socket_address(),
-                             None);
+                             None,
+                             false);
         });
     }
 }
@@ -119,7 +122,35 @@ fn run_loop(server: &Server, socket: &UdpSocket, rx_inbound: &AckReceiver, timin
             continue;
         }
 
-        server.update_swim_round();
+        let members_to_probe = server.probe_list
+                                     .members_write()
+                                     .drain()
+                                     .collect::<Vec<_>>();
+
+        if members_to_probe.len() > 0 {
+            debug!("Probing {} members in the Probe List.",
+                   members_to_probe.len());
+
+            for member in members_to_probe {
+                trace!("Probing member: {}", member.id);
+                let addr = member.swim_socket_address();
+                ping_mlr_smr_rhw(server,
+                                 socket,
+                                 &member,
+                                 member.swim_socket_address(),
+                                 None,
+                                 true);
+                if recv_ack_mlw_rhw(server, rx_inbound, timing, &member, addr, AckFrom::Ping) {
+                    trace!("Probe Successful for Member: {}", member.id);
+                } else {
+                    warn!("Probe failed for Member: {}, Marking as Confirmed",
+                          member.id);
+                    server.insert_member_mlw_rhw(member, Health::Confirmed);
+                }
+            }
+        } else {
+            debug!("Zero members in probe_list");
+        }
 
         let check_list = server.member_list.check_list_mlr(&server.member_id);
 
@@ -134,6 +165,8 @@ fn run_loop(server: &Server, socket: &UdpSocket, rx_inbound: &AckReceiver, timin
         }
     }
 
+    server.update_swim_round();
+
     // This will only come into play if:
     //
     // * there was nothing in `check_list`
@@ -183,7 +216,7 @@ fn probe_mlw_smr_rhw(server: &Server,
     // Ping the member, and wait for the ack.
     SWIM_PROBES_SENT.with_label_values(&["ping"]).inc();
-    ping_mlr_smr_rhw(server, socket, &member, addr, None);
+    ping_mlr_smr_rhw(server, socket, &member, addr, None, false);
 
     if recv_ack_mlw_rhw(server, rx_inbound, timing, &member, addr, AckFrom::Ping) {
         SWIM_PROBES_SENT.with_label_values(&["ack"]).inc();
@@ -383,12 +416,22 @@ pub fn ping_mlr_smr_rhw(server: &Server,
                         socket: &UdpSocket,
                         target: &Member,
                         addr: SocketAddr,
-                        forward_to: Option<&Member>) {
-    let ping_msg = Ping { membership: vec![],
-                          from: server.myself.lock_smr().to_member(),
-                          forward_to: forward_to.cloned(), /* TODO: see if we can eliminate this
-                                                            * clone */ };
-    let swim = populate_membership_rumors_mlr_rhw(server, target, ping_msg);
+                        forward_to: Option<&Member>,
+                        probe_ping: bool) {
+    let swim = if !probe_ping {
+        let ping_msg = Ping { membership: vec![],
+                              from: server.myself.lock_smr().to_member(),
+                              forward_to: forward_to.cloned(), /* TODO: see if we can eliminate
+                                                                * this
+                                                                * clone */ };
+
+        populate_membership_rumors_mlr_rhw(server, target, ping_msg)
+    } else {
+        let mut from = server.myself.lock_smr().to_member();
+        from.probe_ping = true;
+        ProbePing { from }.into()
+    };
+
     let bytes = match swim.encode() {
         Ok(bytes) => bytes,
         Err(e) => {
@@ -492,7 +535,6 @@ pub fn ack_mlr_smr_rhw(server: &Server,
     let ack_msg = Ack { membership: vec![],
                         from:       server.myself.lock_smr().to_member(),
                         forward_to: forward_to.map(Member::from), };
-    let member_id = ack_msg.from.id.clone();
     let swim = populate_membership_rumors_mlr_rhw(server, target, ack_msg);
     let bytes = match swim.encode() {
         Ok(bytes) => bytes,
@@ -514,8 +556,8 @@ pub fn ack_mlr_smr_rhw(server: &Server,
             SWIM_MESSAGES_SENT.with_label_values(label_values).inc();
             SWIM_BYTES_SENT.with_label_values(label_values)
                            .set(payload.len().to_i64());
-            trace!("Sent ack to {}@{}", member_id, addr);
+            trace!("Sent ack to {}@{}", target.id, addr);
         }
-        Err(e) => error!("Failed ack to {}@{}: {}", member_id, addr, e),
+        Err(e) => error!("Failed ack to {}@{}: {}", target.id, addr, e),
     }
 }
diff --git a/components/butterfly/src/swim.rs b/components/butterfly/src/swim.rs
index 46f160e231..ef5639c3b7 100644
--- a/components/butterfly/src/swim.rs
+++ b/components/butterfly/src/swim.rs
@@ -221,11 +221,52 @@ impl From<PingReq> for Swim {
     }
 }
 
+#[derive(Debug, Clone, Serialize)]
+pub struct ProbePing {
+    pub from: Member,
+}
+
+impl FromProto<proto::Swim> for ProbePing {
+    fn from_proto(value: proto::Swim) -> Result<Self> {
+        let payload = match value.payload.ok_or(Error::ProtocolMismatch("payload"))? {
+            SwimPayload::Probeping(ping) => ping,
+            _ => panic!("try-from probeping"),
+        };
+        Ok(ProbePing { from: payload.from
+                                    .ok_or(Error::ProtocolMismatch("from"))
+                                    .and_then(Member::from_proto)?, })
+    }
+}
+
+impl protocol::Message for ProbePing {
+    const MESSAGE_ID: &'static str = "ProbePing";
+}
+
+impl From<ProbePing> for proto::ProbePing {
+    fn from(value: ProbePing) -> Self { proto::ProbePing { from: Some(value.from.into()), } }
+}
+
+impl From<ProbePing> for proto::Swim {
+    fn from(value: ProbePing) -> Self {
+        proto::Swim { r#type:     SwimType::Probeping as i32,
+                      membership: vec![],
+                      payload:    Some(SwimPayload::Probeping(value.into())), }
+    }
+}
+
+impl From<ProbePing> for Swim {
+    fn from(value: ProbePing) -> Self {
+        Swim { r#type:     SwimType::Probeping,
+               membership: vec![],
+               kind:       SwimKind::ProbePing(value), }
+    }
+}
 #[derive(Debug, Clone, Serialize)]
 pub enum SwimKind {
     Ping(Ping),
     Ack(Ack),
     PingReq(PingReq),
+    ProbePing(ProbePing),
 }
 
 impl From<SwimKind> for SwimPayload {
@@ -234,6 +275,7 @@ impl From<SwimKind> for SwimPayload {
             SwimKind::Ping(ping) => SwimPayload::Ping(ping.into()),
             SwimKind::Ack(ack) => SwimPayload::Ack(ack.into()),
             SwimKind::PingReq(pingreq) => SwimPayload::Pingreq(pingreq.into()),
+            SwimKind::ProbePing(probeping) => SwimPayload::Probeping(probeping.into()),
         }
     }
 }
@@ -251,6 +293,7 @@ impl SwimKind {
             SwimKind::Ping(_) => "ping",
             SwimKind::Ack(_) => "ack",
             SwimKind::PingReq(_) => "pingreq",
+            SwimKind::ProbePing(_) => "probeping",
         }
     }
 }
@@ -274,6 +317,7 @@ impl Swim {
             SwimType::Ack => SwimKind::Ack(Ack::from_proto(proto)?),
             SwimType::Ping => SwimKind::Ping(Ping::from_proto(proto)?),
             SwimType::Pingreq => SwimKind::PingReq(PingReq::from_proto(proto)?),
+            SwimType::Probeping => SwimKind::ProbePing(ProbePing::from_proto(proto)?),
         };
         Ok(Swim { r#type,
                   membership: memberships,
diff --git a/components/butterfly/tests/common/mod.rs b/components/butterfly/tests/common/mod.rs
index 2a890025e0..c03e97570d 100644
--- a/components/butterfly/tests/common/mod.rs
+++ b/components/butterfly/tests/common/mod.rs
@@ -56,6 +56,8 @@ pub fn start_server_smw_rhw(name: &str, ring_key: Option<RingKey>, suitability:
     let member = Member { swim_port,
                           gossip_port,
                           ..Default::default() };
 
+    eprintln!("member: {:#?}", member);
+
     let mut server = Server::new(listen_swim,
                                  listen_gossip,
                                  member,

From c2adbfe95871a1452f48a62f9d9b2b3fd2c976c2 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Fri, 4 Oct 2024 18:21:43 +0530
Subject: [PATCH 02/13] `mark_sender_alive` API

The `insert_member_mlw_rhw` API is supposed to insert the member into
the member list (note: this is different from `insert_member_from_rumor`).
But due to the way we use `Incarnation`, inserting with `Alive` health
does not succeed when the stored entry already carries the same
incarnation. We use this new API to mark the sender as `Alive`.

This could be handled better in `insert_member` itself, but there are
far too many test cases around that logic, so it is not wise to change
it at the moment.
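
To make the failure mode concrete, here is a minimal, self-contained
sketch of the acceptance rule (illustrative types only, not the actual
butterfly code; the real check lives in `newer_or_less_healthy_than`):

    // Health variants ordered from best to worst: at an equal
    // incarnation, only *worse* news than what we hold is accepted.
    #[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
    enum Health { Alive, Suspect, Confirmed }

    // Newer incarnation always wins; the same incarnation wins only
    // when the incoming health is worse than the stored one.
    fn accepts(inc_in: u64, h_in: Health, inc_cur: u64, h_cur: Health) -> bool {
        inc_in > inc_cur || (inc_in == inc_cur && h_in > h_cur)
    }

    fn main() {
        // We hold the sender as Confirmed at incarnation 1; it then acks
        // us directly, still at incarnation 1, claiming it is Alive.
        let accepted = accepts(1, Health::Alive, 1, Health::Confirmed);
        // Rejected: the sender would stay Confirmed, which is why
        // `mark_sender_alive` ignores the incarnation for direct evidence.
        assert!(!accepted);
    }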
Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/member.rs          | 45 +++++++++++++++++----
 components/butterfly/src/server.rs          | 26 ++++++++++++
 components/butterfly/src/server/inbound.rs  |  9 +++--
 components/butterfly/src/server/outbound.rs |  9 ++++-
 4 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/components/butterfly/src/member.rs b/components/butterfly/src/member.rs
index 19a7d8859e..2ea0d96746 100644
--- a/components/butterfly/src/member.rs
+++ b/components/butterfly/src/member.rs
@@ -30,7 +30,7 @@ use serde::{de,
             Serialize,
             Serializer};
 use std::{collections::{hash_map,
-                        HashMap},
+                        HashMap },
           convert::TryFrom,
           fmt,
           net::SocketAddr,
@@ -503,23 +503,39 @@ impl MemberList {
     // TODO (CM): why don't we just insert a membership record here?
     pub fn insert_mlw(&self, incoming_member: Member, incoming_health: Health) -> bool {
         self.insert_membership_mlw(Membership { member: incoming_member,
-                                                health: incoming_health, })
+                                                health: incoming_health, },
+                                   false)
     }
 
     /// # Locking (see locking.md)
     /// * `MemberList::entries` (write)
-    fn insert_membership_mlw(&self, incoming: Membership) -> bool {
+    fn insert_membership_mlw(&self, incoming: Membership, ignore_incarnation_health: bool) -> bool {
         // Is this clone necessary, or can a key be a reference to a field contained in the value?
         // Maybe the members we store should not contain the ID to reduce the duplication?
+        trace!("insert_membership_mlw: Member: {}, Health: {}",
+               incoming.member.id,
+               incoming.health);
         let modified = match self.write_entries().entry(incoming.member.id.clone()) {
             hash_map::Entry::Occupied(mut entry) => {
                 let val = entry.get_mut();
-                if incoming.newer_or_less_healthy_than(val.member.incarnation, val.health) {
-                    *val = member_list::Entry { member: incoming.member,
-                                                health: incoming.health,
-                                                health_updated_at: Instant::now(), };
-                    true
+                if incoming.newer_or_less_healthy_than(val.member.incarnation, val.health)
+                   || ignore_incarnation_health
+                {
+                    if val.health != incoming.health || !ignore_incarnation_health {
+                        trace!("++ current health: {}, incoming health: {}", val.health, incoming.health);
+                        *val = member_list::Entry { member: incoming.member,
+                                                    health: incoming.health,
+                                                    health_updated_at: Instant::now(), };
+                        trace!("Occupied: Updated");
+                        true
+                    } else {
+                        trace!("~~ current health: {}, incoming health: {}.", val.health, incoming.health);
+                        trace!("Occupied: Not Updated");
+                        false
+                    }
                 } else {
+                    trace!("-- current health: {}, incoming health: {}, incarnation: {}, ", val.health, incoming.health, val.member.incarnation);
+                    trace!("Occupied: Not Updated");
                     false
                 }
             }
@@ -527,6 +543,7 @@ impl MemberList {
                 entry.insert(member_list::Entry { member: incoming.member,
                                                   health: incoming.health,
                                                   health_updated_at: Instant::now(), });
+                trace!("Empty: Created!");
                 true
             }
         };
@@ -539,6 +556,17 @@ impl MemberList {
         modified
     }
 
+    pub(crate) fn insert_member_ignore_incarnation_mlw(&self,
+                                                       incoming_member: Member,
+                                                       incoming_health: Health)
+                                                       -> bool {
+        trace!("Inserting Member Ignore Incarnation: {:?} with Health: {}",
+               incoming_member,
+               incoming_health);
+        self.insert_membership_mlw(Membership { member: incoming_member,
+                                                health: incoming_health, },
+                                   true)
+    }
     /// # Locking (see locking.md)
     /// * `MemberList::entries` (write)
     pub fn set_departed_mlw(&self, member_id: &str) {
@@ -674,6 +702,7 @@ impl MemberList {
                           .cloned()
                           .collect();
         members.shuffle(&mut thread_rng());
+
         members
     }
 
diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs
index 54d75305e0..f29031e008 100644
--- a/components/butterfly/src/server.rs
+++ b/components/butterfly/src/server.rs
@@ -607,6 +607,7 @@ impl Server {
     /// * `MemberList::entries` (write)
     /// * `RumorHeat::inner` (write)
     pub fn insert_member_mlw_rhw(&self, member: Member, health: Health) {
+        trace!("insert_member_mlw_rhw");
         let rk: RumorKey = RumorKey::from(&member);
         let member_id = member.id.clone();
         if self.member_list.insert_mlw(member, health) {
@@ -623,6 +624,30 @@ impl Server {
         }
     }
 
+    /// Set the sender to `Alive`.
+    ///
+    /// If we simply use `insert_member_mlw_rhw` as above, the sender may already carry the
+    /// current `Incarnation`, and trying to "update" the sender then does not update anything.
+    /// So the sender might remain in the `Confirmed` state even after we have received a
+    /// message/ack from it. We need to ignore the `Incarnation` in this case.
+    ///
+    /// Ideally this should be fixed in the `insert_mlw` API from `crate::member` (by taking an
+    /// extra parameter), but in the interest of not breaking existing usage we are defining this
+    /// new function. At some point revisit this (and fix the affected unit tests).
+    pub(crate) fn mark_sender_alive_mlw_rhw(&self, sender: Member) {
+        trace!("mark_sender_alive_mlw_rhw");
+        let rk: RumorKey = RumorKey::from(&sender);
+
+        let sender_id = sender.id.clone();
+
+        if self.member_list
+               .insert_member_ignore_incarnation_mlw(sender, Health::Alive)
+        {
+            debug!("Marking member '{}' as 'Alive' again, starting a new rumor.", sender_id);
+            self.rumor_heat.lock_rhw().start_hot_rumor(rk);
+        }
+    }
+
     /// Set our member to departed, then send up to 10 out of order ack messages to other
     /// members to seed our status.
     ///
@@ -672,6 +697,7 @@ impl Server {
     /// * `Server::member` (write)
     /// * `RumorHeat::inner` (write)
     fn insert_member_from_rumor_mlw_smw_rhw(&self, member: Member, mut health: Health) {
+        trace!("insert_member_from_rumor_mlw_smw_rhw");
         let rk: RumorKey = RumorKey::from(&member);
 
         if member.id != self.member_id() {
diff --git a/components/butterfly/src/server/inbound.rs b/components/butterfly/src/server/inbound.rs
index 527219eecb..3ae62a703a 100644
--- a/components/butterfly/src/server/inbound.rs
+++ b/components/butterfly/src/server/inbound.rs
@@ -242,13 +242,14 @@ fn process_ping_mlw_smw_rhw(server: &Server, socket: &UdpSocket, addr: SocketAdd
     // Populate the member for this sender with its remote address
     msg.from.address = addr.ip().to_string();
 
+    for membership in msg.membership {
+        server.insert_member_from_rumor_mlw_smw_rhw(membership.member, membership.health);
+    }
+
     if msg.from.departed {
         server.insert_member_mlw_rhw(msg.from, Health::Departed);
     } else {
-        server.insert_member_mlw_rhw(msg.from, Health::Alive);
-    }
-    for membership in msg.membership {
-        server.insert_member_from_rumor_mlw_smw_rhw(membership.member, membership.health);
+        server.mark_sender_alive_mlw_rhw(msg.from);
     }
 }
 
diff --git a/components/butterfly/src/server/outbound.rs b/components/butterfly/src/server/outbound.rs
index c04da627c7..c25367cbc9 100644
--- a/components/butterfly/src/server/outbound.rs
+++ b/components/butterfly/src/server/outbound.rs
@@ -35,6 +35,8 @@ use std::{fmt,
           thread,
           time::{Duration,
                  Instant}};
+use std::collections::HashSet;
+use std::iter::FromIterator;
 
 /// How long to sleep between calls to `recv`.
 const PING_RECV_QUEUE_EMPTY_SLEEP_MS: u64 = 10;
@@ -284,7 +286,7 @@ fn recv_ack_mlw_rhw(server: &Server,
                 if ack.from.departed {
                     server.insert_member_mlw_rhw(ack.from, Health::Departed);
                 } else {
-                    server.insert_member_mlw_rhw(ack.from, Health::Alive);
+                    server.mark_sender_alive_mlw_rhw(ack.from);
                 }
                 // Keep listening, we want the ack we expected
                 continue;
@@ -293,7 +295,7 @@ fn recv_ack_mlw_rhw(server: &Server,
                 if ack.from.departed {
                     server.insert_member_mlw_rhw(ack.from, Health::Departed);
                 } else {
-                    server.insert_member_mlw_rhw(ack.from, Health::Alive);
+                    server.mark_sender_alive_mlw_rhw(ack.from);
                 }
                 return true;
             }
@@ -343,6 +345,9 @@ pub fn populate_membership_rumors_mlr_rhw(server: &Server,
                      .take(5) // TODO (CM): magic number!
                      .collect();
 
+    let rumors_set: HashSet<_> = HashSet::from_iter(rumors);
+    let rumors = Vec::from_iter(rumors_set);
+
     for rkey in rumors.iter() {
         if let Some(member) = server.member_list.membership_for_mlr(&rkey.to_string()) {
             swim.membership.push(member);

From 1ebbaf33e6a907f5a3703eb854578a010ec597be Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Fri, 4 Oct 2024 18:24:05 +0530
Subject: [PATCH 03/13] Updates to Election Algorithm

If we receive an election with a *new term*, we start a new election
*only if* we were the leader of the previous term. If we were not the
*leader* of the previous term, we join the election rumor only when we
receive one from the leader.

When the leader dies, two or more members will restart the election
with a *new term*, so the *new term* logic above does not come into
play; the normal *leader election* algorithm takes over and works as
before.

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/rumor.rs  |  8 ++++++++
 components/butterfly/src/server.rs | 21 +++++++++++++++++----
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/components/butterfly/src/rumor.rs b/components/butterfly/src/rumor.rs
index f79ed145a8..23ed18d6e6 100644
--- a/components/butterfly/src/rumor.rs
+++ b/components/butterfly/src/rumor.rs
@@ -254,6 +254,14 @@ mod storage {
         }
     }
 
+    impl<'a, E: ElectionRumor> IterableGuard<'a, RumorMap<E>> {
+        pub fn get_member_id(&self, service_group: &str) -> Option<&str> {
+            self.get(service_group)
+                .map(|sg| sg.get(E::const_id()).map(ElectionRumor::member_id))
+                .unwrap_or(None)
+        }
+    }
+
     /// Allows ergonomic use of the guard for accessing the guarded `RumorMap`:
     /// ```
     /// # use habitat_butterfly::rumor::{Departure,
diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs
index f29031e008..dd9f3dfc51 100644
--- a/components/butterfly/src/server.rs
+++ b/components/butterfly/src/server.rs
@@ -1133,10 +1133,23 @@ impl Server {
                            .map(|stored_term| election.term > stored_term)
                            .unwrap_or(false);
             if new_term {
-                debug!("removing old rumor and starting new election");
-                self.election_store
-                    .remove_rsw(election.key(), election.id());
-                self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                if Some(self.member_id()) == self.election_store.lock_rsr().get_member_id(election.key()) {
+                    debug!("I am the leader of previous term!");
+                    debug!("removing old rumor and starting new election");
+                    self.election_store.remove_rsw(election.key(), election.id());
+                    self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                } else {
+                    if Some(election.member_id.as_str()) == self.election_store.lock_rsr().get_member_id(election.key()) {
+                        debug!("Received New Term election from the leader. Starting my own to merge. Term: {}", election.term);
+                        self.election_store.remove_rsw(election.key(), election.id());
+                        trace!("Removed old election.");
+                        self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                        trace!("Started new election.");
+                    } else {
+                        warn!("Received a New Term Election, but not from the current leader, ignoring!");
+                        return;
+                    }
+                }
             }
             // If we are the member that this election is voting for, then check to see if the
             // election is over! If it is, mark this election as final before you process it.

From c4078fdd7863d8d44463a3e7652d2e38db345736 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Fri, 4 Oct 2024 18:24:36 +0530
Subject: [PATCH 04/13] Ignored a failing test case

The `five_members_elect_a_new_leader_when_they_are_quorum_partitioned`
test case is ignored for now. This test case currently fails because we
*explicitly* restart the election on a *non-leader* node. The modified
algorithm _may_ need to account for such external restarts.

Also fixed some `rustfmt` failures.

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/member.rs           | 16 ++++---
 components/butterfly/src/server.rs           | 37 +++++++++++--------
 components/butterfly/src/server/outbound.rs  |  6 ++--
 components/butterfly/tests/rumor/election.rs |  5 ++-
 4 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/components/butterfly/src/member.rs b/components/butterfly/src/member.rs
index 2ea0d96746..4fb3ce4710 100644
--- a/components/butterfly/src/member.rs
+++ b/components/butterfly/src/member.rs
@@ -30,7 +30,7 @@ use serde::{de,
             Serialize,
             Serializer};
 use std::{collections::{hash_map,
-                        HashMap },
+                        HashMap},
           convert::TryFrom,
           fmt,
           net::SocketAddr,
@@ -522,19 +522,26 @@ impl MemberList {
                    || ignore_incarnation_health
                 {
                     if val.health != incoming.health || !ignore_incarnation_health {
-                        trace!("++ current health: {}, incoming health: {}", val.health, incoming.health);
+                        trace!("++ current health: {}, incoming health: {}",
+                               val.health,
+                               incoming.health);
                         *val = member_list::Entry { member: incoming.member,
                                                     health: incoming.health,
                                                     health_updated_at: Instant::now(), };
                         trace!("Occupied: Updated");
                         true
                     } else {
-                        trace!("~~ current health: {}, incoming health: {}.", val.health, incoming.health);
+                        trace!("~~ current health: {}, incoming health: {}.",
+                               val.health,
+                               incoming.health);
                         trace!("Occupied: Not Updated");
                         false
                     }
                 } else {
-                    trace!("-- current health: {}, incoming health: {}, incarnation: {}, ", val.health, incoming.health, val.member.incarnation);
+                    trace!("-- current health: {}, incoming health: {}, incarnation: {}, ",
+                           val.health,
+                           incoming.health,
+                           val.member.incarnation);
                     trace!("Occupied: Not Updated");
                     false
                 }
@@ -567,6 +574,7 @@ impl MemberList {
         self.insert_membership_mlw(Membership { member: incoming_member,
                                                 health: incoming_health, },
                                    true)
     }
+
     /// # Locking (see locking.md)
     /// * `MemberList::entries` (write)
     pub fn set_departed_mlw(&self, member_id: &str) {
diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs
index dd9f3dfc51..f22fac8220 100644
--- a/components/butterfly/src/server.rs
+++ b/components/butterfly/src/server.rs
@@ -643,7 +643,8 @@ impl Server {
         if self.member_list
                .insert_member_ignore_incarnation_mlw(sender, Health::Alive)
         {
-            debug!("Marking member '{}' as 'Alive' again, starting a new rumor.", sender_id);
+            debug!("Marking member '{}' as 'Alive' again, starting a new rumor.",
+                   sender_id);
             self.rumor_heat.lock_rhw().start_hot_rumor(rk);
         }
     }
@@ -708,13 +709,13 @@ impl Server {
                     match mship.health {
                         Health::Alive | Health::Suspect => {
                             if mship.member.incarnation > member.incarnation {
-                                debug!("Member: {}, Incoming Incarnation {} is older, current incarnation is \
-                                        {}. No-OP.",
+                                debug!("Member: {}, Incoming Incarnation {} is older, current \
+                                        incarnation is {}. No-OP.",
                                        member.id, member.incarnation, mship.member.incarnation);
                             } else {
-                                warn!("Member: {}, Our Information about the member is '{}', Incoming \
-                                       information is '{}'. Will Send a `ProbePing` to the \
-                                       member.",
+                                warn!("Member: {}, Our Information about the member is '{}', \
+                                       Incoming information is '{}'. Will Send a `ProbePing` to \
+                                       the member.",
                                       member.id, mship.health, health);
                                 self.probe_list.members_write().insert(member.clone());
                                 return;
@@ -1134,20 +1134,30 @@ impl Server {
             if new_term {
-                if Some(self.member_id()) == self.election_store.lock_rsr().get_member_id(election.key()) {
+                if Some(self.member_id())
+                   == self.election_store.lock_rsr().get_member_id(election.key())
+                {
                     debug!("I am the leader of previous term!");
                     debug!("removing old rumor and starting new election");
-                    self.election_store.remove_rsw(election.key(), election.id());
+                    self.election_store
+                        .remove_rsw(election.key(), election.id());
                     self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
                 } else {
-                    if Some(election.member_id.as_str()) == self.election_store.lock_rsr().get_member_id(election.key()) {
-                        debug!("Received New Term election from the leader. Starting my own to merge. Term: {}", election.term);
-                        self.election_store.remove_rsw(election.key(), election.id());
+                    if Some(election.member_id.as_str())
+                       == self.election_store.lock_rsr().get_member_id(election.key())
+                    {
+                        debug!("Received New Term election from the leader. Starting my own \
+                                to merge. Term: {}",
+                               election.term);
+                        self.election_store
+                            .remove_rsw(election.key(), election.id());
                         trace!("Removed old election.");
-                        self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                        self.start_election_rsw_mlr_rhw_msr(&election.service_group,
+                                                            election.term);
                         trace!("Started new election.");
                     } else {
-                        warn!("Received a New Term Election, but not from the current leader, ignoring!");
+                        warn!("Received a New Term Election, but not from the current \
+                               leader, ignoring!");
                         return;
                     }
                 }
diff --git a/components/butterfly/src/server/outbound.rs b/components/butterfly/src/server/outbound.rs
index c25367cbc9..8f3bf35551 100644
--- a/components/butterfly/src/server/outbound.rs
+++ b/components/butterfly/src/server/outbound.rs
@@ -28,15 +28,15 @@ use prometheus::{register_histogram_vec,
                  HistogramVec,
                  IntCounterVec,
                  IntGaugeVec};
-use std::{fmt,
+use std::{collections::HashSet,
+          fmt,
+          iter::FromIterator,
           net::{SocketAddr,
                 UdpSocket},
           sync::mpsc,
           thread,
           time::{Duration,
                  Instant}};
-use std::collections::HashSet;
-use std::iter::FromIterator;
 
 /// How long to sleep between calls to `recv`.
 const PING_RECV_QUEUE_EMPTY_SLEEP_MS: u64 = 10;
diff --git a/components/butterfly/tests/rumor/election.rs b/components/butterfly/tests/rumor/election.rs
index 1ab7bef06f..06a2bff2f3 100644
--- a/components/butterfly/tests/rumor/election.rs
+++ b/components/butterfly/tests/rumor/election.rs
@@ -89,9 +89,12 @@ fn five_members_elect_a_new_leader_when_the_old_one_dies() {
     });
 }
 
+#[ignore = "Leader Election Algorithm Changed. Need to revisit this test."]
 #[test]
 #[allow(clippy::cognitive_complexity)]
 fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
+    env_logger::init();
+
     let mut net = btest::SwimNet::new_with_suitability_rhw(vec![1, 0, 0, 0, 0]);
     net[0].myself().lock_smw().set_persistent();
     net[4].myself().lock_smw().set_persistent();
@@ -129,7 +132,7 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
     net.partition(0..2, 2..5);
     assert_wait_for_health_of_mlr!(net, [0..2, 2..5], Health::Confirmed);
     net[0].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
-    net[4].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    // net[4].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
     assert_wait_for_election_status!(net, 0, "witcher.prod", ElectionStatus::NoQuorum);
     assert_wait_for_election_status!(net, 1, "witcher.prod", ElectionStatus::NoQuorum);
     assert_wait_for_election_status!(net, 2, "witcher.prod", ElectionStatus::Finished);

From 5b6418ef37fa79099636d0198e69ef1b3d48b496 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Mon, 7 Oct 2024 09:46:31 +0530
Subject: [PATCH 05/13] Fixed Clippy warnings

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/server.rs          | 47 ++++++++++-----------
 components/butterfly/src/server/outbound.rs |  2 +-
 2 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs
index f22fac8220..3415ed843d 100644
--- a/components/butterfly/src/server.rs
+++ b/components/butterfly/src/server.rs
@@ -730,14 +730,13 @@ impl Server {
                     debug!("Member: {} Does not exist in the Member List.", member.id);
                 }
             }
-        } else {
-            if health != Health::Alive && member.incarnation >= self.myself.lock_smr().incarnation()
-            {
-                self.myself
-                    .lock_smw()
-                    .refute_incarnation(member.incarnation);
-                health = Health::Alive;
-            }
+        } else if health != Health::Alive
+                  && member.incarnation >= self.myself.lock_smr().incarnation()
+        {
+            self.myself
+                .lock_smw()
+                .refute_incarnation(member.incarnation);
+            health = Health::Alive;
         }
 
         let member_id = member.id.clone();
@@ -1113,6 +1112,7 @@ impl Server {
     /// * `MemberList::entries` (read)
     /// * `RumorHeat::inner` (write)
     /// * `ManagerServices::inner` (read)
+    #[allow(clippy::cognitive_complexity)]
     pub fn insert_election_rsw_mlr_rhw_msr(&self, mut election: Election) {
         debug!("insert_election: {:?}", election);
         let rk = RumorKey::from(&election);
@@ -1142,24 +1142,21 @@ impl Server {
                     self.election_store
                         .remove_rsw(election.key(), election.id());
                     self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                } else if Some(election.member_id.as_str())
+                          == self.election_store.lock_rsr().get_member_id(election.key())
+                {
+                    debug!("Received New Term election from the leader. Starting my own to \
+                            merge. Term: {}",
+                           election.term);
+                    self.election_store
+                        .remove_rsw(election.key(), election.id());
+                    trace!("Removed old election.");
+                    self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term);
+                    trace!("Started new election.");
                 } else {
-                    if Some(election.member_id.as_str())
-                       == self.election_store.lock_rsr().get_member_id(election.key())
-                    {
-                        debug!("Received New Term election from the leader. Starting my own \
-                                to merge. Term: {}",
-                               election.term);
-                        self.election_store
-                            .remove_rsw(election.key(), election.id());
-                        trace!("Removed old election.");
-                        self.start_election_rsw_mlr_rhw_msr(&election.service_group,
-                                                            election.term);
-                        trace!("Started new election.");
-                    } else {
-                        warn!("Received a New Term Election, but not from the current \
-                               leader, ignoring!");
-                        return;
-                    }
+                    warn!("Received a New Term Election, but not from the current leader, \
+                           ignoring!");
+                    return;
                 }
             }
diff --git a/components/butterfly/src/server/outbound.rs b/components/butterfly/src/server/outbound.rs
index 8f3bf35551..3a7c4f5ded 100644
--- a/components/butterfly/src/server/outbound.rs
+++ b/components/butterfly/src/server/outbound.rs
@@ -129,7 +129,7 @@ fn run_loop(server: &Server, socket: &UdpSocket, rx_inbound: &AckReceiver, timin
                                      .drain()
                                      .collect::<Vec<_>>();
 
-        if members_to_probe.len() > 0 {
+        if !members_to_probe.is_empty() {
             debug!("Probing {} members in the Probe List.",
                    members_to_probe.len());

From f11fa06d099b112486fb64af8cf581b038a24d56 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Mon, 7 Oct 2024 15:51:27 +0530
Subject: [PATCH 06/13] Another update to Election algorithm

If a node reboots while its election is in the `Running` state, and
during that interval the election reaches `Finished` for the same term,
the rebooted node was not able to join the finished election.

Now, if we receive a rumor saying the election is not `Finished` while
our stored state for the election is `Finished`, we steal their votes
and make a `hot` rumor, so that the other member gets a chance to
`finish` their election.

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/rumor/election.rs | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/components/butterfly/src/rumor/election.rs b/components/butterfly/src/rumor/election.rs
index 7bbd33cb71..fcec430bdb 100644
--- a/components/butterfly/src/rumor/election.rs
+++ b/components/butterfly/src/rumor/election.rs
@@ -171,8 +171,19 @@ impl Rumor for Election {
             *self = other;
             true
         } else if other.term == self.term && self.status == ElectionStatus::Finished {
-            debug!("stored rumor is finished and received rumor is for same term; nothing to do");
-            false
+            if other.status == ElectionStatus::Finished {
+                debug!("stored rumor is finished and received rumor is for same term; nothing to \
+                        do");
+                false
+            } else {
+                debug!("stored rumor is finished and received rumor is for same term; Received \
+                        rumor is not 'Finished'. Taking their votes and sharing ourselves.");
+                self.steal_votes(&mut other);
+                true
+            }
+        } else if self.term > other.term {
+            debug!("stored rumor represents a newer term than received; keep sharing it");
+            true
         } else if self.term > other.term {
             debug!("stored rumor represents a newer term than received; keep sharing it");
             true

From 0b12b7e587ee425825bc271714f51557dcfcfbd3 Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Mon, 7 Oct 2024 19:03:31 +0530
Subject: [PATCH 07/13] Fixes to the failing test case - no longer ignored

We need to *restart* the election on all the partitioned followers (so
that each one of them increments the term by 1); they can then drive
the election to the `Finished` state. This matches what happens in the
real world, where the followers determine that the leader is
`Confirmed`, independently update their *term*, and start a new
election.
Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/rumor/election.rs   | 3 ---
 components/butterfly/tests/rumor/election.rs | 5 +++--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/components/butterfly/src/rumor/election.rs b/components/butterfly/src/rumor/election.rs
index fcec430bdb..dd87f88c8b 100644
--- a/components/butterfly/src/rumor/election.rs
+++ b/components/butterfly/src/rumor/election.rs
@@ -184,9 +184,6 @@ impl Rumor for Election {
         } else if self.term > other.term {
             debug!("stored rumor represents a newer term than received; keep sharing it");
             true
-        } else if self.term > other.term {
-            debug!("stored rumor represents a newer term than received; keep sharing it");
-            true
         } else if self.suitability > other.suitability {
             debug!("stored rumor is more suitable; take received rumor's votes and share");
             self.steal_votes(&mut other);
diff --git a/components/butterfly/tests/rumor/election.rs b/components/butterfly/tests/rumor/election.rs
index 06a2bff2f3..9dc938de92 100644
--- a/components/butterfly/tests/rumor/election.rs
+++ b/components/butterfly/tests/rumor/election.rs
@@ -89,7 +89,6 @@ fn five_members_elect_a_new_leader_when_the_old_one_dies() {
     });
 }
 
-#[ignore = "Leader Election Algorithm Changed. Need to revisit this test."]
 #[test]
 #[allow(clippy::cognitive_complexity)]
 fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
@@ -132,7 +131,9 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
     net.partition(0..2, 2..5);
     assert_wait_for_health_of_mlr!(net, [0..2, 2..5], Health::Confirmed);
     net[0].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
-    // net[4].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    net[2].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    net[3].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    net[4].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
     assert_wait_for_election_status!(net, 0, "witcher.prod", ElectionStatus::NoQuorum);
     assert_wait_for_election_status!(net, 1, "witcher.prod", ElectionStatus::NoQuorum);
     assert_wait_for_election_status!(net, 2, "witcher.prod", ElectionStatus::Finished);

From 35f158b594cceca23af9e7554793eaf0cd5513eb Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Tue, 8 Oct 2024 09:10:58 +0530
Subject: [PATCH 08/13] Review comments - Adding Optional Suitability to Election

Added a `suitability` parameter to `start_election_rsw_mlr_rhw_msr`.
This will be used by the `leader` of the current term to make itself
the leader in the next term as well.

Also addresses other review comments.

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/src/member.rs         | 34 +++++++++-----------
 components/butterfly/src/server.rs         | 25 ++++++++++------
 components/butterfly/src/server/inbound.rs |  4 +--
 components/butterfly/tests/common/mod.rs   |  3 +-
 components/sup/src/manager.rs              |  2 +-
 5 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/components/butterfly/src/member.rs b/components/butterfly/src/member.rs
index 2ea0d96746..260c68cd1e 100644
--- a/components/butterfly/src/member.rs
+++ b/components/butterfly/src/member.rs
@@ -509,7 +509,10 @@ impl MemberList {
 
     /// # Locking (see locking.md)
     /// * `MemberList::entries` (write)
-    fn insert_membership_mlw(&self, incoming: Membership, ignore_incarnation_health: bool) -> bool {
+    fn insert_membership_mlw(&self,
+                             incoming: Membership,
+                             ignore_incarnation_and_health: bool)
+                             -> bool {
         // Is this clone necessary, or can a key be a reference to a field contained in the value?
// Maybe the members we store should not contain the ID to reduce the duplication? trace!("insert_membership_mlw: Member: {}, Health: {}", @@ -519,24 +522,16 @@ impl MemberList { hash_map::Entry::Occupied(mut entry) => { let val = entry.get_mut(); if incoming.newer_or_less_healthy_than(val.member.incarnation, val.health) - || ignore_incarnation_health + || (ignore_incarnation_and_health && val.health != incoming.health) { - if val.health != incoming.health || !ignore_incarnation_health { - trace!("++ current health: {}, incoming health: {}", - val.health, - incoming.health); - *val = member_list::Entry { member: incoming.member, - health: incoming.health, - health_updated_at: Instant::now(), }; - trace!("Occupied: Updated"); - true - } else { - trace!("~~ current health: {}, incoming health: {}.", - val.health, - incoming.health); - trace!("Occupied: Not Updated"); - false - } + trace!("++ current health: {}, incoming health: {}", + val.health, + incoming.health); + *val = member_list::Entry { member: incoming.member, + health: incoming.health, + health_updated_at: Instant::now(), }; + trace!("Occupied: Updated"); + true } else { trace!("-- current health: {}, incoming health: {}, incarnation: {}, ", val.health, @@ -567,9 +562,6 @@ impl MemberList { incoming_member: Member, incoming_health: Health) -> bool { - trace!("Inserting Member Ignore Incarnation: {:?} with Health: {}", - incoming_member, - incoming_health); self.insert_membership_mlw(Membership { member: incoming_member, health: incoming_health, }, true) diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs index 3415ed843d..aa380eabb2 100644 --- a/components/butterfly/src/server.rs +++ b/components/butterfly/src/server.rs @@ -941,8 +941,12 @@ impl Server { /// * `MemberList::entries` (read) /// * `RumorHeat::inner` (write) /// * `ManagerServices::inner` (read) - pub fn start_election_rsw_mlr_rhw_msr(&self, service_group: &str, term: u64) { - let suitability = self.suitability_lookup.suitability_for_msr(service_group); + pub fn start_election_rsw_mlr_rhw_msr(&self, + service_group: &str, + term: u64, + suitability: Option) { + let suitability = + suitability.unwrap_or(self.suitability_lookup.suitability_for_msr(service_group)); let has_quorum = self.check_quorum_mlr(service_group); let e = Election::new(self.member_id(), service_group, @@ -1091,7 +1095,7 @@ impl Server { warn!("Starting a new election for {} {}", service_group, term); self.election_store .remove_rsw(&service_group, Election::const_id()); - self.start_election_rsw_mlr_rhw_msr(&service_group, term); + self.start_election_rsw_mlr_rhw_msr(&service_group, term, None); } for (service_group, old_term) in update_elections_to_restart { @@ -1138,10 +1142,13 @@ impl Server { == self.election_store.lock_rsr().get_member_id(election.key()) { debug!("I am the leader of previous term!"); - debug!("removing old rumor and starting new election"); + debug!("removing old rumor and starting new election with highest \ + `Suitability`"); self.election_store .remove_rsw(election.key(), election.id()); - self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term); + self.start_election_rsw_mlr_rhw_msr(&election.service_group, + election.term, + Some(u64::MAX)); } else if Some(election.member_id.as_str()) == self.election_store.lock_rsr().get_member_id(election.key()) { @@ -1150,9 +1157,9 @@ impl Server { election.term); self.election_store .remove_rsw(election.key(), election.id()); - trace!("Removed old election."); - 
self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term); - trace!("Started new election."); + self.start_election_rsw_mlr_rhw_msr(&election.service_group, + election.term, + None); } else { warn!("Received a New Term Election, but not from the current leader, \ ignoring!"); @@ -1207,7 +1214,7 @@ impl Server { .lock() .expect("Election timers lock poisoned"); existing_timers.insert(election.service_group.clone(), ElectionTimer(timer)); - self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term); + self.start_election_rsw_mlr_rhw_msr(&election.service_group, election.term, None); } if !election.is_finished() { diff --git a/components/butterfly/src/server/inbound.rs b/components/butterfly/src/server/inbound.rs index 3ae62a703a..a5c5510c99 100644 --- a/components/butterfly/src/server/inbound.rs +++ b/components/butterfly/src/server/inbound.rs @@ -124,7 +124,7 @@ pub fn run_loop(server: &Server, socket: &UdpSocket, tx_outbound: &AckSender) -> pingreq.from.id); continue; } - process_pingreq_mlr_smw_rhw(server, socket, addr, pingreq); + process_pingreq_mlr_smr_rhw(server, socket, addr, pingreq); } SwimKind::ProbePing(probe_ping) => { if server.is_member_blocked_sblr(&probe_ping.from.id) { @@ -163,7 +163,7 @@ pub fn run_loop(server: &Server, socket: &UdpSocket, tx_outbound: &AckSender) -> /// * `MemberList::entries` (read) /// * `Server::member` (read) /// * `RumorHeat::inner` (write) -fn process_pingreq_mlr_smw_rhw(server: &Server, +fn process_pingreq_mlr_smr_rhw(server: &Server, socket: &UdpSocket, addr: SocketAddr, mut msg: PingReq) { diff --git a/components/butterfly/tests/common/mod.rs b/components/butterfly/tests/common/mod.rs index c03e97570d..8b79f9cf59 100644 --- a/components/butterfly/tests/common/mod.rs +++ b/components/butterfly/tests/common/mod.rs @@ -56,7 +56,6 @@ pub fn start_server_smw_rhw(name: &str, ring_key: Option, suitability: let member = Member { swim_port, gossip_port, ..Default::default() }; - eprintln!("member: {:#?}", member); let mut server = Server::new(listen_swim, listen_gossip, @@ -451,7 +450,7 @@ impl SwimNet { pub fn add_election(&mut self, member: usize, service: &str) { self[member].start_election_rsw_mlr_rhw_msr(&ServiceGroup::new(service, "prod", None).unwrap(), - 0); + 0, None); } } diff --git a/components/sup/src/manager.rs b/components/sup/src/manager.rs index 803f7eadc3..b242cfb689 100644 --- a/components/sup/src/manager.rs +++ b/components/sup/src/manager.rs @@ -950,7 +950,7 @@ impl Manager { .remove(&service.service_group)); if service.topology() == Topology::Leader { self.butterfly - .start_election_rsw_mlr_rhw_msr(&service.service_group, 0); + .start_election_rsw_mlr_rhw_msr(&service.service_group, 0, None); } if let Err(e) = self.user_config_watcher.add(&service) { From effc08b675755c97d3884a1d7e79374bf750b375 Mon Sep 17 00:00:00 2001 From: Abhijit Gadgil Date: Tue, 8 Oct 2024 10:38:27 +0530 Subject: [PATCH 09/13] Clippy Fixes Signed-off-by: Abhijit Gadgil --- components/butterfly/src/server.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs index aa380eabb2..99feb9fc90 100644 --- a/components/butterfly/src/server.rs +++ b/components/butterfly/src/server.rs @@ -945,8 +945,9 @@ impl Server { service_group: &str, term: u64, suitability: Option) { - let suitability = - suitability.unwrap_or(self.suitability_lookup.suitability_for_msr(service_group)); + let suitability = suitability.unwrap_or_else(|| { + 
self.suitability_lookup.suitability_for_msr(service_group) + }); let has_quorum = self.check_quorum_mlr(service_group); let e = Election::new(self.member_id(), service_group, From 21c98b44e3019921aac1c2b8e1b1fd3d5886253e Mon Sep 17 00:00:00 2001 From: Abhijit Gadgil Date: Tue, 8 Oct 2024 11:25:47 +0530 Subject: [PATCH 10/13] `restart_election` for leader with Max Suitability If we are a leader and if we restart the elections for whatever reasons (believing we lost quorum), we should start with `Max` suitability Signed-off-by: Abhijit Gadgil --- components/butterfly/src/server.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs index 99feb9fc90..9563e5fa7e 100644 --- a/components/butterfly/src/server.rs +++ b/components/butterfly/src/server.rs @@ -1094,9 +1094,17 @@ impl Server { for (service_group, old_term) in elections_to_restart { let term = old_term + 1; warn!("Starting a new election for {} {}", service_group, term); + let suitability = if Some(self.member_id()) + == self.election_store.lock_rsr().get_member_id(&service_group) + { + Some(u64::MAX) + } else { + None + }; + self.election_store .remove_rsw(&service_group, Election::const_id()); - self.start_election_rsw_mlr_rhw_msr(&service_group, term, None); + self.start_election_rsw_mlr_rhw_msr(&service_group, term, suitability); } for (service_group, old_term) in update_elections_to_restart { From d2b0bce07ab91d81569466c82a36779d0c20cc1c Mon Sep 17 00:00:00 2001 From: Abhijit Gadgil Date: Tue, 8 Oct 2024 11:57:14 +0530 Subject: [PATCH 11/13] Log Lack of quorum as a `warn!` message Signed-off-by: Abhijit Gadgil --- components/butterfly/src/server.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/components/butterfly/src/server.rs b/components/butterfly/src/server.rs index 9563e5fa7e..4e8710efa6 100644 --- a/components/butterfly/src/server.rs +++ b/components/butterfly/src/server.rs @@ -922,13 +922,19 @@ impl Server { #[allow(clippy::integer_division)] let has_quorum = alive_population > total_population / 2; - trace!("check_quorum({}): {}/{} alive/total => {}, electorate: {:?}, service_group: {:?}", - key, - alive_population, - total_population, - has_quorum, - electorate, - service_group_members); + let quorum_log_entry = format!("check_quorum({}): {}/{} alive/total => {}, electorate: \ + {:?}, service_group: {:?}", + key, + alive_population, + total_population, + has_quorum, + electorate, + service_group_members); + if !has_quorum { + warn!("{}", quorum_log_entry); + } else { + trace!("{}", quorum_log_entry); + } has_quorum } From c1a7fcb68c543792a33b2810110d22ccdabec1d6 Mon Sep 17 00:00:00 2001 From: Abhijit Gadgil Date: Fri, 11 Oct 2024 11:24:56 +0530 Subject: [PATCH 12/13] Added unit tests for Election Algorithm changes Signed-off-by: Abhijit Gadgil --- components/butterfly/tests/common/mod.rs | 22 ++++ components/butterfly/tests/rumor/election.rs | 113 ++++++++++++++++++- 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/components/butterfly/tests/common/mod.rs b/components/butterfly/tests/common/mod.rs index 8b79f9cf59..9125dc3b67 100644 --- a/components/butterfly/tests/common/mod.rs +++ b/components/butterfly/tests/common/mod.rs @@ -368,6 +368,28 @@ impl SwimNet { } } + /// Partition a node from the rest of the network + pub fn partition_node(&self, idx: usize) { + println!("Partitioning {} from the network.", idx); + for i in 0..self.members.len() { + if i != idx { + 
+                self.block(idx, i);
+                self.block(i, idx);
+            }
+        }
+    }
+
+    /// Unpartition a node from the rest of the network
+    pub fn unpartition_node(&self, idx: usize) {
+        println!("Unpartitioning {} from the network.", idx);
+        for i in 0..self.members.len() {
+            if i != idx {
+                self.unblock(idx, i);
+                self.unblock(i, idx);
+            }
+        }
+    }
+
     /// # Locking (see locking.md)
     /// * `MemberList::entries` (read)
     pub fn wait_for_health_of_mlr(&self,
diff --git a/components/butterfly/tests/rumor/election.rs b/components/butterfly/tests/rumor/election.rs
index 9dc938de92..246ea6296e 100644
--- a/components/butterfly/tests/rumor/election.rs
+++ b/components/butterfly/tests/rumor/election.rs
@@ -1,3 +1,7 @@
+use std::iter::FromIterator;
+
+use rand::prelude::SliceRandom;
+
 use crate::btest;
 use habitat_butterfly::{member::Health,
                         rumor::{election::ElectionStatus,
@@ -92,8 +96,6 @@ fn five_members_elect_a_new_leader_when_the_old_one_dies() {
 #[test]
 #[allow(clippy::cognitive_complexity)]
 fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
-    env_logger::init();
-
     let mut net = btest::SwimNet::new_with_suitability_rhw(vec![1, 0, 0, 0, 0]);
     net[0].myself().lock_smw().set_persistent();
     net[4].myself().lock_smw().set_persistent();
@@ -173,3 +175,110 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() {
         assert_eq!(new_leader_id.as_ref(), Some(&e.member_id));
     });
 }
+
+#[test]
+#[allow(clippy::cognitive_complexity)]
+fn three_persistent_members_reelect_same_leader_follower_partition() {
+    // Server '0' will always be the leader.
+    let mut net = btest::SwimNet::new_with_suitability_rhw(vec![1, 0, 0]);
+    net[0].myself().lock_smw().set_persistent();
+    net[1].myself().lock_smw().set_persistent();
+    net[2].myself().lock_smw().set_persistent();
+
+    net.add_service(0, "core/foobar/1.2.3/20241010101010");
+    net.add_service(1, "core/foobar/1.2.3/20241010101010");
+    net.add_service(2, "core/foobar/1.2.3/20241010101010");
+    net.add_election(0, "foobar");
+    net.connect_smr(0, 1);
+    net.connect_smr(1, 2);
+    assert_wait_for_health_of_mlr!(net, [0..3, 0..3], Health::Alive);
+    assert_wait_for_election_status!(net, [0..3], "foobar.prod", ElectionStatus::Finished);
+    assert_wait_for_equal_election!(net, [0..3, 0..3], "foobar.prod");
+
+    let leader_id = net[0].election_store
+                          .lock_rsr()
+                          .service_group("foobar.prod")
+                          .map_rumor(Election::const_id(), |e| e.member_id.clone());
+
+    assert_eq!(leader_id, Some(net[0].member_id().to_string()));
+
+    net.partition(0..2, 2..3);
+    assert_wait_for_health_of_mlr!(net, [0..2, 2..3], Health::Confirmed);
+    assert_wait_for_election_status!(net, 0, "foobar.prod", ElectionStatus::Finished);
+    assert_wait_for_election_status!(net, 1, "foobar.prod", ElectionStatus::Finished);
+
+    net[2].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    assert_wait_for_election_status!(net, 2, "foobar.prod", ElectionStatus::NoQuorum);
+
+    net.unpartition(0..2, 2..3);
+    assert_wait_for_health_of_mlr!(net, [0..2, 2..3], Health::Alive);
+    assert_wait_for_election_status!(net, 2, "foobar.prod", ElectionStatus::Finished);
+
+    let new_leader_id = net[2].election_store
+                              .lock_rsr()
+                              .service_group("foobar.prod")
+                              .map_rumor(Election::const_id(), |e| e.member_id.clone());
+
+    assert_eq!(leader_id, new_leader_id,
+               "OLD: {:?}, NEW: {:?}",
+               leader_id, new_leader_id);
+}
+
+#[test]
+#[allow(clippy::cognitive_complexity)]
+fn five_persistent_members_same_leader_multiple_non_quorum_partitions() {
+    let mut net = btest::SwimNet::new_with_suitability_rhw(vec![1, 0, 0, 0, 0]);
+    for i in 0..5 {
+        net[i].myself().lock_smw().set_persistent();
+        net.add_service(i, "core/foobar/1.2.3/20241010101010");
+    }
+    net.add_election(0, "foobar");
+
+    for servers in Vec::from_iter(0..5).windows(2) {
+        net.connect_smr(servers[0], servers[1])
+    }
+    assert_wait_for_health_of_mlr!(net, [0..5, 0..5], Health::Alive);
+    assert_wait_for_election_status!(net, [0..5], "foobar.prod", ElectionStatus::Finished);
+    assert_wait_for_equal_election!(net, [0..5, 0..5], "foobar.prod");
+
+    let leader_id = net[0].election_store
+                          .lock_rsr()
+                          .service_group("foobar.prod")
+                          .map_rumor(Election::const_id(), |e| e.member_id.clone());
+
+    assert_eq!(leader_id, Some(net[0].member_id().to_string()));
+
+    // Make sure that after a subset of followers (a non-quorum partition) is
+    // partitioned and reconnected, the leader stays the same.
+    let mut rng = rand::thread_rng();
+    for _ in 0..2 {
+        let idxes = Vec::from_iter(1_usize..5_usize).choose_multiple(&mut rng, 2)
+                                                    .copied()
+                                                    .collect::<Vec<usize>>();
+        for idx in idxes.iter() {
+            println!("idx: {}", idx);
+            net.partition_node(*idx);
+            assert_wait_for_health_of_mlr!(net, *idx, Health::Confirmed);
+        }
+        let first = idxes[0];
+        net[first].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+        assert_wait_for_election_status!(net, 0, "foobar.prod", ElectionStatus::Finished);
+
+        assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::NoQuorum);
+
+        for idx in idxes.iter() {
+            net.unpartition_node(*idx);
+            assert_wait_for_health_of_mlr!(net, *idx, Health::Alive);
+        }
+        assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::Finished);
+
+        let new_leader_id = net[first].election_store
+                                      .lock_rsr()
+                                      .service_group("foobar.prod")
+                                      .map_rumor(Election::const_id(), |e| e.member_id.clone());
+
+        assert_eq!(leader_id, new_leader_id,
+                   "OLD: {:?}, NEW: {:?}",
+                   leader_id, new_leader_id);
+    }
+}

From 8eea7067739ac16a477583a00af6863a69335f9c Mon Sep 17 00:00:00 2001
From: Abhijit Gadgil
Date: Fri, 11 Oct 2024 20:22:57 +0530
Subject: [PATCH 13/13] Update `max_gossip_rounds` and remove partition
 iteration

Signed-off-by: Abhijit Gadgil
---
 components/butterfly/tests/common/mod.rs     |  2 +-
 components/butterfly/tests/rumor/election.rs | 50 ++++++++++----------
 2 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/components/butterfly/tests/common/mod.rs b/components/butterfly/tests/common/mod.rs
index 9125dc3b67..3c11d2dd62 100644
--- a/components/butterfly/tests/common/mod.rs
+++ b/components/butterfly/tests/common/mod.rs
@@ -224,7 +224,7 @@ impl SwimNet {
     pub fn max_rounds(&self) -> isize { 4 }
 
-    pub fn max_gossip_rounds(&self) -> isize { 5 }
+    pub fn max_gossip_rounds(&self) -> isize { 8 }
 
     pub fn rounds(&self) -> Vec<isize> { self.members.iter().map(Server::swim_rounds).collect() }
diff --git a/components/butterfly/tests/rumor/election.rs b/components/butterfly/tests/rumor/election.rs
index 246ea6296e..f12ea9ae49 100644
--- a/components/butterfly/tests/rumor/election.rs
+++ b/components/butterfly/tests/rumor/election.rs
@@ -251,34 +251,32 @@ fn five_persistent_members_same_leader_multiple_non_quorum_partitions() {
     // Make sure that after a subset of followers (a non-quorum partition) is
     // partitioned and reconnected, the leader stays the same.
     let mut rng = rand::thread_rng();
-    for _ in 0..2 {
-        let idxes = Vec::from_iter(1_usize..5_usize).choose_multiple(&mut rng, 2)
-                                                    .copied()
-                                                    .collect::<Vec<usize>>();
-        for idx in idxes.iter() {
-            println!("idx: {}", idx);
-            net.partition_node(*idx);
-            assert_wait_for_health_of_mlr!(net, *idx, Health::Confirmed);
-        }
-        let first = idxes[0];
-        net[first].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
-        assert_wait_for_election_status!(net, 0, "foobar.prod", ElectionStatus::Finished);
+    let idxes = Vec::from_iter(1_usize..5_usize).choose_multiple(&mut rng, 2)
+                                                .copied()
+                                                .collect::<Vec<usize>>();
+    for idx in idxes.iter() {
+        println!("idx: {}", idx);
+        net.partition_node(*idx);
+        assert_wait_for_health_of_mlr!(net, *idx, Health::Confirmed);
+    }
+    let first = idxes[0];
+    net[first].restart_elections_rsw_mlr_rhw_msr(FeatureFlag::empty());
+    assert_wait_for_election_status!(net, 0, "foobar.prod", ElectionStatus::Finished);
 
-        assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::NoQuorum);
+    assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::NoQuorum);
 
-        for idx in idxes.iter() {
-            net.unpartition_node(*idx);
-            assert_wait_for_health_of_mlr!(net, *idx, Health::Alive);
-        }
-        assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::Finished);
+    for idx in idxes.iter() {
+        net.unpartition_node(*idx);
+        assert_wait_for_health_of_mlr!(net, *idx, Health::Alive);
+    }
+    assert_wait_for_election_status!(net, first, "foobar.prod", ElectionStatus::Finished);
 
-        let new_leader_id = net[first].election_store
-                                      .lock_rsr()
-                                      .service_group("foobar.prod")
-                                      .map_rumor(Election::const_id(), |e| e.member_id.clone());
+    let new_leader_id = net[first].election_store
+                                  .lock_rsr()
+                                  .service_group("foobar.prod")
+                                  .map_rumor(Election::const_id(), |e| e.member_id.clone());
 
-        assert_eq!(leader_id, new_leader_id,
-                   "OLD: {:?}, NEW: {:?}",
-                   leader_id, new_leader_id);
-    }
+    assert_eq!(leader_id, new_leader_id,
+               "OLD: {:?}, NEW: {:?}",
+               leader_id, new_leader_id);
 }
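
Patches 10 and 11 both hinge on two small rules that are easy to miss inside
the diffs above: the strict-majority quorum test
(`alive_population > total_population / 2`, with integer division) and the
suitability override a sitting leader applies when it restarts an election.
The sketch below is a standalone illustration of those two rules only; the
free functions are hypothetical and are not the Supervisor's actual API.

// Hypothetical, self-contained sketch of the two election rules changed in
// this series; these functions do not exist in the butterfly crate.

/// Strict majority via integer division: an even split is NOT quorum.
fn has_quorum(alive_population: usize, total_population: usize) -> bool {
    alive_population > total_population / 2
}

/// A sitting leader restarting an election claims maximum suitability so it
/// wins re-election once quorum is restored; any other member passes `None`
/// and falls back to its configured suitability lookup.
fn restart_suitability(is_current_leader: bool) -> Option<u64> {
    if is_current_leader {
        Some(u64::MAX)
    } else {
        None
    }
}

fn main() {
    // A 5-member ring needs at least 3 alive members for quorum.
    assert!(!has_quorum(2, 5)); // 5 / 2 == 2, and 2 > 2 is false
    assert!(has_quorum(3, 5));
    // An even split (2 of 4 alive) is not a majority.
    assert!(!has_quorum(2, 4));

    assert_eq!(restart_suitability(true), Some(u64::MAX));
    assert_eq!(restart_suitability(false), None);
}

With `u64::MAX` suitability, the previous leader's vote dominates any
follower's configured suitability in the restarted election, which is why the
two new tests can assert that the same leader is re-elected after a
non-quorum partition heals.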