Skip to content

Commit

Permalink
Delay considering a channel closed when seeing an on-chain spend (#2936)
Browse files Browse the repository at this point in the history
Fixes Issue #2437

When an external channel is spent, add it to the `spentChannels` list instead of immediately removing it from the graph.

RBF attempts can produce multiple spending txs in the mempool for the same channel.

The `spendChannels` list maps the txid of the spending tx to the scid of the spent channel.

When a channel announcement is validated with a funding tx on the `spentChannels` list, consider the new channel a splice of the corresponding spent channel.

A splice updates the graph edges to preserve balance estimate information in the graph.

If a spending tx from the `spentChannels` list is deeply buried before appearing in a valid channel announcement, remove the corresponding spent channel from the graph.

The integration test demonstrates that local channels update their capacity, but we can not test the remote node (carol) because the ChannelAnnouncements are ignored because it has a duplicate scid. After PR #2941 we can fix this test.
---------

Co-authored-by: t-bast <[email protected]>
  • Loading branch information
remyers and t-bast authored Dec 13, 2024
1 parent 61af10a commit c390560
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ object ZmqWatcher {
def hints: Set[TxId]
}

/**
* Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the
* mempool (i.e. the outpoint exists).
*
* NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to
* react to the transaction spending the outpoint, you should use [[WatchSpent]] instead.
*/
sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] {
/** TxId of the outpoint to watch. */
def txId: TxId
/** Index of the outpoint to watch. */
def outputIndex: Int
}

/** This event is sent when a [[WatchConfirmed]] condition is met. */
sealed trait WatchConfirmedTriggered extends WatchTriggered {
/** Block in which the transaction was confirmed. */
Expand All @@ -134,11 +120,9 @@ object ZmqWatcher {
def spendingTx: Transaction
}

/** This event is sent when a [[WatchSpentBasic]] condition is met. */
sealed trait WatchSpentBasicTriggered extends WatchTriggered

case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered]
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty }
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered
case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command

case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
Expand Down Expand Up @@ -197,7 +181,6 @@ object ZmqWatcher {
private def utxo(w: GenericWatch): Option[OutPoint] = {
w match {
case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex))
case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex))
case _ => None
}
}
Expand Down Expand Up @@ -245,7 +228,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
.flatMap(watchedUtxos.get)
.flatten
.foreach {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx))
case _: WatchPublished => // nothing to do
Expand Down Expand Up @@ -339,9 +322,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
val result = w match {
case _ if watches.contains(w) =>
Ignore // we ignore duplicates
case w: WatchSpentBasic[_] =>
checkSpentBasic(w)
Keep
case w: WatchSpent[_] =>
checkSpent(w)
Keep
Expand Down Expand Up @@ -375,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
watching(watches -- deprecatedWatches, watchedUtxos)

case UnwatchExternalChannelSpent(txId, outputIndex) =>
val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w }
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)

case ValidateRequest(replyTo, ann) =>
client.validate(ann).map(replyTo ! _)
Behaviors.same
Expand All @@ -390,17 +375,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
}

private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = {
// NB: we assume parent tx was published, we just need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
case false =>
log.info(s"output=${w.txId}:${w.outputIndex} has already been spent")
w match {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
}
}
}

private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
// first let's see if the parent tx was published or not
client.getTxConfirmations(w.txId).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,8 @@ case class Commitments(params: ChannelParams,
val remoteCommitIndex = active.head.remoteCommit.index
val nextRemoteCommitIndex = remoteCommitIndex + 1

// While we have multiple active commitments, we use the most restrictive one.
val capacity = active.map(_.capacity).min
lazy val availableBalanceForSend: MilliSatoshi = active.map(_.availableBalanceForSend(params, changes)).min
lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -195,6 +195,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
)
}

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
copy(
// a capacity decrease will decrease the low bound, but not below 0
low = (low + capacityDelta.min(0 msat)).max(0 msat),
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
capacities = newCapacities
)
}

