From c02f2ec1c77c99e5bf2523825cd5756930601204 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 20 Nov 2024 22:48:18 +0000 Subject: [PATCH 1/2] Store TXO values --- src/config.rs | 6 +++++- src/downloader.rs | 11 ++++++++++- src/persistence.rs | 10 +++++++--- src/tests/mod.rs | 26 +++++++++++++------------- src/types.rs | 2 +- src/verifier.rs | 27 +++++++++++++++++++++++---- 6 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/config.rs b/src/config.rs index 54c7b01..82b90e6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; -pub(crate) const SCHEMA_VERSION: i32 = 14; +pub(crate) const SCHEMA_VERSION: i32 = 15; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks // generate symlinks based on a 3-hour-granularity @@ -120,6 +120,7 @@ pub(crate) fn db_announcement_table_creation_query() -> &'static str { "CREATE TABLE IF NOT EXISTS channel_announcements ( id SERIAL PRIMARY KEY, short_channel_id bigint NOT NULL UNIQUE, + funding_amount_sats bigint NOT NULL, announcement_signed BYTEA, seen timestamp NOT NULL DEFAULT NOW() )" @@ -313,6 +314,9 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap(); tx.commit().await.unwrap(); } + if schema >= 1 && schema <= 14 { + client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap(); + } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); } diff --git a/src/downloader.rs b/src/downloader.rs index 49e3019..696ae43 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -64,7 +64,16 @@ impl GossipRouter where L::Target: Logger { counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg, None); + let mut funding_amount_sats = self.verifier.get_cached_funding_value(msg.contents.short_channel_id); + if funding_amount_sats.is_none() { + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async { + funding_amount_sats = self.verifier.retrieve_funding_value(msg.contents.short_channel_id).await.ok(); + })}); + } + let funding_amount_sats = funding_amount_sats + .expect("If we've accepted a ChannelAnnouncement, we must be able to fetch the TXO for it"); + + let gossip_message = GossipMessage::ChannelAnnouncement(msg, funding_amount_sats, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { diff --git a/src/persistence.rs b/src/persistence.rs index 0db3091..da259dc 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -185,7 +185,7 @@ impl GossipPersister where L::Target: Logger { #[cfg(test)] tasks_spawned.push(_task); }, - GossipMessage::ChannelAnnouncement(announcement, seen_override) => { + GossipMessage::ChannelAnnouncement(announcement, funding_value, seen_override) => { let scid = announcement.contents.short_channel_id as i64; // start with the type prefix, which is already known a priori @@ -197,10 +197,12 @@ impl GossipPersister where L::Target: Logger { tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_announcements (\ short_channel_id, \ + funding_amount_sats, \ announcement_signed, \ seen \ - ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[ + ) VALUES ($1, $2, $3, TO_TIMESTAMP($4)) ON CONFLICT (short_channel_id) DO NOTHING", &[ &scid, + &(funding_value as i64), &announcement_signed, &(seen_override.unwrap() as f64) ])).await.unwrap().unwrap(); @@ -208,9 +210,11 @@ impl GossipPersister where L::Target: Logger { tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_announcements (\ short_channel_id, \ + funding_amount_sats, \ announcement_signed \ - ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[ + ) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[ &scid, + &(funding_value as i64), &announcement_signed ])).await.unwrap().unwrap(); } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 3fc37cc..4eacf66 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -242,7 +242,7 @@ async fn test_trivial_setup() { network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap(); drop(receiver); @@ -357,7 +357,7 @@ async fn test_node_announcement_delta_detection() { network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap(); } @@ -450,7 +450,7 @@ async fn test_unidirectional_intermediate_update_consideration() { network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap(); @@ -521,7 +521,7 @@ async fn test_bidirectional_intermediate_update_consideration() { network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap(); @@ -577,7 +577,7 @@ async fn test_channel_reminders() { network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); } @@ -598,7 +598,7 @@ async fn test_channel_reminders() { network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap(); receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap(); @@ -653,7 +653,7 @@ async fn test_full_snapshot_recency() { let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); { // direction false { // first update @@ -734,7 +734,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() { let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); { // direction false { // first update, seen latest @@ -815,7 +815,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() { let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); { // direction false // apply updates in their timestamp order @@ -898,7 +898,7 @@ async fn test_full_snapshot_mutiny_scenario() { let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); { // direction false { @@ -1036,13 +1036,13 @@ async fn test_full_snapshot_interlaced_channel_timestamps() { { // main channel let announcement = generate_channel_announcement(main_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); } { // secondary channel let announcement = generate_channel_announcement(secondary_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); } { // main channel @@ -1145,7 +1145,7 @@ async fn test_full_snapshot_persistence() { let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); let announcement = generate_channel_announcement(short_channel_id); network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap(); { // direction true let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10); diff --git a/src/types.rs b/src/types.rs index f38a376..b068448 100644 --- a/src/types.rs +++ b/src/types.rs @@ -16,7 +16,7 @@ pub(crate) type GossipPeerManager = Arc), // the second element is an optional override for the seen value - ChannelAnnouncement(ChannelAnnouncement, Option), + ChannelAnnouncement(ChannelAnnouncement, u64, Option), ChannelUpdate(ChannelUpdate, Option), } diff --git a/src/verifier.rs b/src/verifier.rs index 8256916..a0d5442 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io::ErrorKind; use std::ops::Deref; use std::sync::Arc; @@ -23,6 +24,9 @@ pub(crate) struct ChainVerifier where graph: Arc>, outbound_gossiper: Arc>, Arc, L>>, peer_handler: Mutex>>, + /// A cache on the funding amounts for each channel that we've looked up, mapping from SCID to + /// funding satoshis. + channel_funding_amounts: Arc>>, logger: L } @@ -35,14 +39,24 @@ impl ChainVerifier where L::Target: outbound_gossiper, graph, peer_handler: Mutex::new(None), - logger + channel_funding_amounts: Arc::new(Mutex::new(HashMap::new())), + logger, } } pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { *self.peer_handler.lock().unwrap() = Some(peer_handler); } - async fn retrieve_utxo(client: Arc, short_channel_id: u64, logger: L) -> Result { + pub(crate) fn get_cached_funding_value(&self, scid: u64) -> Option { + self.channel_funding_amounts.lock().unwrap().get(&scid).map(|v| *v) + } + + pub(crate) async fn retrieve_funding_value(&self, scid: u64) -> Result { + Self::retrieve_cache_txo(Arc::clone(&self.rest_client), Some(Arc::clone(&self.channel_funding_amounts)), scid, self.logger.clone()) + .await.map(|txo| txo.value.to_sat()) + } + + async fn retrieve_cache_txo(client: Arc, channel_funding_amounts: Option>>>, short_channel_id: u64, logger: L) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; let output_index = (short_channel_id & 0xffff) as u16; @@ -57,7 +71,11 @@ impl ChainVerifier where L::Target: log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.compute_txid()); return Err(UtxoLookupError::UnknownTx); } - Ok(transaction.output.swap_remove(output_index as usize)) + let txo = transaction.output.swap_remove(output_index as usize); + if let Some(channel_funding_amounts) = channel_funding_amounts { + channel_funding_amounts.lock().unwrap().insert(short_channel_id, txo.value.to_sat()); + } + Ok(txo) } async fn retrieve_block(client: Arc, block_height: u32, logger: L) -> Result { @@ -99,10 +117,11 @@ impl UtxoLookup for ChainVerifier w let graph_ref = Arc::clone(&self.graph); let client_ref = Arc::clone(&self.rest_client); let gossip_ref = Arc::clone(&self.outbound_gossiper); + let channel_funding_amounts_cache_ref = Arc::clone(&self.channel_funding_amounts); let pm_ref = self.peer_handler.lock().unwrap().clone(); let logger_ref = self.logger.clone(); tokio::spawn(async move { - let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await; + let res = Self::retrieve_cache_txo(client_ref, Some(channel_funding_amounts_cache_ref), short_channel_id, logger_ref).await; fut.resolve(&*graph_ref, &*gossip_ref, res); if let Some(pm) = pm_ref { pm.process_events(); } }); From 29dfeb8b80a78b296d6bf66b4ea36b0049643f16 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 20 Nov 2024 22:48:37 +0000 Subject: [PATCH 2/2] Backfill TXO values --- src/config.rs | 37 +++++++++++++++++++++++++++++++++++-- src/persistence.rs | 4 ++-- src/verifier.rs | 4 ++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 82b90e6..26c2a78 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,10 @@ use crate::hex_utils; +use crate::verifier::ChainVerifier; use std::env; use std::net::{SocketAddr, ToSocketAddrs}; +use std::ops::Deref; +use std::sync::Arc; use std::time::Duration; use bitcoin::io::Cursor; @@ -10,10 +13,14 @@ use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::PublicKey; use futures::stream::{FuturesUnordered, StreamExt}; use lightning::ln::msgs::ChannelAnnouncement; +use lightning::util::logger::Logger; use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; +use lightning_block_sync::rest::RestClient; use tokio_postgres::Config; +use tokio::sync::Semaphore; + pub(crate) const SCHEMA_VERSION: i32 = 15; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks @@ -168,7 +175,9 @@ pub(crate) fn db_index_creation_query() -> &'static str { " } -pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) { +pub(crate) async fn upgrade_db( + schema: i32, client: &mut tokio_postgres::Client, logger: L, +) where L::Target: Logger { if schema == 1 { let tx = client.transaction().await.unwrap(); tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap(); @@ -315,7 +324,31 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.commit().await.unwrap(); } if schema >= 1 && schema <= 14 { - client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap(); + println!("Upgrading to schema 15 requiring UTXO lookups for each historical channel announcement. This may take some time"); + // Note that we don't bother doing this one in a transaction, and as such need to support + // resuming on a crash. + let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await; + tokio::spawn(async move { + let client = crate::connect_to_db().await; + let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap()); + let sem = Arc::new(Semaphore::new(16)); + while let Some(scid_res) = scids.next().await { + let scid: i64 = scid_res.unwrap().get(0); + let permit = Arc::clone(&sem).acquire_owned().await.unwrap(); + let logger = logger.clone(); + tokio::spawn(async move { + let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap()); + let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await + .expect("We shouldn't have accepted a channel announce with a bad TXO"); + let client = crate::connect_to_db().await; + client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap(); + std::mem::drop(permit); + }); + } + let _all_updates_complete = sem.acquire_many(16).await.unwrap(); + client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap(); + client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap(); + }); } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); diff --git a/src/persistence.rs b/src/persistence.rs index da259dc..1bf5223 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -23,7 +23,7 @@ pub(crate) struct GossipPersister where L::Target: Logger { logger: L } -impl GossipPersister where L::Target: Logger { +impl GossipPersister where L::Target: Logger { pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); @@ -50,7 +50,7 @@ impl GossipPersister where L::Target: Logger { let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap(); if !cur_schema.is_empty() { - config::upgrade_db(cur_schema[0].get(0), &mut client).await; + config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await; } let preparation = client.execute("set time zone UTC", &[]).await; diff --git a/src/verifier.rs b/src/verifier.rs index a0d5442..c61cdc0 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -56,6 +56,10 @@ impl ChainVerifier where L::Target: .await.map(|txo| txo.value.to_sat()) } + pub(crate) async fn retrieve_txo(client: Arc, short_channel_id: u64, logger: L) -> Result { + Self::retrieve_cache_txo(client, None, short_channel_id, logger).await + } + async fn retrieve_cache_txo(client: Arc, channel_funding_amounts: Option>>>, short_channel_id: u64, logger: L) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;