-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RGS v2: NodeAnnouncement Delta Serialization #76
Changes from all commits
1fd6c67
529c666
74dd28d
dcec600
fe33964
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL; | |
use crate::lookup::DeltaSet; | ||
|
||
use crate::persistence::GossipPersister; | ||
use crate::serialization::UpdateSerialization; | ||
use crate::serialization::{SerializationSet, UpdateSerialization}; | ||
use crate::snapshot::Snapshotter; | ||
use crate::types::RGSSLogger; | ||
|
||
|
@@ -49,7 +49,7 @@ mod tests; | |
/// sync formats arise in the future. | ||
/// | ||
/// The fourth byte is the protocol version in case our format gets updated. | ||
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; | ||
const GOSSIP_PREFIX: [u8; 3] = [76, 68, 75]; | ||
|
||
pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger { | ||
network_graph: Arc<NetworkGraph<L>>, | ||
|
@@ -59,7 +59,13 @@ pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger { | |
pub struct SerializedResponse { | ||
pub data: Vec<u8>, | ||
pub message_count: u32, | ||
pub announcement_count: u32, | ||
pub node_announcement_count: u32, | ||
/// Despite the name, the count of node announcements that have associated updates, be those | ||
/// features, addresses, or both | ||
pub node_update_count: u32, | ||
pub node_feature_update_count: u32, | ||
pub node_address_update_count: u32, | ||
pub channel_announcement_count: u32, | ||
pub update_count: u32, | ||
pub update_count_full: u32, | ||
pub update_count_incremental: u32, | ||
|
@@ -171,18 +177,31 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> { | |
blob | ||
} | ||
|
||
async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> SerializedResponse where L::Target: Logger { | ||
async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> SerializationSet where L::Target: Logger { | ||
let client = connect_to_db().await; | ||
|
||
network_graph.remove_stale_channels_and_tracking(); | ||
|
||
let mut output: Vec<u8> = vec![]; | ||
let snapshot_interval = config::snapshot_generation_interval(); | ||
|
||
// set a flag if the chain hash is prepended | ||
// chain hash only necessary if either channel announcements or non-incremental updates are present | ||
// for announcement-free incremental-only updates, chain hash can be skipped | ||
|
||
let mut delta_set = DeltaSet::new(); | ||
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; | ||
log_info!(logger, "announcement channel count: {}", delta_set.len()); | ||
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; | ||
log_info!(logger, "update-fetched channel count: {}", delta_set.len()); | ||
let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await; | ||
log_info!(logger, "update-fetched node count: {}", node_delta_set.len()); | ||
lookup::filter_delta_set(&mut delta_set, logger.clone()); | ||
log_info!(logger, "update-filtered channel count: {}", delta_set.len()); | ||
serialization::serialize_delta_set(delta_set, node_delta_set, last_sync_timestamp) | ||
} | ||
|
||
fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, serialization_version: u8, logger: L) -> SerializedResponse where L::Target: Logger { | ||
let mut output: Vec<u8> = vec![]; | ||
let snapshot_interval = config::snapshot_generation_interval(); | ||
|
||
let mut node_id_set: HashSet<NodeId> = HashSet::new(); | ||
let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new(); | ||
let mut node_ids: Vec<NodeId> = Vec::new(); | ||
|
@@ -199,21 +218,12 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
node_id_indices[&node_id] | ||
}; | ||
|
||
let mut delta_set = DeltaSet::new(); | ||
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; | ||
log_info!(logger, "announcement channel count: {}", delta_set.len()); | ||
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; | ||
log_info!(logger, "update-fetched channel count: {}", delta_set.len()); | ||
lookup::filter_delta_set(&mut delta_set, logger.clone()); | ||
log_info!(logger, "update-filtered channel count: {}", delta_set.len()); | ||
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp); | ||
|
||
// process announcements | ||
// write the number of channel announcements to the output | ||
let announcement_count = serialization_details.announcements.len() as u32; | ||
announcement_count.write(&mut output).unwrap(); | ||
let mut previous_announcement_scid = 0; | ||
for current_announcement in serialization_details.announcements { | ||
for current_announcement in &serialization_details.announcements { | ||
let id_index_1 = get_node_id_index(current_announcement.node_id_1); | ||
let id_index_2 = get_node_id_index(current_announcement.node_id_2); | ||
let mut stripped_announcement = serialization::serialize_stripped_channel_announcement(¤t_announcement, id_index_1, id_index_2, previous_announcement_scid); | ||
|
@@ -227,7 +237,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
let update_count = serialization_details.updates.len() as u32; | ||
update_count.write(&mut output).unwrap(); | ||
|
||
let default_update_values = serialization_details.full_update_defaults; | ||
let default_update_values = &serialization_details.full_update_defaults; | ||
if update_count > 0 { | ||
default_update_values.cltv_expiry_delta.write(&mut output).unwrap(); | ||
default_update_values.htlc_minimum_msat.write(&mut output).unwrap(); | ||
|
@@ -238,7 +248,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
|
||
let mut update_count_full = 0; | ||
let mut update_count_incremental = 0; | ||
for current_update in serialization_details.updates { | ||
for current_update in &serialization_details.updates { | ||
match ¤t_update { | ||
UpdateSerialization::Full(_) => { | ||
update_count_full += 1; | ||
|
@@ -258,6 +268,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
let message_count = announcement_count + update_count; | ||
|
||
let mut prefixed_output = GOSSIP_PREFIX.to_vec(); | ||
prefixed_output.push(serialization_version); | ||
|
||
// always write the chain hash | ||
serialization_details.chain_hash.write(&mut prefixed_output).unwrap(); | ||
|
@@ -267,11 +278,97 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds); | ||
serialized_seen_timestamp.write(&mut prefixed_output).unwrap(); | ||
|
||
if serialization_version >= 2 { // serialize the most common node features | ||
for mutated_node_id in serialization_details.node_mutations.keys() { | ||
// consider mutated nodes outside channel announcements | ||
get_node_id_index(mutated_node_id.clone()); | ||
} | ||
|
||
let default_feature_count = serialization_details.node_announcement_feature_defaults.len() as u8; | ||
debug_assert!(default_feature_count <= config::NODE_DEFAULT_FEATURE_COUNT, "Default feature count cannot exceed maximum"); | ||
default_feature_count.write(&mut prefixed_output).unwrap(); | ||
|
||
for current_feature in &serialization_details.node_announcement_feature_defaults { | ||
current_feature.write(&mut prefixed_output).unwrap(); | ||
} | ||
} | ||
|
||
let node_id_count = node_ids.len() as u32; | ||
node_id_count.write(&mut prefixed_output).unwrap(); | ||
|
||
let mut node_update_count = 0u32; | ||
let mut node_feature_update_count = 0u32; | ||
let mut node_address_update_count = 0u32; | ||
|
||
for current_node_id in node_ids { | ||
arik-so marked this conversation as resolved.
Show resolved
Hide resolved
|
||
current_node_id.write(&mut prefixed_output).unwrap(); | ||
let mut current_node_delta_serialization: Vec<u8> = Vec::new(); | ||
current_node_id.write(&mut current_node_delta_serialization).unwrap(); | ||
|
||
if serialization_version >= 2 { | ||
if let Some(node_delta) = serialization_details.node_mutations.get(¤t_node_id) { | ||
/* | ||
Bitmap: | ||
7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so bit 6 is now unused? Should we clearly define it to mean something? Not sure what it would mean but worth mentioning. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could use it for more feature defaults? Frankly, I think we can leave it unused on the server, and decide its interpretation on the client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we definitely don't have to interpret it here before merging this PR, I'm just suggesting we think on it. |
||
5-3: index of new features among default (1-6). If index is 7 (all 3 bits are set, it's | ||
outside the present default range). 0 means no feature changes. | ||
2: addresses have changed | ||
|
||
1: used for all keys | ||
0: used for odd keys | ||
*/ | ||
|
||
if node_delta.has_address_set_changed { | ||
node_address_update_count += 1; | ||
|
||
let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses; | ||
let mut address_serialization = Vec::new(); | ||
|
||
// we don't know a priori how many are <= 255 bytes | ||
let mut total_address_count = 0u8; | ||
|
||
for address in address_set.iter() { | ||
if total_address_count == u8::MAX { | ||
// don't serialize more than 255 addresses | ||
break; | ||
} | ||
if let Ok(serialized_length) = u8::try_from(address.serialized_length()) { | ||
total_address_count += 1; | ||
serialized_length.write(&mut address_serialization).unwrap(); | ||
address.write(&mut address_serialization).unwrap(); | ||
}; | ||
} | ||
|
||
if total_address_count > 0 { | ||
// signal the presence of node addresses | ||
current_node_delta_serialization[0] |= 1 << 2; | ||
// serialize the actual addresses and count | ||
total_address_count.write(&mut current_node_delta_serialization).unwrap(); | ||
current_node_delta_serialization.append(&mut address_serialization); | ||
} | ||
} | ||
|
||
if node_delta.has_feature_set_changed { | ||
node_feature_update_count += 1; | ||
|
||
let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features; | ||
|
||
// are these features among the most common ones? | ||
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { | ||
// this feature set is among the 6 defaults | ||
arik-so marked this conversation as resolved.
Show resolved
Hide resolved
|
||
current_node_delta_serialization[0] |= ((index + 1) as u8) << 3; | ||
} else { | ||
current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3 | ||
latest_features.write(&mut current_node_delta_serialization).unwrap(); | ||
} | ||
} | ||
|
||
if node_delta.has_address_set_changed || node_delta.has_feature_set_changed { | ||
node_update_count += 1; | ||
} | ||
} | ||
} | ||
|
||
prefixed_output.append(&mut current_node_delta_serialization); | ||
} | ||
|
||
prefixed_output.append(&mut output); | ||
|
@@ -282,7 +379,11 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, | |
SerializedResponse { | ||
data: prefixed_output, | ||
message_count, | ||
announcement_count, | ||
node_announcement_count: node_id_count, | ||
node_update_count, | ||
node_feature_update_count, | ||
node_address_update_count, | ||
channel_announcement_count: announcement_count, | ||
update_count, | ||
update_count_full, | ||
update_count_incremental, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Probably best to return the sets in all of these instead of passing in a
&mut
, but can be done in a follow-up.