Skip to content

Commit

Permalink
Work around fundrawtransaction consolidation
Browse files Browse the repository at this point in the history
We don't have any control over bitcoind's coin selection: it sometimes
ends up creating transactions with a large number of inputs (and thus
a high fee), which we want to avoid. We run multiple coin selection
attempts in parallel and use the best result to mitigate that.

We should monitor how this behaves in practice and revert that change
if it doesn't prove to be useful.

We should also push for more configuration options on the bitcoind side
to get funding results that better match our transaction patterns.
  • Loading branch information
t-bast committed Jan 5, 2024
1 parent 61f1e1f commit 9c49413
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import akka.actor.typed.{ActorRef, Behavior}
import com.softwaremill.quicklens.{ModifyPimp, QuicklensEach}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{KotlinUtils, OutPoint, Satoshi, SatoshiLong, Script, ScriptWitness, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain.OnChainChannelFunder
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.blockchain.{OnChainChannelFunder, OnChainWallet}
import fr.acinq.eclair.channel.fund.InteractiveTxBuilder._
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol.TxAddInput
import fr.acinq.eclair.{Logs, UInt64}
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Random, Success}
import scala.util.{Failure, Random, Success, Try}

/**
* Created by t-bast on 05/01/2023.
Expand Down Expand Up @@ -182,14 +182,44 @@ private class InteractiveTxFunder(replyTo: ActorRef[InteractiveTxFunder.Response
}
}

/**
* We don't have any control over bitcoind's coin selection: it sometimes ends up creating transactions with a large
* number of inputs (and thus a high fee), which we want to avoid. We run multiple coin selection attempts in parallel
* and use the best result.
*
* We should get rid of this hack if in practice it doesn't significantly improve the fees we pay, or if bitcoind
* provides better configuration hooks for the coin selection algorithm.
*/
private def fundTransaction(txNotFunded: Transaction): Future[OnChainWallet.FundTransactionResponse] = {
val sharedInputWeight = fundingParams.sharedInput_opt.toSeq.map(i => i.info.outPoint -> i.weight.toLong).toMap
val fundingAttempts = (1 to 3).map { _ =>
wallet.fundTransaction(txNotFunded, fundingParams.targetFeerate, replaceable = true, externalInputsWeight = sharedInputWeight)
.map(Success(_))
.recover { case f => Failure(f) }
}
Future.foldLeft(fundingAttempts)(Try(Seq.empty[OnChainWallet.FundTransactionResponse])) {
case (current, Success(next)) => Success(current.getOrElse(Nil) :+ next)
case (Success(current), Failure(_)) if current.nonEmpty => Success(current)
case (_, Failure(f)) => Failure(f)
}.flatMap {
case Failure(f) => Future.failed(f)
case Success(candidates) =>
if (candidates.size > 1) log.info("multiple funding candidates found: {}", candidates.map(r => s"${r.fee} fees with ${r.tx.txIn.size} inputs").mkString(", "))
val sorted = candidates.sortBy(_.fee)
// We must unlock the inputs used by the other result that don't appear in the selected result.
val unusedInputs = sorted.tail.flatMap(_.tx.txIn.map(_.outPoint)).toSet -- sorted.head.tx.txIn.map(_.outPoint).toSet
val dummyTx = Transaction(2, unusedInputs.toSeq.map(o => TxIn(o, Nil, 0)), Nil, 0)
wallet.rollback(dummyTx).transformWith(_ => Future.successful(sorted.head))
}
}

