Skip to content

Commit

Permalink
test(kitsune2_gossip): Add tests to check that op data limits are res…
Browse files Browse the repository at this point in the history
…pected
  • Loading branch information
ThetaSinner committed Feb 4, 2025
1 parent ddd777d commit 1ba1134
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 9 deletions.
277 changes: 268 additions & 9 deletions crates/gossip/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ mod test {
}
})
.await
.unwrap()
.expect("Timed out waiting for ops")
}
}

Expand All @@ -516,6 +516,7 @@ mod test {
pub async fn create(
space: SpaceId,
bootstrap: bool,
config: Option<K2GossipConfig>,
) -> TestGossipFactory {
let mut builder =
default_test_builder().with_default_config().unwrap();
Expand All @@ -529,13 +530,18 @@ mod test {
builder
.config
.set_module_config(&K2GossipModConfig {
k2_gossip: K2GossipConfig {
initiate_interval_ms: 10,
min_initiate_interval_ms: 10,
..Default::default()
k2_gossip: if let Some(config) = config {
config
} else {
K2GossipConfig {
initiate_interval_ms: 10,
min_initiate_interval_ms: 10,
..Default::default()
}
},
})
.unwrap();

let builder = Arc::new(builder);

TestGossipFactory {
Expand Down Expand Up @@ -591,7 +597,8 @@ mod test {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(space.clone(), false).await;
let factory =
TestGossipFactory::create(space.clone(), false, None).await;
let harness_1 = factory.new_instance().await;
let agent_info_1 = harness_1.join_local_agent(DhtArc::FULL).await;

Expand Down Expand Up @@ -635,7 +642,8 @@ mod test {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(space.clone(), true).await;
let factory =
TestGossipFactory::create(space.clone(), true, None).await;
let harness_1 = factory.new_instance().await;
harness_1.join_local_agent(DhtArc::FULL).await;
let op_1 = MemoryOp::new(Timestamp::now(), vec![1; 128]);
Expand Down Expand Up @@ -670,7 +678,8 @@ mod test {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(space.clone(), false).await;
let factory =
TestGossipFactory::create(space.clone(), false, None).await;

let harness_1 = factory.new_instance().await;
let agent_info_1 = harness_1.join_local_agent(DhtArc::FULL).await;
Expand Down Expand Up @@ -742,7 +751,8 @@ mod test {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(space.clone(), false).await;
let factory =
TestGossipFactory::create(space.clone(), false, None).await;
let harness_1 = factory.new_instance().await;
let agent_info_1 = harness_1.join_local_agent(DhtArc::FULL).await;
let op_1 = MemoryOp::new(
Expand Down Expand Up @@ -813,4 +823,253 @@ mod test {
assert_eq!(1, received_ops.len());
assert_eq!(op_1, received_ops[0]);
}

#[tokio::test]
async fn respect_size_limit_for_new_ops() {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(
space.clone(),
true,
Some(K2GossipConfig {
max_gossip_op_bytes: 3 * 128,
// Set the initiate interval low so that gossip will start quickly after
// bootstrapping but leave the min initiate interval high so that we can
// run a single gossip round during the test.
initiate_interval_ms: 10,
..Default::default()
}),
)
.await;
let harness_1 = factory.new_instance().await;
harness_1.join_local_agent(DhtArc::FULL).await;

let mut ops = Vec::new();

for i in 0u8..5 {
let op = MemoryOp::new(Timestamp::now(), vec![i; 128]);
ops.push(op);
}

harness_1
.op_store
.process_incoming_ops(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();
harness_1
.space
.inform_ops_stored(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();

let harness_2 = factory.new_instance().await;
harness_2.join_local_agent(DhtArc::FULL).await;

// Just the first 3 ops should be received, the limit should prevent the next two from
// being sent.
harness_2
.wait_for_ops(
ops.iter().take(3).map(|op| op.compute_op_id()).collect(),
)
.await;

// We received the ones we wanted above, make sure it's just those 3.
let all_ops = harness_2
.op_store
.retrieve_ops(ops.iter().map(|op| op.compute_op_id()).collect())
.await
.unwrap();
assert_eq!(3, all_ops.len());
}

#[tokio::test]
async fn respect_size_limit_for_new_ops_and_dht_disc_diff() {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(
space.clone(),
true,
Some(K2GossipConfig {
max_gossip_op_bytes: 7 * 128,
// Set the initiate interval low so that gossip will start quickly after
// bootstrapping but leave the min initiate interval high so that we can
// run a single gossip round during the test.
initiate_interval_ms: 10,
..Default::default()
}),
)
.await;
let harness_1 = factory.new_instance().await;
let agent_info_1 = harness_1.join_local_agent(DhtArc::FULL).await;

let mut ops = Vec::new();

for i in 0u8..5 {
let op = MemoryOp::new(
Timestamp::from_micros(100 + i as i64),
vec![i; 128],
);
ops.push(op);
}
harness_1
.op_store
.process_incoming_ops(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();

let bookmark = Timestamp::now();

for i in 0u8..5 {
let op = MemoryOp::new(Timestamp::now(), vec![100 + i; 128]);
ops.push(op);
}

harness_1
.op_store
.process_incoming_ops(
ops.clone().into_iter().skip(5).map(Into::into).collect(),
)
.await
.unwrap();
harness_1
.space
.inform_ops_stored(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();

let harness_2 = factory.new_instance().await;

// Set a bookmark so that the first 5 ops are behind the bookmark and need a disc sync and
// the next 5 ops are ahead of the bookmark and a new ops sync
harness_2
.peer_meta_store
.set_new_ops_bookmark(agent_info_1.url.clone().unwrap(), bookmark)
.await
.unwrap();

// Now join an agent to the second harness so that we can send ops to it.
harness_2.join_local_agent(DhtArc::FULL).await;

// Just 2 historical and all 5 new ops should be sent.
harness_2
.wait_for_ops(
ops.iter()
.take(2)
.chain(ops.iter().skip(5))
.map(|op| op.compute_op_id())
.collect(),
)
.await;

// We received the ones we wanted above, make sure it's just those 7.
let all_ops = harness_2
.op_store
.retrieve_ops(ops.iter().map(|op| op.compute_op_id()).collect())
.await
.unwrap();
assert_eq!(7, all_ops.len());
}

#[tokio::test]
async fn respect_size_limit_for_new_ops_and_dht_ring_diff() {
enable_tracing();

let space = TEST_SPACE_ID;
let factory = TestGossipFactory::create(
space.clone(),
true,
Some(K2GossipConfig {
max_gossip_op_bytes: 7 * 128,
// Set the initiate interval low so that gossip will start quickly after
// bootstrapping but leave the min initiate interval high so that we can
// run a single gossip round during the test.
initiate_interval_ms: 10,
..Default::default()
}),
)
.await;
let harness_1 = factory.new_instance().await;
let agent_info_1 = harness_1.join_local_agent(DhtArc::FULL).await;

let mut ops = Vec::new();

for i in 0u8..5 {
let op = MemoryOp::new(
(Timestamp::now() - 2 * UNIT_TIME).unwrap(),
vec![i; 128],
);
ops.push(op);
}
harness_1
.op_store
.process_incoming_ops(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();

let bookmark = Timestamp::now();

for i in 0u8..5 {
let op = MemoryOp::new(Timestamp::now(), vec![100 + i; 128]);
ops.push(op);
}

harness_1
.op_store
.process_incoming_ops(
ops.clone().into_iter().skip(5).map(Into::into).collect(),
)
.await
.unwrap();
harness_1
.space
.inform_ops_stored(
ops.clone().into_iter().map(Into::into).collect(),
)
.await
.unwrap();

let harness_2 = factory.new_instance().await;

// Set a bookmark so that the first 5 ops are behind the bookmark and need a disc sync and
// the next 5 ops are ahead of the bookmark and a new ops sync
harness_2
.peer_meta_store
.set_new_ops_bookmark(agent_info_1.url.clone().unwrap(), bookmark)
.await
.unwrap();

// Now join an agent to the second harness so that we can send ops to it.
harness_2.join_local_agent(DhtArc::FULL).await;

// Just 2 historical and all 5 new ops should be sent.
harness_2
.wait_for_ops(
ops.iter()
.take(2)
.chain(ops.iter().skip(5))
.map(|op| op.compute_op_id())
.collect(),
)
.await;

// We received the ones we wanted above, make sure it's just those 7.
let all_ops = harness_2
.op_store
.retrieve_ops(ops.iter().map(|op| op.compute_op_id()).collect())
.await
.unwrap();
assert_eq!(7, all_ops.len());
}
}
7 changes: 7 additions & 0 deletions crates/gossip/src/respond/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ impl K2Gossip {
// Update the peer's max op data bytes to reflect the amount of data we're sending ids for.
// The remaining limit will be used for the DHT diff as required.
if let Some(state) = lock.as_mut() {
tracing::debug!(
"Used {}/{} op budget to send {} op ids",
used_bytes,
accept.max_op_data_bytes,
send_new_ops.len()
);

// Note that this value will have been initialised to 0 here when we created the
// initial state. So we need to initialise and subtract here.
state.peer_max_op_data_bytes = std::cmp::min(
Expand Down
5 changes: 5 additions & 0 deletions crates/gossip/src/respond/disc_sector_details_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ impl K2Gossip {
.await?;

if let Some(state) = state.as_mut() {
tracing::debug!(
"Used {}/{} op budget to send disc ops",
used_bytes,
state.peer_max_op_data_bytes,
);
state.peer_max_op_data_bytes -= used_bytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ impl K2Gossip {
)
.await?;

tracing::debug!(
"Used {}/{} op budget to send disc ops",
used_bytes,
state.peer_max_op_data_bytes,
);
state.peer_max_op_data_bytes -= used_bytes;

match next_action {
Expand Down
6 changes: 6 additions & 0 deletions crates/gossip/src/respond/initiate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ impl K2Gossip {

// Update the peer's max op data bytes to reflect the amount of data we're sending ids for.
// The remaining limit will be used for the DHT diff as required.
tracing::debug!(
"Used {}/{} op budget to send {} op ids",
used_bytes,
initiate.max_op_data_bytes,
new_ops.len()
);
state.peer_max_op_data_bytes -= used_bytes;

Ok(Some(GossipMessage::Accept(K2GossipAcceptMessage {
Expand Down

0 comments on commit 1ba1134

Please sign in to comment.