Skip to content

Commit

Permalink
pushtx: add broadcast mapping function.
Browse files Browse the repository at this point in the history
The rebroadcaster has now the ability to be supplied with a
custom error mapping function which should map different backend
errors to the neutrino internal broadcast error.
  • Loading branch information
ziggie1984 committed Jul 7, 2023
1 parent cb61d7f commit eab6cc3
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 10 deletions.
29 changes: 25 additions & 4 deletions pushtx/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type Config struct {
// RebroadcastInterval is the interval that we'll continually try to
// re-broadcast transactions in-between new block arrival.
RebroadcastInterval time.Duration

// MapCustomBroadcastError allows the Rebroadcaster to map broadcast
// errors from other backends to the neutrino internal BroadcastError.
// This allows the Rebroadcaster to behave consistently over different
// backends.
MapCustomBroadcastError func(error) error
}

// Broadcaster is a subsystem responsible for reliably broadcasting transactions
Expand Down Expand Up @@ -171,10 +177,19 @@ func (b *Broadcaster) broadcastHandler(sub *blockntfns.Subscription) {
// A new broadcast request was submitted by an external caller.
case req := <-b.broadcastReqs:
err := b.cfg.Broadcast(req.tx)
if err != nil && !IsBroadcastError(err, Mempool) {
log.Errorf("Broadcast attempt failed: %v", err)
req.errChan <- err
continue
if err != nil {
// We apply the custom err mapping function if
// it was supplied which allows to map other
// backend errors to the neutrino BroadcastError.
if b.cfg.MapCustomBroadcastError != nil {
err = b.cfg.MapCustomBroadcastError(err)
}
if !IsBroadcastError(err, Mempool) {
log.Errorf("Broadcast attempt "+
"failed: %v", err)
req.errChan <- err
continue
}
}

transactions[req.tx.TxHash()] = req.tx
Expand Down Expand Up @@ -231,6 +246,12 @@ func (b *Broadcaster) rebroadcast(txs map[chainhash.Hash]*wire.MsgTx,
}

err := b.cfg.Broadcast(tx)
// We apply the custom err mapping function if it was supplied
// which allows to map other backend errors to the neutrino
// BroadcastError.
if err != nil && b.cfg.MapCustomBroadcastError != nil {
err = b.cfg.MapCustomBroadcastError(err)
}
switch {
// If the transaction has already confirmed on-chain, we can
// stop broadcasting it further.
Expand Down
86 changes: 80 additions & 6 deletions pushtx/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package pushtx

import (
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/blockntfns"
)
Expand Down Expand Up @@ -78,7 +81,7 @@ func TestBroadcaster(t *testing.T) {
func TestRebroadcast(t *testing.T) {
t.Parallel()

const numTxs = 3
const numTxs = 5

// We'll start by setting up the broadcaster with channels to mock the
// behavior of its external dependencies.
Expand Down Expand Up @@ -177,8 +180,9 @@ func TestRebroadcast(t *testing.T) {
wire.BlockHeader{}, 100, wire.BlockHeader{},
)

// This time however, only the last two transactions will be rebroadcast
// since the first one confirmed in the previous rebroadcast attempt.
// This time however, only the last four transactions will be
// rebroadcasted since the first one confirmed in the previous
// rebroadcast attempt.
assertBroadcastOrder(txs[1:])

// We now manually mark one of the transactions as confirmed.
Expand All @@ -187,16 +191,86 @@ func TestRebroadcast(t *testing.T) {
// Trigger a new block notification to rebroadcast the transactions.
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 101)

// We assert that only the last transaction is rebroadcast.
// We assert that only the last three transactions are rebroadcasted.
assertBroadcastOrder(txs[2:])

// Manually mark the last transaction as confirmed.
// Manually mark the third transaction as confirmed.
broadcaster.MarkAsConfirmed(txs[2].TxHash())

// Now we inject a custom error mapping function for backend errors
// other than neutrino.
broadcaster.cfg.MapCustomBroadcastError = func(err error) error {
// match is a helper method to easily string match on the error
// message.
match := func(err error, s string) bool {
return strings.Contains(strings.ToLower(err.Error()), s)
}

switch {
case match(err, "mempool min fee not met"):
return &BroadcastError{
Code: Mempool,
Reason: err.Error(),
}

case match(err, "transaction already exists"):
return &BroadcastError{
Code: Confirmed,
Reason: err.Error(),
}

default:
return fmt.Errorf("unmatched backend error: %v", err)
}
}

// Now, we'll modify the Broadcast method to mark the fourth transaction
// as confirmed but with a bitcoind backend notification to test that
// the mapping between different backend errors and the neutrino
// BroadcastError works as expected. We also mark the last transaction
// with the bitcoind backend error for not having enough fees to be
// included in the mempool. We expected that it gets rebroadcasted too.
broadcaster.cfg.Broadcast = func(tx *wire.MsgTx) error {
broadcastChan <- tx
if tx.TxHash() == txs[3].TxHash() {
return &btcjson.RPCError{
Code: btcjson.ErrRPCVerifyAlreadyInChain,
Message: "transaction already exists",
}
}
if tx.TxHash() == txs[4].TxHash() {
return &btcjson.RPCError{
Code: btcjson.ErrRPCTxRejected,
Message: "mempool min fee not met",
}
}

return nil
}

// Trigger a new block notification.
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 102)

// Assert that no transactions were rebroadcast.
// We assert that only the last two transactions are rebroadcasted.
assertBroadcastOrder(txs[3:])

// Trigger another block notification simulating a reorg in the chain.
// The transactions should be rebroadcasted again to ensure they
// properly propagate throughout the network.
ntfnChan <- blockntfns.NewBlockDisconnected(
wire.BlockHeader{}, 102, wire.BlockHeader{},
)

// We assert that only the last transaction is rebroadcasted.
assertBroadcastOrder(txs[4:])

// Manually mark the last transaction as confirmed.
broadcaster.MarkAsConfirmed(txs[4].TxHash())

// Trigger a new block notification.
ntfnChan <- blockntfns.NewBlockConnected(wire.BlockHeader{}, 103)

// Assert that no transactions were rebroadcasted.
select {
case tx := <-broadcastChan:
t.Fatalf("unexpected rebroadcast of tx %s", tx.TxHash())
Expand Down

0 comments on commit eab6cc3

Please sign in to comment.