Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agadgil/chef 15608 probe ping #9420

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion components/butterfly/protocols/swim.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message Member {
optional int32 gossip_port = 5;
optional bool persistent = 6 [default = false];
optional bool departed = 7 [default = false];
optional bool probe_ping = 8 [default = false];
}

message Ping {
Expand All @@ -22,6 +23,10 @@ message Ack {
optional Member forward_to = 2;
}

message ProbePing {
optional Member from = 1;
}

message PingReq {
optional Member from = 1;
optional Member target = 2;
Expand All @@ -35,14 +40,15 @@ message Membership {
}

message Swim {
enum Type { PING = 1; ACK = 2; PINGREQ = 3; };
enum Type { PING = 1; ACK = 2; PINGREQ = 3; PROBEPING = 4; };

// Identifies which field is filled in.
required Type type = 1;
oneof payload {
Ping ping = 2;
Ack ack = 3;
PingReq pingreq = 4;
ProbePing probeping = 6;
}
repeated Membership membership = 5;
}
Expand Down
2 changes: 2 additions & 0 deletions components/butterfly/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub mod rumor;
pub mod server;
pub mod swim;

pub mod probe_list;

pub use crate::server::Server;
use lazy_static::lazy_static;
use std::cell::UnsafeCell;
Expand Down
49 changes: 41 additions & 8 deletions components/butterfly/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ lazy_static! {
///
/// Note: we're intentionally deriving `Copy` to be able to treat this
/// like a "normal" numeric type.
#[derive(Clone, Debug, Ord, PartialEq, PartialOrd, Eq, Copy, Default)]
#[derive(Clone, Debug, Ord, PartialEq, PartialOrd, Eq, Copy, Default, Hash)]
pub struct Incarnation(u64);

impl From<u64> for Incarnation {
Expand Down Expand Up @@ -128,7 +128,7 @@ pub type UuidSimple = String;

/// A member in the swim group. Passes most of its functionality along to the internal protobuf
/// representation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Member {
pub id: String,
pub incarnation: Incarnation,
Expand All @@ -137,6 +137,7 @@ pub struct Member {
pub gossip_port: u16,
pub persistent: bool,
pub departed: bool,
pub probe_ping: bool,
}

impl Member {
Expand Down Expand Up @@ -170,7 +171,8 @@ impl Default for Member {
swim_port: 0,
gossip_port: 0,
persistent: false,
departed: false, }
departed: false,
probe_ping: false, }
}
}

Expand All @@ -194,7 +196,8 @@ impl From<Member> for proto::Member {
swim_port: Some(value.swim_port.into()),
gossip_port: Some(value.gossip_port.into()),
persistent: Some(value.persistent),
departed: Some(value.departed), }
departed: Some(value.departed),
probe_ping: Some(value.probe_ping), }
}
}

Expand Down Expand Up @@ -305,7 +308,8 @@ impl FromProto<proto::Member> for Member {
.and_then(as_port)
.ok_or(Error::ProtocolMismatch("gossip-port"))?,
persistent: proto.persistent.unwrap_or(false),
departed: proto.departed.unwrap_or(false), })
departed: proto.departed.unwrap_or(false),
probe_ping: proto.probe_ping.unwrap_or(false), })
}
}

Expand Down Expand Up @@ -499,30 +503,49 @@ impl MemberList {
// TODO (CM): why don't we just insert a membership record here?
pub fn insert_mlw(&self, incoming_member: Member, incoming_health: Health) -> bool {
self.insert_membership_mlw(Membership { member: incoming_member,
health: incoming_health, })
health: incoming_health, },
false)
}

