Skip to content

Commit

Permalink
redeem: service now includes payment info
Browse files Browse the repository at this point in the history
The information on whether a redeemable utxo was used in a payment is
now accessible through the redeem service.
  • Loading branch information
JssDWt committed Sep 25, 2024
1 parent 014f2ab commit 4c842f7
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 80 deletions.
1 change: 1 addition & 0 deletions swapd/proto/swap_internal/swap_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ message Redeemable {
message RedeemableUtxo {
string outpoint = 1;
uint64 confirmation_height = 2;
optional string paid_with_request = 3;
}

message StopRequest {}
Expand Down
5 changes: 3 additions & 2 deletions swapd/src/internal_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ where
.utxos
.iter()
.map(|utxo| RedeemableUtxo {
confirmation_height: utxo.block_height,
outpoint: utxo.outpoint.to_string(),
confirmation_height: utxo.utxo.block_height,
outpoint: utxo.utxo.outpoint.to_string(),
paid_with_request: utxo.paid_with_request.clone(),
})
.collect(),
})
Expand Down
1 change: 0 additions & 1 deletion swapd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
chain_client: Arc::clone(&chain_client),
fee_estimator: Arc::clone(&fee_estimator),
poll_interval: Duration::from_secs(args.redeem_poll_interval_seconds),
swap_repository: Arc::clone(&swap_repository),
swap_service: Arc::clone(&swap_service),
redeem_repository: Arc::clone(&redeem_repository),
redeem_service: Arc::clone(&redeem_service),
Expand Down
139 changes: 112 additions & 27 deletions swapd/src/postgresql/swap_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use tracing::instrument;
use crate::{
lightning::PaymentResult,
swap::{
AddPaymentResultError, GetPaidUtxosError, GetSwapError, GetSwapsError, PaymentAttempt,
Swap, SwapPersistenceError, SwapPrivateData, SwapPublicData, SwapState,
AddPaymentResultError, GetPaidUtxosError, GetSwapError, GetSwapsError, PaidOutpoint,
PaymentAttempt, Swap, SwapPersistenceError, SwapPrivateData, SwapPublicData, SwapState,
SwapStatePaidOutpoints,
},
};

Expand Down Expand Up @@ -351,31 +352,6 @@ impl crate::swap::SwapRepository for SwapRepository {
})
}

#[instrument(level = "trace", skip(self))]
async fn get_paid_outpoints(
&self,
hash: &sha256::Hash,
) -> Result<Vec<OutPoint>, GetPaidUtxosError> {
let mut rows = sqlx::query(
r#"SELECT DISTINCT patx.tx_id
, patx.output_index
FROM swaps s
INNER JOIN payment_attempts pa ON s.payment_hash = pa.swap_payment_hash
INNER JOIN payment_attempt_tx_outputs patx ON pa.id = patx.payment_attempt_id
WHERE pa.success = true"#,
)
.bind(hash.as_byte_array().to_vec())
.fetch(&*self.pool);

let mut result = Vec::new();
while let Some(row) = rows.try_next().await? {
let tx_id: String = row.try_get("tx_id")?;
let output_index: i64 = row.try_get("output_index")?;
result.push(OutPoint::new(tx_id.parse()?, output_index as u32));
}
Ok(result)
}

