Skip to content

Commit

Permalink
Removed the dynamic config mechanism which was unused.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 authored and rnijveld committed Apr 18, 2024
1 parent 76b9009 commit 7351996
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 149 deletions.
109 changes: 27 additions & 82 deletions ntp-proto/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,6 @@ impl Peer {
}
}

pub fn update_config(&mut self, peer_defaults_config: SourceDefaultsConfig) {
self.peer_defaults_config = peer_defaults_config;
}

pub fn current_poll_interval(&self, system: SystemSnapshot) -> PollInterval {
system
.time_snapshot
Expand All @@ -435,7 +431,6 @@ impl Peer {
&mut self,
buf: &'a mut [u8],
system: SystemSnapshot,
peer_defaults_config: &SourceDefaultsConfig,
) -> Result<&'a [u8], PollError> {
if !self.reach.is_reachable() && self.tries >= STARTUP_TRIES_THRESHOLD {
return Err(PollError::PeerUnreachable);
Expand Down Expand Up @@ -481,7 +476,7 @@ impl Peer {
self.current_request_identifier = Some((identifier, NtpInstant::now() + POLL_WINDOW));

// Ensure we don't spam the remote with polls if it is not reachable
self.backoff_interval = poll_interval.inc(peer_defaults_config.poll_interval_limits);
self.backoff_interval = poll_interval.inc(self.peer_defaults_config.poll_interval_limits);

#[cfg(feature = "ntpv5")]
if let NtpHeader::V5(header) = packet.header() {
Expand Down Expand Up @@ -938,9 +933,7 @@ mod test {

let prev = peer.current_poll_interval(system);
let mut buf = [0; 1024];
let packetbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let packetbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let packet = NtpPacket::deserialize(packetbuf, &NoCipher).unwrap().0;
assert!(peer.current_poll_interval(system) > prev);
let mut response = NtpPacket::test();
Expand All @@ -960,9 +953,7 @@ mod test {

let prev = peer.current_poll_interval(system);
let mut buf = [0; 1024];
let packetbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let packetbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let packet = NtpPacket::deserialize(packetbuf, &NoCipher).unwrap().0;
assert!(peer.current_poll_interval(system) > prev);
let mut response = NtpPacket::test();
Expand Down Expand Up @@ -990,9 +981,7 @@ mod test {

let system = SystemSnapshot::default();
let mut buf = [0; 1024];
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
Expand Down Expand Up @@ -1028,17 +1017,11 @@ mod test {
let mut peer = Peer::test_peer();
let system = SystemSnapshot::default();
let mut buf = [0; 1024];
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(matches!(
peer.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default()),
peer.generate_poll_message(&mut buf, system),
Err(PollError::PeerUnreachable)
));
}
Expand All @@ -1050,9 +1033,7 @@ mod test {

let system = SystemSnapshot::default();
let mut buf = [0; 1024];
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
Expand All @@ -1071,32 +1052,16 @@ mod test {
)
.is_ok());

assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(peer.generate_poll_message(&mut buf, system).is_ok());
assert!(matches!(
peer.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default()),
peer.generate_poll_message(&mut buf, system),
Err(PollError::PeerUnreachable)
));
}
Expand All @@ -1108,9 +1073,7 @@ mod test {

let system = SystemSnapshot::default();
let mut buf = [0; 1024];
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
Expand Down Expand Up @@ -1164,9 +1127,7 @@ mod test {
let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
let mut buf = [0; 1024];
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
packet.set_reference_id(ReferenceId::KISS_RSTR);
packet.set_origin_timestamp(outgoing.transmit_timestamp());
Expand Down Expand Up @@ -1199,9 +1160,7 @@ mod test {

let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
packet.set_reference_id(ReferenceId::KISS_DENY);
packet.set_origin_timestamp(outgoing.transmit_timestamp());
Expand Down Expand Up @@ -1237,9 +1196,7 @@ mod test {
let mut packet = NtpPacket::test();
let system = SystemSnapshot::default();
let mut buf = [0; 1024];
let outgoingbuf = peer
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let outgoingbuf = peer.generate_poll_message(&mut buf, system).unwrap();
let outgoing = NtpPacket::deserialize(outgoingbuf, &NoCipher).unwrap().0;
packet.set_reference_id(ReferenceId::KISS_RATE);
packet.set_origin_timestamp(outgoing.transmit_timestamp());
Expand All @@ -1262,7 +1219,6 @@ mod test {
let mut peer = Peer::test_peer();
let mut buf = [0; 1024];
let system = SystemSnapshot::default();
let peer_defaults_config = SourceDefaultsConfig::default();
let clock = TestClock {};

assert!(matches!(
Expand All @@ -1271,9 +1227,7 @@ mod test {
));

for _ in 0..8 {
let poll = peer
.generate_poll_message(&mut buf, system, &peer_defaults_config)
.unwrap();
let poll = peer.generate_poll_message(&mut buf, system).unwrap();

let poll_len: usize = poll.len();
let (poll, _) = NtpPacket::deserialize(poll, &NoCipher).unwrap();
Expand All @@ -1299,9 +1253,7 @@ mod test {
.unwrap();
}

let poll = peer
.generate_poll_message(&mut buf, system, &peer_defaults_config)
.unwrap();
let poll = peer.generate_poll_message(&mut buf, system).unwrap();
let (poll, _) = NtpPacket::deserialize(poll, &NoCipher).unwrap();
assert_eq!(poll.version(), 4);
assert!(!poll.is_upgrade());
Expand All @@ -1313,17 +1265,14 @@ mod test {
let mut peer = Peer::test_peer();
let mut buf = [0; 1024];
let system = SystemSnapshot::default();
let peer_defaults_config = SourceDefaultsConfig::default();
let clock = TestClock {};

assert!(matches!(
peer.protocol_version,
ProtocolVersion::V4UpgradingToV5 { .. }
));

let poll = peer
.generate_poll_message(&mut buf, system, &peer_defaults_config)
.unwrap();
let poll = peer.generate_poll_message(&mut buf, system).unwrap();

let poll_len = poll.len();
let (poll, _) = NtpPacket::deserialize(poll, &NoCipher).unwrap();
Expand All @@ -1348,9 +1297,7 @@ mod test {
// We should have received a upgrade response and updated to NTPv5
assert!(matches!(peer.protocol_version, ProtocolVersion::V5));

let poll = peer
.generate_poll_message(&mut buf, system, &peer_defaults_config)
.unwrap();
let poll = peer.generate_poll_message(&mut buf, system).unwrap();
let (poll, _) = NtpPacket::deserialize(poll, &NoCipher).unwrap();
assert_eq!(poll.version(), 5);
}
Expand All @@ -1376,9 +1323,7 @@ mod test {

while client.bloom_filter.full_filter().is_none() && tries < 100 {
let mut buf = [0; 1024];
let req = client
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
.unwrap();
let req = client.generate_poll_message(&mut buf, system).unwrap();

let (req, _) = NtpPacket::deserialize(req, &NoCipher).unwrap();
let response =
Expand Down
30 changes: 5 additions & 25 deletions ntpd/src/daemon/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::{future::Future, marker::PhantomData, net::SocketAddr, pin::Pin};

use ntp_proto::{
IgnoreReason, Measurement, NtpClock, NtpInstant, NtpTimestamp, Peer, PeerNtsData, PeerSnapshot,
PollError, ProtocolVersion, SourceDefaultsConfig, SynchronizationConfig, SystemSnapshot,
Update,
PollError, ProtocolVersion, SourceDefaultsConfig, SystemSnapshot, Update,
};
use rand::{thread_rng, Rng};
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -49,8 +48,6 @@ pub enum MsgForSystem {
pub struct PeerChannels {
pub msg_for_system_sender: tokio::sync::mpsc::Sender<MsgForSystem>,
pub system_snapshot_receiver: tokio::sync::watch::Receiver<SystemSnapshot>,
pub synchronization_config_receiver: tokio::sync::watch::Receiver<SynchronizationConfig>,
pub source_defaults_config_receiver: tokio::sync::watch::Receiver<SourceDefaultsConfig>,
}

pub(crate) struct PeerTask<C: 'static + NtpClock + Send, T: Wait> {
Expand Down Expand Up @@ -117,17 +114,9 @@ where

async fn handle_poll(&mut self, poll_wait: &mut Pin<&mut T>) -> PollResult {
let system_snapshot = *self.channels.system_snapshot_receiver.borrow();
let peer_defaults_snapshot_system = *self
.channels
.source_defaults_config_receiver
.borrow_and_update();

let mut buf = [0; 1024];
let packet = match self.peer.generate_poll_message(
&mut buf,
system_snapshot,
&peer_defaults_snapshot_system,
) {
let packet = match self.peer.generate_poll_message(&mut buf, system_snapshot) {
Ok(packet) => packet,
Err(PollError::Io(e)) => {
warn!(error = ?e, "Could not generate poll message");
Expand Down Expand Up @@ -317,9 +306,6 @@ where
AcceptResult::Ignore => {},
}
},
_ = self.channels.synchronization_config_receiver.changed(), if self.channels.synchronization_config_receiver.has_changed().is_ok() => {
self.peer.update_config(*self.channels.source_defaults_config_receiver.borrow_and_update());
},
}
}
}
Expand All @@ -337,14 +323,14 @@ where
interface: Option<InterfaceName>,
clock: C,
timestamp_mode: TimestampMode,
mut channels: PeerChannels,
channels: PeerChannels,
protocol_version: ProtocolVersion,
config_snapshot: SourceDefaultsConfig,
nts: Option<Box<PeerNtsData>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(
(async move {
let local_clock_time = NtpInstant::now();
let config_snapshot = *channels.source_defaults_config_receiver.borrow_and_update();
let peer = if let Some(nts) = nts {
Peer::new_nts(
source_addr,
Expand Down Expand Up @@ -577,17 +563,13 @@ mod tests {
.unwrap();

let (_, system_snapshot_receiver) = tokio::sync::watch::channel(SystemSnapshot::default());
let (_, synchronization_config_receiver) =
tokio::sync::watch::channel(SynchronizationConfig::default());
let (_, mut peer_defaults_config_receiver) =
tokio::sync::watch::channel(SourceDefaultsConfig::default());
let (msg_for_system_sender, msg_for_system_receiver) = mpsc::channel(1);

let local_clock_time = NtpInstant::now();
let peer = Peer::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, port_base)),
local_clock_time,
*peer_defaults_config_receiver.borrow_and_update(),
SourceDefaultsConfig::default(),
ProtocolVersion::default(),
);

Expand All @@ -598,8 +580,6 @@ mod tests {
channels: PeerChannels {
msg_for_system_sender,
system_snapshot_receiver,
synchronization_config_receiver,
source_defaults_config_receiver: peer_defaults_config_receiver,
},
source_addr: SocketAddr::from((Ipv4Addr::LOCALHOST, port_base)),
interface: None,
Expand Down
Loading

0 comments on commit 7351996

Please sign in to comment.