From 793f371b7fdf1a2c9588c6fd8c7ca827118f0190 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 11 Jun 2024 15:59:01 +0100 Subject: [PATCH 1/4] schedule receipt check when there are new blocks Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 51 +++++++++++-------- internal/confirmations/confirmations_test.go | 9 ++-- internal/confirmations/receipt_checker.go | 1 + .../confirmations/receipt_checker_test.go | 1 - 4 files changed, 36 insertions(+), 26 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index af519be6..d96bb61b 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -84,22 +84,23 @@ type RemovedListenerInfo struct { } type blockConfirmationManager struct { - baseContext context.Context - ctx context.Context - cancelFunc func() - newBlockHashes chan *ffcapi.BlockHashEvent - connector ffcapi.API - blockListenerStale bool - metricsEmitter metrics.ConfirmationMetricsEmitter - requiredConfirmations int - staleReceiptTimeout time.Duration - bcmNotifications chan *Notification - highestBlockSeen uint64 - pending map[string]*pendingItem - pendingMux sync.Mutex - receiptChecker *receiptChecker - retry *retry.Retry - done chan struct{} + baseContext context.Context + ctx context.Context + cancelFunc func() + newBlockHashes chan *ffcapi.BlockHashEvent + connector ffcapi.API + blockListenerStale bool + metricsEmitter metrics.ConfirmationMetricsEmitter + requiredConfirmations int + staleReceiptTimeout time.Duration + bcmNotifications chan *Notification + highestBlockSeen uint64 + pending map[string]*pendingItem + pendingMux sync.Mutex + receiptChecker *receiptChecker + retry *retry.Retry + scheduleReceiptChecksOnNewBlocks bool + done chan struct{} } func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.API, desc string, @@ -140,6 +141,7 @@ type pendingItem struct { added time.Time notifiedConfirmations []*apitypes.Confirmation confirmations []*apitypes.Confirmation + scheduledAtLeastOnce bool confirmed bool queuedStale *list.Element // protected by receiptChecker mux lastReceiptCheck time.Time // protected by receiptChecker mux @@ -391,20 +393,25 @@ func (bcm *blockConfirmationManager) confirmationsListener() { notifications = notifications[:0] // Mark receipts stale after duration - bcm.staleReceiptCheck() + bcm.scheduleReceiptChecks(blockHashCount > 0) log.L(bcm.ctx).Tracef("[TimeTrace] Confirmation listener processed %d block hashes and %d notifications in %s, trigger type: %s", blockHashCount, notificationCount, time.Since(startTime), triggerType) } } -func (bcm *blockConfirmationManager) staleReceiptCheck() { +func (bcm *blockConfirmationManager) scheduleReceiptChecks(processedNewBlock bool) { now := time.Now() for _, pending := range bcm.pending { // For efficiency we do a dirty read on the receipt check time before going into the locking // check within the receipt checker - if pending.pType == pendingTypeTransaction && now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout { - bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */) + if pending.pType == pendingTypeTransaction { + if !pending.scheduledAtLeastOnce && processedNewBlock { + bcm.receiptChecker.schedule(pending, false) + } else if now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout { + // schedule stale receipt checks + bcm.receiptChecker.schedule(pending, true /* suspected timeout - prompts re-check in the lock */) + } } } } @@ -423,7 +430,9 @@ func (bcm *blockConfirmationManager) processNotifications(notifications []*Notif case NewTransaction: newItem := n.transactionPendingItem() bcm.addOrReplaceItem(newItem) - bcm.receiptChecker.schedule(newItem, false) + if !bcm.scheduleReceiptChecksOnNewBlocks { + // bcm.receiptChecker.schedule(newItem, false) + } case RemovedEventLog: bcm.removeItem(n.eventPendingItem(), true) case RemovedTransaction: diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 1b61d3a7..3b63d828 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -1199,12 +1199,13 @@ func TestStaleReceiptCheck(t *testing.T) { txHash := "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347" pending := &pendingItem{ - pType: pendingTypeTransaction, - lastReceiptCheck: time.Now().Add(-1 * time.Hour), - transactionHash: txHash, + pType: pendingTypeTransaction, + lastReceiptCheck: time.Now().Add(-1 * time.Hour), + transactionHash: txHash, + scheduledAtLeastOnce: true, } bcm.pending[pending.getKey()] = pending - bcm.staleReceiptCheck() + bcm.scheduleReceiptChecks(false) assert.Equal(t, bcm.receiptChecker.entries.Len(), 1) diff --git a/internal/confirmations/receipt_checker.go b/internal/confirmations/receipt_checker.go index 8daf6bdb..e4d2861d 100644 --- a/internal/confirmations/receipt_checker.go +++ b/internal/confirmations/receipt_checker.go @@ -149,6 +149,7 @@ func (rc *receiptChecker) schedule(pending *pendingItem, suspectedTimeout bool) return } pending.queuedStale = rc.entries.PushBack(pending) + pending.scheduledAtLeastOnce = true rc.cond.Signal() rc.cond.L.Unlock() // Log (outside the lock as it's a contended one) diff --git a/internal/confirmations/receipt_checker_test.go b/internal/confirmations/receipt_checker_test.go index 56cb6858..a42fb96d 100644 --- a/internal/confirmations/receipt_checker_test.go +++ b/internal/confirmations/receipt_checker_test.go @@ -130,5 +130,4 @@ func TestCheckReceiptDoubleQueueProtection(t *testing.T) { // to a worker, and when it's successfully executed (or put back on the end of the list) bcm.receiptChecker.schedule(pending, false) assert.Zero(t, bcm.receiptChecker.entries.Len()) - } From 5f2a84a04e3431a690f2d8f79c8f19aaf6b0ddc7 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 11 Jun 2024 17:22:02 +0100 Subject: [PATCH 2/4] restore default behaviour and add tests Signed-off-by: Chengxuan Xing --- config.md | 1 + internal/confirmations/confirmations.go | 39 ++++++++++---------- internal/confirmations/confirmations_test.go | 21 +++++++---- internal/tmconfig/tmconfig.go | 4 +- internal/tmmsgs/en_config_descriptions.go | 1 + 5 files changed, 39 insertions(+), 27 deletions(-) diff --git a/config.md b/config.md index cd804e6a..5d67ba49 100644 --- a/config.md +++ b/config.md @@ -44,6 +44,7 @@ |Key|Description|Type|Default Value| |---|-----------|----|-------------| |blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50` +|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`true` |notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50` |receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10` |required|Number of confirmations required to consider a transaction/event final|`int`|`20` diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index d96bb61b..b8b0a3df 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -84,23 +84,23 @@ type RemovedListenerInfo struct { } type blockConfirmationManager struct { - baseContext context.Context - ctx context.Context - cancelFunc func() - newBlockHashes chan *ffcapi.BlockHashEvent - connector ffcapi.API - blockListenerStale bool - metricsEmitter metrics.ConfirmationMetricsEmitter - requiredConfirmations int - staleReceiptTimeout time.Duration - bcmNotifications chan *Notification - highestBlockSeen uint64 - pending map[string]*pendingItem - pendingMux sync.Mutex - receiptChecker *receiptChecker - retry *retry.Retry - scheduleReceiptChecksOnNewBlocks bool - done chan struct{} + baseContext context.Context + ctx context.Context + cancelFunc func() + newBlockHashes chan *ffcapi.BlockHashEvent + connector ffcapi.API + blockListenerStale bool + metricsEmitter metrics.ConfirmationMetricsEmitter + requiredConfirmations int + staleReceiptTimeout time.Duration + bcmNotifications chan *Notification + highestBlockSeen uint64 + pending map[string]*pendingItem + pendingMux sync.Mutex + receiptChecker *receiptChecker + retry *retry.Retry + fetchReceiptUponEntry bool + done chan struct{} } func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.API, desc string, @@ -120,6 +120,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A MaximumDelay: config.GetDuration(tmconfig.ConfirmationsRetryMaxDelay), Factor: config.GetFloat64(tmconfig.ConfirmationsRetryFactor), }, + fetchReceiptUponEntry: config.GetBool(tmconfig.ConfirmationsFetchReceiptUponEntry), } bcm.ctx, bcm.cancelFunc = context.WithCancel(baseContext) // add a log context for this specific confirmation manager (as there are many within the ) @@ -430,8 +431,8 @@ func (bcm *blockConfirmationManager) processNotifications(notifications []*Notif case NewTransaction: newItem := n.transactionPendingItem() bcm.addOrReplaceItem(newItem) - if !bcm.scheduleReceiptChecksOnNewBlocks { - // bcm.receiptChecker.schedule(newItem, false) + if bcm.fetchReceiptUponEntry { + bcm.receiptChecker.schedule(newItem, false) } case RemovedEventLog: bcm.removeItem(n.eventPendingItem(), true) diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index 3b63d828..aae412ef 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -1185,7 +1185,7 @@ func TestCheckReceiptWalkFail(t *testing.T) { bcm.dispatchReceipt(pending, receipt, blocks) } -func TestStaleReceiptCheck(t *testing.T) { +func TestScheduleReceiptCheck(t *testing.T) { bcm, _ := newTestBlockConfirmationManager(t, false) emm := &metricsmocks.EventMetricsEmitter{} @@ -1197,17 +1197,24 @@ func TestStaleReceiptCheck(t *testing.T) { emm.On("RecordConfirmationMetrics", mock.Anything, mock.Anything).Maybe() bcm.receiptChecker = newReceiptChecker(bcm, 0, emm) - txHash := "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347" - pending := &pendingItem{ + pendingStale := &pendingItem{ // stale pType: pendingTypeTransaction, lastReceiptCheck: time.Now().Add(-1 * time.Hour), - transactionHash: txHash, + transactionHash: "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", scheduledAtLeastOnce: true, } - bcm.pending[pending.getKey()] = pending - bcm.scheduleReceiptChecks(false) - assert.Equal(t, bcm.receiptChecker.entries.Len(), 1) + pendingNotScheduled := &pendingItem{ // not scheduled + pType: pendingTypeTransaction, + lastReceiptCheck: time.Now().Add(-1 * time.Hour), + transactionHash: "0x531e219d98d81dc9f9a14811ac537479f5d77a74bdba47629bfbebe2d7663ce7", + scheduledAtLeastOnce: false, + } + bcm.pending[pendingStale.getKey()] = pendingStale + bcm.pending[pendingNotScheduled.getKey()] = pendingNotScheduled + bcm.scheduleReceiptChecks(true) + + assert.Equal(t, bcm.receiptChecker.entries.Len(), 2) } diff --git a/internal/tmconfig/tmconfig.go b/internal/tmconfig/tmconfig.go index 0463dc85..a03f4185 100644 --- a/internal/tmconfig/tmconfig.go +++ b/internal/tmconfig/tmconfig.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -30,6 +30,7 @@ var ( ConfirmationsRequired = ffc("confirmations.required") ConfirmationsBlockQueueLength = ffc("confirmations.blockQueueLength") ConfirmationsStaleReceiptTimeout = ffc("confirmations.staleReceiptTimeout") + ConfirmationsFetchReceiptUponEntry = ffc("confirmations.fetchReceiptUponEntry") ConfirmationsNotificationQueueLength = ffc("confirmations.notificationQueueLength") ConfirmationsReceiptWorkers = ffc("confirmations.receiptWorkers") ConfirmationsRetryInitDelay = ffc("confirmations.retry.initialDelay") @@ -98,6 +99,7 @@ func setDefaults() { viper.SetDefault(string(ConfirmationsRetryInitDelay), "100ms") viper.SetDefault(string(ConfirmationsRetryMaxDelay), "15s") viper.SetDefault(string(ConfirmationsRetryFactor), 2.0) + viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), true) viper.SetDefault(string(EventStreamsDefaultsBatchSize), 50) viper.SetDefault(string(EventStreamsDefaultsBatchTimeout), "5s") diff --git a/internal/tmmsgs/en_config_descriptions.go b/internal/tmmsgs/en_config_descriptions.go index 99c84d3f..14422fc8 100644 --- a/internal/tmmsgs/en_config_descriptions.go +++ b/internal/tmmsgs/en_config_descriptions.go @@ -45,6 +45,7 @@ var ( ConfigConfirmationsNotificationsQueueLength = ffc("config.confirmations.notificationQueueLength", "Internal queue length for notifying the confirmations manager of new transactions/events", i18n.IntType) ConfigConfirmationsRequired = ffc("config.confirmations.required", "Number of confirmations required to consider a transaction/event final", i18n.IntType) ConfigConfirmationsStaleReceiptTimeout = ffc("config.confirmations.staleReceiptTimeout", "Duration after which to force a receipt check for a pending transaction", i18n.TimeDurationType) + ConfigConfirmationsFetchReceiptUponEntry = ffc("config.confirmations.fetchReceiptUponEntry", "Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout", i18n.BooleanType) ConfigConfirmationsReceiptWorkers = ffc("config.confirmations.receiptWorkers", "Number of workers to use to query in parallel for receipts", i18n.IntType) ConfigTransactionsNonceStateTimeout = ffc("config.transactions.nonceStateTimeout", "How old the most recently submitted transaction record in our local state needs to be, before we make a request to the node to query the next nonce for a signing address", i18n.TimeDurationType) From eb4e5847cba741dd6b01f7379058b7d133259947 Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Tue, 11 Jun 2024 20:58:10 +0100 Subject: [PATCH 3/4] set fetchReceiptUponEntry default to false Signed-off-by: Chengxuan Xing --- config.md | 2 +- internal/confirmations/confirmations_test.go | 3 ++- internal/tmconfig/tmconfig.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/config.md b/config.md index 5d67ba49..56e8d207 100644 --- a/config.md +++ b/config.md @@ -44,7 +44,7 @@ |Key|Description|Type|Default Value| |---|-----------|----|-------------| |blockQueueLength|Internal queue length for notifying the confirmations manager of new blocks|`int`|`50` -|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`true` +|fetchReceiptUponEntry|Fetch receipt of new transactions immediately when they are added to the internal queue. When set to false, fetch will only happen when a new block is received or the transaction has been queue for more than the stale receipt timeout|`boolean`|`false` |notificationQueueLength|Internal queue length for notifying the confirmations manager of new transactions/events|`int`|`50` |receiptWorkers|Number of workers to use to query in parallel for receipts|`int`|`10` |required|Number of confirmations required to consider a transaction/event final|`int`|`20` diff --git a/internal/confirmations/confirmations_test.go b/internal/confirmations/confirmations_test.go index aae412ef..39d2dd8a 100644 --- a/internal/confirmations/confirmations_test.go +++ b/internal/confirmations/confirmations_test.go @@ -462,6 +462,7 @@ func TestBlockConfirmationManagerE2EForkReNotifyConfirmations(t *testing.T) { func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { bcm, mca := newTestBlockConfirmationManager(t, true) + bcm.fetchReceiptUponEntry = true // mark fetch receipt upon entry to do a fetch receipt before any blocks were retrieved confirmed := make(chan *apitypes.ConfirmationsNotification, 1) receiptReceived := make(chan *ffcapi.TransactionReceiptResponse, 1) @@ -643,7 +644,7 @@ func TestBlockConfirmationManagerE2ETransactionMovedFork(t *testing.T) { bcm.Stop() mca.AssertExpectations(t) - + // false } func TestBlockConfirmationManagerE2EHistoricalEvent(t *testing.T) { diff --git a/internal/tmconfig/tmconfig.go b/internal/tmconfig/tmconfig.go index a03f4185..cf877701 100644 --- a/internal/tmconfig/tmconfig.go +++ b/internal/tmconfig/tmconfig.go @@ -99,7 +99,7 @@ func setDefaults() { viper.SetDefault(string(ConfirmationsRetryInitDelay), "100ms") viper.SetDefault(string(ConfirmationsRetryMaxDelay), "15s") viper.SetDefault(string(ConfirmationsRetryFactor), 2.0) - viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), true) + viper.SetDefault(string(ConfirmationsFetchReceiptUponEntry), false) viper.SetDefault(string(EventStreamsDefaultsBatchSize), 50) viper.SetDefault(string(EventStreamsDefaultsBatchTimeout), "5s") From c7ea578a9b3e68ba6c2d7a01f0ceb5e8058b81fc Mon Sep 17 00:00:00 2001 From: Chengxuan Xing Date: Wed, 12 Jun 2024 06:13:26 +0100 Subject: [PATCH 4/4] improve first schedule logic to be more efficient Signed-off-by: Chengxuan Xing --- internal/confirmations/confirmations.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index b8b0a3df..a05effa5 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -331,6 +331,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() { notifications := make([]*Notification, 0) blockHashes := make([]string, 0) triggerType := "" + receivedFirstBlock := false for { select { case bhe := <-bcm.newBlockHashes: @@ -392,22 +393,23 @@ func (bcm *blockConfirmationManager) confirmationsListener() { } // Clear the notifications array now we've processed them (we keep the slice memory) notifications = notifications[:0] - + scheduleAllTxReceipts := !receivedFirstBlock && blockHashCount > 0 // Mark receipts stale after duration - bcm.scheduleReceiptChecks(blockHashCount > 0) + bcm.scheduleReceiptChecks(scheduleAllTxReceipts) + receivedFirstBlock = receivedFirstBlock || blockHashCount > 0 log.L(bcm.ctx).Tracef("[TimeTrace] Confirmation listener processed %d block hashes and %d notifications in %s, trigger type: %s", blockHashCount, notificationCount, time.Since(startTime), triggerType) } } -func (bcm *blockConfirmationManager) scheduleReceiptChecks(processedNewBlock bool) { +func (bcm *blockConfirmationManager) scheduleReceiptChecks(receivedBlocksFirstTime bool) { now := time.Now() for _, pending := range bcm.pending { // For efficiency we do a dirty read on the receipt check time before going into the locking // check within the receipt checker if pending.pType == pendingTypeTransaction { - if !pending.scheduledAtLeastOnce && processedNewBlock { + if receivedBlocksFirstTime && !pending.scheduledAtLeastOnce { bcm.receiptChecker.schedule(pending, false) } else if now.Sub(pending.lastReceiptCheck) > bcm.staleReceiptTimeout { // schedule stale receipt checks