#[instrument(level = "trace", skip(self))]
async fn get_swaps(
&self,
Expand Down Expand Up @@ -449,6 +425,115 @@ impl crate::swap::SwapRepository for SwapRepository {

Ok(result)
}

#[instrument(level = "trace", skip(self))]
async fn get_swaps_with_paid_outpoints(
&self,
addresses: &[Address],
) -> Result<HashMap<Address, SwapStatePaidOutpoints>, GetSwapsError> {
let addresses: Vec<String> = addresses.iter().map(|a| a.to_string()).collect();
let mut rows = sqlx::query(
r#"SELECT s.creation_time
, s.payment_hash
, s.payer_pubkey
, s.swapper_pubkey
, s.script
, s.address
, s.lock_time
, s.swapper_privkey
, s.preimage
, po.payment_request
, po.tx_id
, po.output_index
FROM swaps s
LEFT JOIN (
SELECT DISTINCT s_sub.payment_hash
, pa.payment_request
, patx.tx_id
, patx.output_index
FROM swaps s_sub
INNER JOIN payment_attempts pa ON s_sub.payment_hash = pa.swap_payment_hash
INNER JOIN payment_attempt_tx_outputs patx ON pa.id = patx.payment_attempt_id
WHERE pa.success = true
) po ON s.payment_hash = po.payment_hash
WHERE s.address = ANY($1)
ORDER BY s.payment_hash"#,
)
.bind(addresses)
.fetch(&*self.pool);

let mut result = HashMap::new();
while let Some(row) = rows.try_next().await? {
let address: &str = row.try_get("address")?;
let address = address
.parse::<Address<NetworkUnchecked>>()?
.require_network(self.network)?;
if !result.contains_key(&address) {
let creation_time: i64 = row.try_get("creation_time")?;
let payment_hash: Vec<u8> = row.try_get("payment_hash")?;
let payer_pubkey: Vec<u8> = row.try_get("payer_pubkey")?;
let swapper_pubkey: Vec<u8> = row.try_get("swapper_pubkey")?;
let script: Vec<u8> = row.try_get("script")?;

let lock_time: i64 = row.try_get("lock_time")?;
let swapper_privkey: Vec<u8> = row.try_get("swapper_privkey")?;
let preimage: Option<Vec<u8>> = row.try_get("preimage")?;

let creation_time = SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(creation_time as u64))
.ok_or(GetSwapsError::General("invalid timestamp".into()))?;
let swap = Swap {
creation_time,
public: SwapPublicData {
address: address.clone(),
hash: sha256::Hash::from_slice(&payment_hash)?,
lock_time: lock_time as u32,
payer_pubkey: PublicKey::from_slice(&payer_pubkey)?,
swapper_pubkey: PublicKey::from_slice(&swapper_pubkey)?,
script: ScriptBuf::from_bytes(script),
},
private: SwapPrivateData {
swapper_privkey: PrivateKey::from_slice(&swapper_privkey, self.network)?,
},
};

let preimage = match preimage {
Some(preimage) => Some(
preimage
.try_into()
.map_err(|_| GetSwapsError::InvalidPreimage)?,
),
None => None,
};

result.insert(
address.clone(),
SwapStatePaidOutpoints {
paid_outpoints: Vec::new(),
swap_state: SwapState { swap, preimage },
},
);
}

let tx_id: Option<String> = row.try_get("tx_id")?;
let output_index: Option<i64> = row.try_get("output_index")?;
let payment_request: Option<String> = row.try_get("payment_request")?;

if let (Some(tx_id), Some(output_index), Some(payment_request)) =
(tx_id, output_index, payment_request)
{
let entry = result.get_mut(&address).ok_or(GetSwapsError::General(
"missing expected address in map".into(),
))?;
entry.paid_outpoints.push(PaidOutpoint {
outpoint: OutPoint::new(tx_id.parse()?, output_index as u32),
payment_request,
});
}
}

Ok(result)
}
}