/// # Locking (see locking.md)
/// * `MemberList::entries` (write)
fn insert_membership_mlw(&self, incoming: Membership) -> bool {
fn insert_membership_mlw(&self,
incoming: Membership,
ignore_incarnation_and_health: bool)
-> bool {
// Is this clone necessary, or can a key be a reference to a field contained in the value?
// Maybe the members we store should not contain the ID to reduce the duplication?
trace!("insert_membership_mlw: Member: {}, Health: {}",
incoming.member.id,
incoming.health);
let modified = match self.write_entries().entry(incoming.member.id.clone()) {
hash_map::Entry::Occupied(mut entry) => {
let val = entry.get_mut();
if incoming.newer_or_less_healthy_than(val.member.incarnation, val.health) {
if incoming.newer_or_less_healthy_than(val.member.incarnation, val.health)
|| (ignore_incarnation_and_health && val.health != incoming.health)
{
trace!("++ current health: {}, incoming health: {}",
val.health,
incoming.health);
*val = member_list::Entry { member: incoming.member,
health: incoming.health,
health_updated_at: Instant::now(), };
trace!("Occupied: Updated");
true
} else {
trace!("-- current health: {}, incoming health: {}, incarnation: {}, ",
val.health,
incoming.health,
val.member.incarnation);
trace!("Occupied: Not Updated");
false
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert(member_list::Entry { member: incoming.member,
health: incoming.health,
health_updated_at: Instant::now(), });
trace!("Empty: Created!");
true
}
};
Expand All @@ -535,6 +558,15 @@ impl MemberList {
modified
}

pub(crate) fn insert_member_ignore_incarnation_mlw(&self,
incoming_member: Member,
incoming_health: Health)
-> bool {
self.insert_membership_mlw(Membership { member: incoming_member,
health: incoming_health, },
true)
}

/// # Locking (see locking.md)
/// * `MemberList::entries` (write)
pub fn set_departed_mlw(&self, member_id: &str) {
Expand Down Expand Up @@ -670,6 +702,7 @@ impl MemberList {
.cloned()
.collect();
members.shuffle(&mut thread_rng());

members
}

Expand Down
22 changes: 22 additions & 0 deletions components/butterfly/src/probe_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Implementation of a probe list - A list of Members to be 'probe'd when reported as `Confirmed`.

use std::collections::HashSet;

use habitat_common::sync::{Lock,
ReadGuard,
WriteGuard};

use crate::member::Member;

#[derive(Debug)]
pub struct ProbeList {
members: Lock<HashSet<Member>>,
}

impl ProbeList {
pub fn new() -> Self { Self { members: Lock::new(HashSet::new()), } }

pub fn members_read(&self) -> ReadGuard<'_, HashSet<Member>> { self.members.read() }

pub fn members_write(&self) -> WriteGuard<'_, HashSet<Member>> { self.members.write() }
}
8 changes: 8 additions & 0 deletions components/butterfly/src/rumor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ mod storage {
}
}

impl<'a, E: ElectionRumor> IterableGuard<'a, RumorMap<E>> {
pub fn get_member_id(&self, service_group: &str) -> Option<&str> {
self.get(service_group)
.map(|sg| sg.get(E::const_id()).map(ElectionRumor::member_id))
.unwrap_or(None)
}
}

/// Allows ergonomic use of the guard for accessing the guarded `RumorMap`:
/// ```
/// # use habitat_butterfly::rumor::{Departure,
Expand Down
12 changes: 10 additions & 2 deletions components/butterfly/src/rumor/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,16 @@ impl Rumor for Election {
*self = other;
true
} else if other.term == self.term && self.status == ElectionStatus::Finished {
debug!("stored rumor is finished and received rumor is for same term; nothing to do");
false
if other.status == ElectionStatus::Finished {
debug!("stored rumor is finished and received rumor is for same term; nothing to \
do");
false
} else {
debug!("stored rumor is finished and received rumor is for same term; Received \
rumor is not 'Finished'. Taking their votes and sharing ourselves.");
self.steal_votes(&mut other);
true
}
} else if self.term > other.term {
debug!("stored rumor represents a newer term than received; keep sharing it");
true
Expand Down
Loading
Loading