/**
* We (ab)use bitcoind's `fundrawtransaction` to select available utxos from our wallet. Not all utxos are suitable
* for dual funding though (e.g. they need to use segwit), so we filter them and iterate until we have a valid set of
* inputs.
*/
private def fund(txNotFunded: Transaction, currentInputs: Seq[OutgoingInput], unusableInputs: Set[UnusableInput]): Behavior[Command] = {
val sharedInputWeight = fundingParams.sharedInput_opt.toSeq.map(i => i.info.outPoint -> i.weight.toLong).toMap
context.pipeToSelf(wallet.fundTransaction(txNotFunded, fundingParams.targetFeerate, replaceable = true, externalInputsWeight = sharedInputWeight)) {
context.pipeToSelf(fundTransaction(txNotFunded)) {
case Failure(t) => WalletFailure(t)
case Success(result) => FundTransactionResult(result.tx, result.changePosition)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,13 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
// Invalid serial_id.
alice2bob.forward(bob, inputA.copy(serialId = UInt64(1)))
bob2alice.expectMsgType[TxAbort]
awaitCond(wallet.rolledback.length == 1)
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)

// Below dust.
bob2alice.forward(alice, TxAddOutput(channelId(bob), UInt64(1), 150 sat, Script.write(Script.pay2wpkh(randomKey().publicKey))))
alice2bob.expectMsgType[TxAbort]
awaitCond(wallet.rolledback.length == 2)
awaitCond(inputA.previousTx_opt.forall(prevTx => wallet.rolledback.exists(_.txIn.exists(_.outPoint.txid == prevTx.txid))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsgType[OpenChannelResponse.Rejected]
Expand All @@ -118,15 +117,14 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
test("recv TxAbort", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

alice2bob.expectMsgType[TxAddInput]
val inputA = alice2bob.expectMsgType[TxAddInput]
alice2bob.forward(bob, TxAbort(channelId(alice), hex"deadbeef"))
val bobTxAbort = bob2alice.expectMsgType[TxAbort]
awaitCond(wallet.rolledback.size == 1)
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)

bob2alice.forward(alice, bobTxAbort)
awaitCond(wallet.rolledback.size == 2)
awaitCond(inputA.previousTx_opt.forall(prevTx => wallet.rolledback.exists(_.txIn.exists(_.outPoint.txid == prevTx.txid))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsgType[OpenChannelResponse.RemoteError]
Expand All @@ -135,7 +133,7 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
test("recv TxInitRbf", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

alice2bob.expectMsgType[TxAddInput]
val inputA = alice2bob.expectMsgType[TxAddInput]
alice2bob.forward(bob, TxInitRbf(channelId(alice), 0, FeeratePerKw(15_000 sat)))
bob2alice.expectMsgType[Warning]
assert(bob.stateName == WAIT_FOR_DUAL_FUNDING_CREATED)
Expand All @@ -144,13 +142,13 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
alice2bob.expectMsgType[Warning]
assert(alice.stateName == WAIT_FOR_DUAL_FUNDING_CREATED)
aliceOpenReplyTo.expectNoMessage(100 millis)
assert(wallet.rolledback.isEmpty)
inputA.previousTx_opt.foreach(prevTx => assert(!wallet.rolledback.exists(_.txIn.exists(_.outPoint.txid == prevTx.txid))))
}

test("recv TxAckRbf", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

alice2bob.expectMsgType[TxAddInput]
val inputA = alice2bob.expectMsgType[TxAddInput]
alice2bob.forward(bob, TxAckRbf(channelId(alice)))
bob2alice.expectMsgType[Warning]
assert(bob.stateName == WAIT_FOR_DUAL_FUNDING_CREATED)
Expand All @@ -159,21 +157,22 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
alice2bob.expectMsgType[Warning]
assert(alice.stateName == WAIT_FOR_DUAL_FUNDING_CREATED)
aliceOpenReplyTo.expectNoMessage(100 millis)
assert(wallet.rolledback.isEmpty)
inputA.previousTx_opt.foreach(prevTx => assert(!wallet.rolledback.exists(_.txIn.exists(_.outPoint.txid == prevTx.txid))))
}

test("recv Error", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

val finalChannelId = channelId(alice)
awaitCond(wallet.rolledback.size == 2) // we make 3 parallel funding attempts and only keep the best candidate
alice ! Error(finalChannelId, "oops")
awaitCond(wallet.rolledback.size == 1)
awaitCond(wallet.rolledback.size == 3)
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsgType[OpenChannelResponse.RemoteError]

bob ! Error(finalChannelId, "oops")
awaitCond(wallet.rolledback.size == 2)
awaitCond(wallet.rolledback.size == 4)
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}
Expand All @@ -184,32 +183,34 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
val finalChannelId = channelId(alice)
val sender = TestProbe()
val c = CMD_CLOSE(sender.ref, None, None)
awaitCond(wallet.rolledback.size == 2) // we make 3 parallel funding attempts and only keep the best candidate

alice ! c
sender.expectMsg(RES_SUCCESS(c, finalChannelId))
awaitCond(wallet.rolledback.size == 1)
awaitCond(wallet.rolledback.size == 3)
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsg(OpenChannelResponse.Cancelled)

bob ! c
sender.expectMsg(RES_SUCCESS(c, finalChannelId))
awaitCond(wallet.rolledback.size == 2)
awaitCond(wallet.rolledback.size == 4)
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}

test("recv INPUT_DISCONNECTED", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

awaitCond(wallet.rolledback.size == 2) // we make 3 parallel funding attempts and only keep the best candidate
alice ! INPUT_DISCONNECTED
awaitCond(wallet.rolledback.size == 1)
awaitCond(wallet.rolledback.size == 3)
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsg(OpenChannelResponse.Disconnected)

bob ! INPUT_DISCONNECTED
awaitCond(wallet.rolledback.size == 2)
awaitCond(wallet.rolledback.size == 4)
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}
Expand Down Expand Up @@ -255,7 +256,7 @@ class WaitForDualFundingCreatedStateSpec extends TestKitBaseClass with FixtureAn
import f._

alice ! TickChannelOpenTimeout
awaitCond(wallet.rolledback.size == 1)
awaitCond(wallet.rolledback.nonEmpty)
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
aliceOpenReplyTo.expectMsg(OpenChannelResponse.TimedOut)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,24 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
val bobCommitSig = bob2alice.expectMsgType[CommitSig]
val aliceCommitSig = alice2bob.expectMsgType[CommitSig]

val fundingTxA = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
bob2alice.forward(alice, bobCommitSig.copy(signature = ByteVector64.Zeroes))
alice2bob.expectMsgType[Error]
awaitCond(wallet.rolledback.length == 1)
awaitCond(fundingTxA.tx.localInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)

alice2bob.forward(bob, aliceCommitSig.copy(signature = ByteVector64.Zeroes))
bob2alice.expectMsgType[Error]
awaitCond(wallet.rolledback.length == 2)
awaitCond(fundingTxA.tx.remoteInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}

test("recv invalid TxSignatures", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

val fundingTxA = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
bob2alice.expectMsgType[CommitSig]
bob2alice.forward(alice)
alice2bob.expectMsgType[CommitSig]
Expand All @@ -222,7 +224,7 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
bob2blockchain.expectMsgType[WatchFundingConfirmed]
bob2alice.forward(alice, bobSigs.copy(txId = randomTxId(), witnesses = Nil))
alice2bob.expectMsgType[Error]
awaitCond(wallet.rolledback.size == 1)
awaitCond(fundingTxA.tx.localInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)

Expand All @@ -246,7 +248,9 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
bob2alice.forward(alice, TxInitRbf(channelId(bob), 0, FeeratePerKw(15_000 sat)))
alice2bob.expectMsgType[Warning]
assert(alice.stateName == WAIT_FOR_DUAL_FUNDING_SIGNED)
assert(wallet.rolledback.isEmpty)
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
fundingTx.tx.localInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
fundingTx.tx.remoteInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
}

test("recv TxAckRbf", Tag(ChannelStateTestsTags.DualFunding)) { f =>
Expand All @@ -262,20 +266,23 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
bob2alice.forward(alice, TxAckRbf(channelId(bob)))
alice2bob.expectMsgType[Warning]
assert(alice.stateName == WAIT_FOR_DUAL_FUNDING_SIGNED)
assert(wallet.rolledback.isEmpty)
val fundingTx = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
fundingTx.tx.localInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
fundingTx.tx.remoteInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
}

test("recv Error", Tag(ChannelStateTestsTags.DualFunding)) { f =>
import f._

val finalChannelId = channelId(alice)
val fundingTxA = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
alice ! Error(finalChannelId, "oops")
awaitCond(wallet.rolledback.size == 1)
awaitCond(fundingTxA.tx.localInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)

bob ! Error(finalChannelId, "oops")
awaitCond(wallet.rolledback.size == 2)
awaitCond(fundingTxA.tx.remoteInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}
Expand All @@ -284,18 +291,19 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
import f._

val finalChannelId = channelId(alice)
val fundingTxA = alice.stateData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
val sender = TestProbe()
val c = CMD_CLOSE(sender.ref, None, None)

alice ! c
sender.expectMsg(RES_SUCCESS(c, finalChannelId))
awaitCond(wallet.rolledback.size == 1)
awaitCond(fundingTxA.tx.localInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)

bob ! c
sender.expectMsg(RES_SUCCESS(c, finalChannelId))
awaitCond(wallet.rolledback.size == 2)
awaitCond(fundingTxA.tx.remoteInputs.forall(i => wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
bobListener.expectMsgType[ChannelAborted]
awaitCond(bob.stateName == CLOSED)
}
Expand All @@ -304,6 +312,7 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
import f._

val aliceData = alice.stateData
val fundingTx = aliceData.asInstanceOf[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED].signingSession.fundingTx
alice ! INPUT_DISCONNECTED
awaitCond(alice.stateName == OFFLINE)
assert(alice.stateData == aliceData)
Expand All @@ -314,7 +323,8 @@ class WaitForDualFundingSignedStateSpec extends TestKitBaseClass with FixtureAny
awaitCond(bob.stateName == OFFLINE)
assert(bob.stateData == bobData)
bobListener.expectNoMessage(100 millis)
assert(wallet.rolledback.isEmpty)
fundingTx.tx.localInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
fundingTx.tx.remoteInputs.foreach(i => assert(!wallet.rolledback.exists(_.txIn.map(_.outPoint).contains(i.outPoint))))
}

test("recv INPUT_DISCONNECTED (commit_sig not received)", Tag(ChannelStateTestsTags.DualFunding)) { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
alice ! ProcessCurrentBlockHeight(CurrentBlockHeight(currentBlock))
alice2bob.expectMsgType[Error]
alice2blockchain.expectNoMessage(100 millis)
awaitCond(wallet.rolledback.map(_.txid) == Seq(fundingTx.txid))
awaitCond(wallet.rolledback.map(_.txid).contains(fundingTx.txid))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
}
Expand All @@ -486,7 +486,7 @@ class WaitForDualFundingConfirmedStateSpec extends TestKitBaseClass with Fixture
alice ! ProcessCurrentBlockHeight(CurrentBlockHeight(currentBlock))
alice2bob.expectMsgType[Error]
alice2blockchain.expectNoMessage(100 millis)
awaitCond(wallet.rolledback.map(_.txid) == Seq(fundingTx.txid))
awaitCond(wallet.rolledback.map(_.txid).contains(fundingTx.txid))
aliceListener.expectMsgType[ChannelAborted]
awaitCond(alice.stateName == CLOSED)
}
Expand Down

0 comments on commit 9c49413

Please sign in to comment.