impl From<sqlx::Error> for SwapPersistenceError {
Expand Down
53 changes: 14 additions & 39 deletions swapd/src/redeem/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::future::{FusedFuture, FutureExt};
use futures::{stream::FuturesUnordered, StreamExt};
use thiserror::Error;
use tokio::join;
use tracing::{debug, error, field, instrument, trace, warn};
use tracing::{debug, error, field, instrument, trace};

use crate::chain::BroadcastError;
use crate::{
Expand Down Expand Up @@ -42,7 +42,6 @@ where
pub chain_client: Arc<CC>,
pub fee_estimator: Arc<FE>,
pub poll_interval: Duration,
pub swap_repository: Arc<SR>,
pub swap_service: Arc<SwapService<P>>,
pub redeem_repository: Arc<RR>,
pub redeem_service: Arc<RedeemService<CR, SR>>,
Expand All @@ -62,7 +61,6 @@ where
chain_client: Arc<CC>,
fee_estimator: Arc<FE>,
poll_interval: Duration,
swap_repository: Arc<SR>,
swap_service: Arc<SwapService<P>>,
redeem_repository: Arc<RR>,
redeem_service: Arc<RedeemService<CR, SR>>,
Expand All @@ -84,7 +82,6 @@ where
chain_client: params.chain_client,
fee_estimator: params.fee_estimator,
poll_interval: params.poll_interval,
swap_repository: params.swap_repository,
swap_service: params.swap_service,
redeem_repository: params.redeem_repository,
redeem_service: params.redeem_service,
Expand Down Expand Up @@ -143,41 +140,19 @@ where
redeemable: Redeemable,
current_height: u64,
) -> Result<(), RedeemError> {
let utxos: Vec<_> = match self
.swap_repository
.get_paid_outpoints(&redeemable.swap.public.hash)
.await
{
Ok(outpoints) => {
if outpoints.is_empty() {
warn!(
hash = field::display(redeemable.swap.public.hash),
"Could not find paid outpoints for paid swap, redeeming all known utxos"
);

// If the outpoint list is empty, claim all utxos to be sure to redeem something.
redeemable.utxos
} else {
// Take only outputs that are still unspent. If some are skipped, that may be a loss.
redeemable
.utxos
.into_iter()
.filter(|u| outpoints.contains(&u.outpoint))
.collect()
}
}
Err(e) => {
error!(
hash = field::display(redeemable.swap.public.hash),
"Failed to get paid outpoints for paid swap, redeeming all known utxos: {:?}",
e
);

// If the database call failed, claim all utxos to be sure to redeem something.
redeemable.utxos
}
};

// Only redeem utxos that were paid as part of an invoice payment. Users
// sometimes send funds to the same address multiple times, even though
// it is no longer safe to do so, because it's not obvious to a user
// a P2WSH address should not be reused. Be a good citizen and allow the
// user to refund those utxos. Note these utxos can still be redeemed
// manually by the swap server if needed.
let utxos: Vec<_> = redeemable
.utxos
.iter()
.filter(|utxo| utxo.paid_with_request.is_some())
.map(|utxo| &utxo.utxo)
.cloned()
.collect();
if utxos.is_empty() {
return Ok(());
}
Expand Down
36 changes: 29 additions & 7 deletions swapd/src/redeem/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ use crate::{
#[derive(Debug)]
pub struct Redeemable {
pub swap: Swap,
pub utxos: Vec<Utxo>,
pub utxos: Vec<RedeemableUtxo>,
pub preimage: [u8; 32],
}

#[derive(Debug)]
pub struct RedeemableUtxo {
pub utxo: Utxo,
pub paid_with_request: Option<String>,
}

impl Redeemable {
pub fn blocks_left(&self, current_height: u64) -> i32 {
let min_conf_height = self.utxos.iter().map(|u| u.block_height).min().unwrap_or(0);
let min_conf_height = self
.utxos
.iter()
.map(|u| u.utxo.block_height)
.min()
.unwrap_or(0);
(self.swap.public.lock_time as i32)
- (current_height.saturating_sub(min_conf_height) as i32)
}
Expand Down Expand Up @@ -55,23 +66,34 @@ where
pub async fn list_redeemable(&self) -> Result<Vec<Redeemable>, RedeemServiceError> {
let utxos = self.chain_repository.get_utxos().await?;
let addresses: Vec<_> = utxos.iter().map(|u| u.address.clone()).collect();
let swaps = self.swap_repository.get_swaps(&addresses).await?;
let swaps = self
.swap_repository
.get_swaps_with_paid_outpoints(&addresses)
.await?;
let mut redeemable_swaps = HashMap::new();
for utxo in utxos {
let swap = match swaps.get(&utxo.address) {
Some(swap) => swap,
None => continue,
};

let preimage = match swap.preimage {
let preimage = match swap.swap_state.preimage {
Some(preimage) => preimage,
None => continue,
};

let entry = redeemable_swaps
.entry(swap.swap.public.address.clone())
.or_insert((swap.swap.clone(), preimage, Vec::new()));
entry.2.push(utxo.utxo);
.entry(swap.swap_state.swap.public.address.clone())
.or_insert((swap.swap_state.swap.clone(), preimage, Vec::new()));
let paid_with_invoice = swap
.paid_outpoints
.iter()
.find(|po| po.outpoint == utxo.utxo.outpoint)
.map(|po| po.payment_request.clone());
entry.2.push(RedeemableUtxo {
utxo: utxo.utxo,
paid_with_request: paid_with_invoice,
});
}

Ok(redeemable_swaps
Expand Down
20 changes: 16 additions & 4 deletions swapd/src/swap/swap_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ pub struct PaymentAttempt {
pub payment_request: String,
}

#[derive(Debug)]
pub struct SwapStatePaidOutpoints {
pub swap_state: SwapState,
pub paid_outpoints: Vec<PaidOutpoint>,
}

#[derive(Debug)]
pub struct PaidOutpoint {
pub outpoint: OutPoint,
pub payment_request: String,
}

#[async_trait::async_trait]
pub trait SwapRepository {
async fn add_swap(&self, swap: &Swap) -> Result<(), SwapPersistenceError>;
Expand All @@ -65,10 +77,6 @@ pub trait SwapRepository {
label: &str,
result: &PaymentResult,
) -> Result<(), AddPaymentResultError>;
async fn get_paid_outpoints(
&self,
hash: &sha256::Hash,
) -> Result<Vec<OutPoint>, GetPaidUtxosError>;
async fn get_swap_by_hash(&self, hash: &sha256::Hash) -> Result<SwapState, GetSwapError>;
async fn get_swap_by_address(&self, address: &Address) -> Result<SwapState, GetSwapError>;
async fn get_swap_by_payment_request(
Expand All @@ -79,4 +87,8 @@ pub trait SwapRepository {
&self,
addresses: &[Address],
) -> Result<HashMap<Address, SwapState>, GetSwapsError>;
async fn get_swaps_with_paid_outpoints(
&self,
addresses: &[Address],
) -> Result<HashMap<Address, SwapStatePaidOutpoints>, GetSwapsError>;
}

0 comments on commit 4c842f7

Please sign in to comment.