/**
* Estimate the probability that we can successfully send `amount` through the channel
*
Expand Down Expand Up @@ -263,6 +275,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
defaultHalfLife
)

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
balances.updatedWith((desc.a, desc.b)) {
case None => None
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
},
defaultHalfLife
)

def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi)(implicit log: LoggingAdapter): BalancesEstimates = {
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
val estimatedProbability = balance.canSend(amount, TimestampSecond.now())
Expand Down Expand Up @@ -305,6 +325,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
)

def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
GraphWithBalanceEstimates(
graph.updateChannel(desc, newShortChannelId, newCapacity),
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
)
}

def routeCouldRelay(route: Route)(implicit log: LoggingAdapter): GraphWithBalanceEstimates = {
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
case (hop, (balances, amount)) =>
Expand Down
26 changes: 25 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,15 @@ object Graph {
}
}

case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge])
case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge]) {
def update(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): Vertex =
incomingEdges.get(desc) match {
case None => this
case Some(edge) =>
val updatedEdge = edge.copy(desc = desc.copy(shortChannelId = newShortChannelId), capacity = newCapacity)
copy(incomingEdges = incomingEdges - desc + (desc.copy(shortChannelId = newShortChannelId) -> updatedEdge))
}
}

/** A graph data structure that uses an adjacency list, stores the incoming edges of the neighbors */
case class DirectedGraph(private val vertices: Map[PublicKey, Vertex]) {
Expand Down Expand Up @@ -678,6 +686,22 @@ object Graph {
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
}

/**
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
* both edges (corresponding to both directions) are updated.
*
* @param desc the channel description for the channel to update
* @param newShortChannelId the new shortChannelId for this channel
* @param newCapacity the new capacity of the channel
* @return a new graph with updated vertexes
*/
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
DirectedGraph(vertices
.updatedWith(desc.b)(_.map(_.update(desc, newShortChannelId, newCapacity)))
.updatedWith(desc.a)(_.map(_.update(desc.reversed, newShortChannelId, newCapacity)))
)
}

/**
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
*/
Expand Down
33 changes: 19 additions & 14 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
Expand Down Expand Up @@ -113,7 +114,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
scid2PrivateChannels = Map.empty,
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty)
sync = Map.empty,
spentChannels = Map.empty)
startWith(NORMAL, data)
}

Expand Down Expand Up @@ -259,8 +261,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(r: ValidateResult, d) =>
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
case None => stay()
}

case Event(n: NodeAnnouncement, d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
Expand Down Expand Up @@ -409,9 +420,9 @@ object Router {
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2)
def updateChannelUpdateSameSideAs(u: ChannelUpdate): PublicChannel = if (u.channelFlags.isNode1) copy(update_1_opt = Some(u)) else copy(update_2_opt = Some(u))
def updateBalances(commitments: Commitments): PublicChannel = if (commitments.localNodeId == ann.nodeId1) {
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
} else {
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
}
def applyChannelUpdate(update: Either[LocalChannelUpdate, RemoteChannelUpdate]): PublicChannel = update match {
case Left(lcu) => updateChannelUpdateSameSideAs(lcu.channelUpdate).updateBalances(lcu.commitments)
Expand Down Expand Up @@ -573,7 +584,6 @@ object Router {
def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode)
def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes)
def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel)
def emptyNodes(): Ignore = copy(nodes = Set.empty)
def emptyChannels(): Ignore = copy(channels = Set.empty)
// @formatter:on
}
Expand Down Expand Up @@ -622,12 +632,6 @@ object Router {
/** Full route including the final hop, if any. */
val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq

/**
* Fee paid for the trampoline hop, if any.
* Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once.
*/
val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat)

/**
* Fee paid for the blinded route, if any.
* Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the
Expand Down Expand Up @@ -757,7 +761,8 @@ object Router {
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down
Loading

0 comments on commit c390560

Please sign in to comment.