From 4c842f7a5c26188a696112a9cbdd345064c2cd6e Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Wed, 25 Sep 2024 11:56:11 +0200 Subject: [PATCH] redeem: service now includes payment info The information on whether a redeemable utxo was used in a payment is now accessible through the redeem service. --- swapd/proto/swap_internal/swap_internal.proto | 1 + swapd/src/internal_server.rs | 5 +- swapd/src/main.rs | 1 - swapd/src/postgresql/swap_repository.rs | 139 ++++++++++++++---- swapd/src/redeem/monitor.rs | 53 ++----- swapd/src/redeem/service.rs | 36 ++++- swapd/src/swap/swap_repository.rs | 20 ++- 7 files changed, 175 insertions(+), 80 deletions(-) diff --git a/swapd/proto/swap_internal/swap_internal.proto b/swapd/proto/swap_internal/swap_internal.proto index 4568fc2..db7b098 100644 --- a/swapd/proto/swap_internal/swap_internal.proto +++ b/swapd/proto/swap_internal/swap_internal.proto @@ -48,6 +48,7 @@ message Redeemable { message RedeemableUtxo { string outpoint = 1; uint64 confirmation_height = 2; + optional string paid_with_request = 3; } message StopRequest {} diff --git a/swapd/src/internal_server.rs b/swapd/src/internal_server.rs index 8971b58..8f55155 100644 --- a/swapd/src/internal_server.rs +++ b/swapd/src/internal_server.rs @@ -233,8 +233,9 @@ where .utxos .iter() .map(|utxo| RedeemableUtxo { - confirmation_height: utxo.block_height, - outpoint: utxo.outpoint.to_string(), + confirmation_height: utxo.utxo.block_height, + outpoint: utxo.utxo.outpoint.to_string(), + paid_with_request: utxo.paid_with_request.clone(), }) .collect(), }) diff --git a/swapd/src/main.rs b/swapd/src/main.rs index e34b90f..8b885e6 100644 --- a/swapd/src/main.rs +++ b/swapd/src/main.rs @@ -248,7 +248,6 @@ async fn main() -> Result<(), Box> { chain_client: Arc::clone(&chain_client), fee_estimator: Arc::clone(&fee_estimator), poll_interval: Duration::from_secs(args.redeem_poll_interval_seconds), - swap_repository: Arc::clone(&swap_repository), swap_service: Arc::clone(&swap_service), redeem_repository: Arc::clone(&redeem_repository), redeem_service: Arc::clone(&redeem_service), diff --git a/swapd/src/postgresql/swap_repository.rs b/swapd/src/postgresql/swap_repository.rs index da4db4b..54be596 100644 --- a/swapd/src/postgresql/swap_repository.rs +++ b/swapd/src/postgresql/swap_repository.rs @@ -16,8 +16,9 @@ use tracing::instrument; use crate::{ lightning::PaymentResult, swap::{ - AddPaymentResultError, GetPaidUtxosError, GetSwapError, GetSwapsError, PaymentAttempt, - Swap, SwapPersistenceError, SwapPrivateData, SwapPublicData, SwapState, + AddPaymentResultError, GetPaidUtxosError, GetSwapError, GetSwapsError, PaidOutpoint, + PaymentAttempt, Swap, SwapPersistenceError, SwapPrivateData, SwapPublicData, SwapState, + SwapStatePaidOutpoints, }, }; @@ -351,31 +352,6 @@ impl crate::swap::SwapRepository for SwapRepository { }) } - #[instrument(level = "trace", skip(self))] - async fn get_paid_outpoints( - &self, - hash: &sha256::Hash, - ) -> Result, GetPaidUtxosError> { - let mut rows = sqlx::query( - r#"SELECT DISTINCT patx.tx_id - , patx.output_index - FROM swaps s - INNER JOIN payment_attempts pa ON s.payment_hash = pa.swap_payment_hash - INNER JOIN payment_attempt_tx_outputs patx ON pa.id = patx.payment_attempt_id - WHERE pa.success = true"#, - ) - .bind(hash.as_byte_array().to_vec()) - .fetch(&*self.pool); - - let mut result = Vec::new(); - while let Some(row) = rows.try_next().await? { - let tx_id: String = row.try_get("tx_id")?; - let output_index: i64 = row.try_get("output_index")?; - result.push(OutPoint::new(tx_id.parse()?, output_index as u32)); - } - Ok(result) - } - #[instrument(level = "trace", skip(self))] async fn get_swaps( &self, @@ -449,6 +425,115 @@ impl crate::swap::SwapRepository for SwapRepository { Ok(result) } + + #[instrument(level = "trace", skip(self))] + async fn get_swaps_with_paid_outpoints( + &self, + addresses: &[Address], + ) -> Result, GetSwapsError> { + let addresses: Vec = addresses.iter().map(|a| a.to_string()).collect(); + let mut rows = sqlx::query( + r#"SELECT s.creation_time + , s.payment_hash + , s.payer_pubkey + , s.swapper_pubkey + , s.script + , s.address + , s.lock_time + , s.swapper_privkey + , s.preimage + , po.payment_request + , po.tx_id + , po.output_index + FROM swaps s + LEFT JOIN ( + SELECT DISTINCT s_sub.payment_hash + , pa.payment_request + , patx.tx_id + , patx.output_index + FROM swaps s_sub + INNER JOIN payment_attempts pa ON s_sub.payment_hash = pa.swap_payment_hash + INNER JOIN payment_attempt_tx_outputs patx ON pa.id = patx.payment_attempt_id + WHERE pa.success = true + ) po ON s.payment_hash = po.payment_hash + WHERE s.address = ANY($1) + ORDER BY s.payment_hash"#, + ) + .bind(addresses) + .fetch(&*self.pool); + + let mut result = HashMap::new(); + while let Some(row) = rows.try_next().await? { + let address: &str = row.try_get("address")?; + let address = address + .parse::>()? + .require_network(self.network)?; + if !result.contains_key(&address) { + let creation_time: i64 = row.try_get("creation_time")?; + let payment_hash: Vec = row.try_get("payment_hash")?; + let payer_pubkey: Vec = row.try_get("payer_pubkey")?; + let swapper_pubkey: Vec = row.try_get("swapper_pubkey")?; + let script: Vec = row.try_get("script")?; + + let lock_time: i64 = row.try_get("lock_time")?; + let swapper_privkey: Vec = row.try_get("swapper_privkey")?; + let preimage: Option> = row.try_get("preimage")?; + + let creation_time = SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(creation_time as u64)) + .ok_or(GetSwapsError::General("invalid timestamp".into()))?; + let swap = Swap { + creation_time, + public: SwapPublicData { + address: address.clone(), + hash: sha256::Hash::from_slice(&payment_hash)?, + lock_time: lock_time as u32, + payer_pubkey: PublicKey::from_slice(&payer_pubkey)?, + swapper_pubkey: PublicKey::from_slice(&swapper_pubkey)?, + script: ScriptBuf::from_bytes(script), + }, + private: SwapPrivateData { + swapper_privkey: PrivateKey::from_slice(&swapper_privkey, self.network)?, + }, + }; + + let preimage = match preimage { + Some(preimage) => Some( + preimage + .try_into() + .map_err(|_| GetSwapsError::InvalidPreimage)?, + ), + None => None, + }; + + result.insert( + address.clone(), + SwapStatePaidOutpoints { + paid_outpoints: Vec::new(), + swap_state: SwapState { swap, preimage }, + }, + ); + } + + let tx_id: Option = row.try_get("tx_id")?; + let output_index: Option = row.try_get("output_index")?; + let payment_request: Option = row.try_get("payment_request")?; + + if let (Some(tx_id), Some(output_index), Some(payment_request)) = + (tx_id, output_index, payment_request) + { + let entry = result.get_mut(&address).ok_or(GetSwapsError::General( + "missing expected address in map".into(), + ))?; + entry.paid_outpoints.push(PaidOutpoint { + outpoint: OutPoint::new(tx_id.parse()?, output_index as u32), + payment_request, + }); + } + } + + Ok(result) + } } impl From for SwapPersistenceError { diff --git a/swapd/src/redeem/monitor.rs b/swapd/src/redeem/monitor.rs index eab6147..146c3a2 100644 --- a/swapd/src/redeem/monitor.rs +++ b/swapd/src/redeem/monitor.rs @@ -5,7 +5,7 @@ use futures::future::{FusedFuture, FutureExt}; use futures::{stream::FuturesUnordered, StreamExt}; use thiserror::Error; use tokio::join; -use tracing::{debug, error, field, instrument, trace, warn}; +use tracing::{debug, error, field, instrument, trace}; use crate::chain::BroadcastError; use crate::{ @@ -42,7 +42,6 @@ where pub chain_client: Arc, pub fee_estimator: Arc, pub poll_interval: Duration, - pub swap_repository: Arc, pub swap_service: Arc>, pub redeem_repository: Arc, pub redeem_service: Arc>, @@ -62,7 +61,6 @@ where chain_client: Arc, fee_estimator: Arc, poll_interval: Duration, - swap_repository: Arc, swap_service: Arc>, redeem_repository: Arc, redeem_service: Arc>, @@ -84,7 +82,6 @@ where chain_client: params.chain_client, fee_estimator: params.fee_estimator, poll_interval: params.poll_interval, - swap_repository: params.swap_repository, swap_service: params.swap_service, redeem_repository: params.redeem_repository, redeem_service: params.redeem_service, @@ -143,41 +140,19 @@ where redeemable: Redeemable, current_height: u64, ) -> Result<(), RedeemError> { - let utxos: Vec<_> = match self - .swap_repository - .get_paid_outpoints(&redeemable.swap.public.hash) - .await - { - Ok(outpoints) => { - if outpoints.is_empty() { - warn!( - hash = field::display(redeemable.swap.public.hash), - "Could not find paid outpoints for paid swap, redeeming all known utxos" - ); - - // If the outpoint list is empty, claim all utxos to be sure to redeem something. - redeemable.utxos - } else { - // Take only outputs that are still unspent. If some are skipped, that may be a loss. - redeemable - .utxos - .into_iter() - .filter(|u| outpoints.contains(&u.outpoint)) - .collect() - } - } - Err(e) => { - error!( - hash = field::display(redeemable.swap.public.hash), - "Failed to get paid outpoints for paid swap, redeeming all known utxos: {:?}", - e - ); - - // If the database call failed, claim all utxos to be sure to redeem something. - redeemable.utxos - } - }; - + // Only redeem utxos that were paid as part of an invoice payment. Users + // sometimes send funds to the same address multiple times, even though + // it is no longer safe to do so, because it's not obvious to a user + // a P2WSH address should not be reused. Be a good citizen and allow the + // user to refund those utxos. Note these utxos can still be redeemed + // manually by the swap server if needed. + let utxos: Vec<_> = redeemable + .utxos + .iter() + .filter(|utxo| utxo.paid_with_request.is_some()) + .map(|utxo| &utxo.utxo) + .cloned() + .collect(); if utxos.is_empty() { return Ok(()); } diff --git a/swapd/src/redeem/service.rs b/swapd/src/redeem/service.rs index 6467d57..d65bfbc 100644 --- a/swapd/src/redeem/service.rs +++ b/swapd/src/redeem/service.rs @@ -10,13 +10,24 @@ use crate::{ #[derive(Debug)] pub struct Redeemable { pub swap: Swap, - pub utxos: Vec, + pub utxos: Vec, pub preimage: [u8; 32], } +#[derive(Debug)] +pub struct RedeemableUtxo { + pub utxo: Utxo, + pub paid_with_request: Option, +} + impl Redeemable { pub fn blocks_left(&self, current_height: u64) -> i32 { - let min_conf_height = self.utxos.iter().map(|u| u.block_height).min().unwrap_or(0); + let min_conf_height = self + .utxos + .iter() + .map(|u| u.utxo.block_height) + .min() + .unwrap_or(0); (self.swap.public.lock_time as i32) - (current_height.saturating_sub(min_conf_height) as i32) } @@ -55,7 +66,10 @@ where pub async fn list_redeemable(&self) -> Result, RedeemServiceError> { let utxos = self.chain_repository.get_utxos().await?; let addresses: Vec<_> = utxos.iter().map(|u| u.address.clone()).collect(); - let swaps = self.swap_repository.get_swaps(&addresses).await?; + let swaps = self + .swap_repository + .get_swaps_with_paid_outpoints(&addresses) + .await?; let mut redeemable_swaps = HashMap::new(); for utxo in utxos { let swap = match swaps.get(&utxo.address) { @@ -63,15 +77,23 @@ where None => continue, }; - let preimage = match swap.preimage { + let preimage = match swap.swap_state.preimage { Some(preimage) => preimage, None => continue, }; let entry = redeemable_swaps - .entry(swap.swap.public.address.clone()) - .or_insert((swap.swap.clone(), preimage, Vec::new())); - entry.2.push(utxo.utxo); + .entry(swap.swap_state.swap.public.address.clone()) + .or_insert((swap.swap_state.swap.clone(), preimage, Vec::new())); + let paid_with_invoice = swap + .paid_outpoints + .iter() + .find(|po| po.outpoint == utxo.utxo.outpoint) + .map(|po| po.payment_request.clone()); + entry.2.push(RedeemableUtxo { + utxo: utxo.utxo, + paid_with_request: paid_with_invoice, + }); } Ok(redeemable_swaps diff --git a/swapd/src/swap/swap_repository.rs b/swapd/src/swap/swap_repository.rs index a9dc07c..ce063b5 100644 --- a/swapd/src/swap/swap_repository.rs +++ b/swapd/src/swap/swap_repository.rs @@ -52,6 +52,18 @@ pub struct PaymentAttempt { pub payment_request: String, } +#[derive(Debug)] +pub struct SwapStatePaidOutpoints { + pub swap_state: SwapState, + pub paid_outpoints: Vec, +} + +#[derive(Debug)] +pub struct PaidOutpoint { + pub outpoint: OutPoint, + pub payment_request: String, +} + #[async_trait::async_trait] pub trait SwapRepository { async fn add_swap(&self, swap: &Swap) -> Result<(), SwapPersistenceError>; @@ -65,10 +77,6 @@ pub trait SwapRepository { label: &str, result: &PaymentResult, ) -> Result<(), AddPaymentResultError>; - async fn get_paid_outpoints( - &self, - hash: &sha256::Hash, - ) -> Result, GetPaidUtxosError>; async fn get_swap_by_hash(&self, hash: &sha256::Hash) -> Result; async fn get_swap_by_address(&self, address: &Address) -> Result; async fn get_swap_by_payment_request( @@ -79,4 +87,8 @@ pub trait SwapRepository { &self, addresses: &[Address], ) -> Result, GetSwapsError>; + async fn get_swaps_with_paid_outpoints( + &self, + addresses: &[Address], + ) -> Result, GetSwapsError>; }