From 9b553ff6924029da153e39552fd0cc110c9bed90 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Mon, 24 Jun 2024 05:47:52 -0400 Subject: [PATCH 1/3] Get previous blockchain event before establishing listeners Signed-off-by: Peter Broadhurst --- internal/contracts/manager.go | 22 +++++++- internal/contracts/manager_test.go | 56 +++++++++++++++---- internal/multiparty/manager.go | 18 +++++- internal/multiparty/manager_test.go | 45 ++++++++++++++- mocks/apiservermocks/ffi_swagger_gen.go | 2 +- mocks/apiservermocks/server.go | 2 +- mocks/assetmocks/manager.go | 2 +- mocks/batchmocks/manager.go | 2 +- .../firefly_subscriptions.go | 2 +- mocks/blockchainmocks/callbacks.go | 2 +- mocks/blockchainmocks/plugin.go | 30 +++++----- mocks/broadcastmocks/manager.go | 2 +- mocks/cachemocks/manager.go | 2 +- mocks/contractmocks/manager.go | 2 +- mocks/coremocks/operation_callbacks.go | 2 +- mocks/databasemocks/callbacks.go | 2 +- mocks/databasemocks/plugin.go | 2 +- mocks/dataexchangemocks/callbacks.go | 2 +- mocks/dataexchangemocks/dx_event.go | 2 +- mocks/dataexchangemocks/plugin.go | 2 +- mocks/datamocks/manager.go | 2 +- mocks/definitionsmocks/handler.go | 2 +- mocks/definitionsmocks/sender.go | 2 +- mocks/eventmocks/event_manager.go | 2 +- mocks/eventsmocks/callbacks.go | 2 +- mocks/eventsmocks/plugin.go | 2 +- mocks/identitymanagermocks/manager.go | 2 +- mocks/identitymocks/callbacks.go | 2 +- mocks/identitymocks/plugin.go | 2 +- mocks/metricsmocks/manager.go | 2 +- mocks/multipartymocks/manager.go | 2 +- mocks/namespacemocks/manager.go | 2 +- mocks/networkmapmocks/manager.go | 2 +- mocks/operationmocks/manager.go | 2 +- mocks/orchestratormocks/orchestrator.go | 2 +- mocks/privatemessagingmocks/manager.go | 2 +- mocks/shareddownloadmocks/callbacks.go | 2 +- mocks/shareddownloadmocks/manager.go | 2 +- mocks/sharedstoragemocks/callbacks.go | 2 +- mocks/sharedstoragemocks/plugin.go | 2 +- mocks/spieventsmocks/manager.go | 2 +- mocks/syncasyncmocks/bridge.go | 2 +- mocks/syncasyncmocks/sender.go | 2 +- mocks/systemeventmocks/event_interface.go | 2 +- mocks/tokenmocks/callbacks.go | 2 +- mocks/tokenmocks/plugin.go | 2 +- mocks/txcommonmocks/helper.go | 2 +- mocks/txwritermocks/writer.go | 2 +- .../websocketsmocks/web_sockets_namespaced.go | 2 +- mocks/wsmocks/ws_client.go | 2 +- pkg/blockchain/plugin.go | 4 +- 51 files changed, 189 insertions(+), 76 deletions(-) diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index 628f09450..153c6a074 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -804,7 +804,24 @@ func (cm *contractManager) checkContractListenerExists(ctx context.Context, list log.L(ctx).Debugf("Validated listener %s:%s (BackendID=%s)", listener.Signature, listener.ID, listener.BackendID) return nil } - if err = cm.blockchain.AddContractListener(ctx, listener); err != nil { + + // For the case that we're establishing a listener from "latest" we obtain the protocol ID + // of the latest event confirmed from the blockchain for a given subscription. + // This protocolID should be parsed and used by the blockchain plugin if SubOptsFirstEventNewest + // is passed through, and the listener does not exist. + fb := database.BlockchainEventQueryFactory.NewFilter(ctx).Sort("-protocolid").Limit(1) + latestEvents, _, err := cm.database.GetBlockchainEvents(ctx, cm.namespace, fb.Eq( + "listener", listener.ID, + )) + if err != nil { + return err + } + lastProtocolID := "" + if len(latestEvents) > 0 { + lastProtocolID = latestEvents[0].ProtocolID + } + + if err = cm.blockchain.AddContractListener(ctx, listener, lastProtocolID); err != nil { return err } return cm.database.UpdateContractListener(ctx, cm.namespace, listener.ID, @@ -886,7 +903,8 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co if err := cm.validateFFIEvent(ctx, &listener.Event.FFIEventDefinition); err != nil { return nil, err } - if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener); err != nil { + + if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener, ""); err != nil { return nil, err } if listener.Name == "" { diff --git a/internal/contracts/manager_test.go b/internal/contracts/manager_test.go index deb02a352..cefeaff56 100644 --- a/internal/contracts/manager_test.go +++ b/internal/contracts/manager_test.go @@ -776,7 +776,7 @@ func TestAddContractListenerInline(t *testing.T) { mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil) mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) - mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil) + mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil) mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil) result, err := cm.AddContractListener(context.Background(), sub) @@ -816,7 +816,7 @@ func TestAddContractListenerInlineNilLocation(t *testing.T) { mbi.On("AddContractListener", context.Background(), mock.MatchedBy(func(cl *core.ContractListener) bool { // Normalize is not called for this case return cl.Location == nil - })).Return(nil) + }), "").Return(nil) mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil) result, err := cm.AddContractListener(context.Background(), sub) @@ -853,7 +853,7 @@ func TestAddContractListenerNoLocationOK(t *testing.T) { mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) - mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil) + mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil) mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil) result, err := cm.AddContractListener(context.Background(), sub) @@ -902,7 +902,7 @@ func TestAddContractListenerByEventPath(t *testing.T) { mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil) mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) - mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil) + mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil) mdi.On("GetFFIByID", context.Background(), "ns1", interfaceID).Return(&fftypes.FFI{}, nil) mdi.On("GetFFIEvent", context.Background(), "ns1", interfaceID, sub.EventPath).Return(event, nil) mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil) @@ -1071,6 +1071,13 @@ func TestAddContractListenerVerifyOk(t *testing.T) { fi, _ := f.Finalize() return fi.Skip == 50 && fi.Limit == 50 })).Return([]*core.ContractListener{}, nil, nil).Once() + mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool { + fi, err := f.Finalize() + assert.NoError(t, err) + return fi.Limit == 1 && strings.Contains(fi.String(), "listener") + })).Return([]*core.BlockchainEvent{ + {Namespace: "ns1", ID: fftypes.NewUUID(), ProtocolID: "001/002/003"}, + }, nil, nil).Once() mbi := cm.blockchain.(*blockchainmocks.Plugin) mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil) @@ -1079,7 +1086,7 @@ func TestAddContractListenerVerifyOk(t *testing.T) { prevBackendID := l.BackendID l.BackendID = "34567" return prevBackendID == "23456" - })).Return(nil) + }), "001/002/003").Return(nil) mdi.On("UpdateContractListener", ctx, "ns1", mock.Anything, mock.MatchedBy(func(u ffapi.Update) bool { uu, _ := u.Finalize() @@ -1106,6 +1113,7 @@ func TestAddContractListenerVerifyUpdateFail(t *testing.T) { {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"}, {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"}, }, nil, nil).Once() + mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return([]*core.BlockchainEvent{}, nil, nil).Once() mbi := cm.blockchain.(*blockchainmocks.Plugin) mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil) @@ -1114,7 +1122,7 @@ func TestAddContractListenerVerifyUpdateFail(t *testing.T) { prevBackendID := l.BackendID l.BackendID = "34567" return prevBackendID == "23456" - })).Return(nil) + }), "").Return(nil) mdi.On("UpdateContractListener", ctx, "ns1", mock.Anything, mock.MatchedBy(func(u ffapi.Update) bool { uu, _ := u.Finalize() @@ -1141,6 +1149,7 @@ func TestAddContractListenerVerifyAddFail(t *testing.T) { {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"}, {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"}, }, nil, nil).Once() + mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return([]*core.BlockchainEvent{}, nil, nil).Once() mbi := cm.blockchain.(*blockchainmocks.Plugin) mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil) @@ -1149,7 +1158,34 @@ func TestAddContractListenerVerifyAddFail(t *testing.T) { prevBackendID := l.BackendID l.BackendID = "34567" return prevBackendID == "23456" - })).Return(fmt.Errorf("pop")) + }), "").Return(fmt.Errorf("pop")) + + err := cm.verifyListeners(ctx) + assert.Regexp(t, "pop", err) + + mdi.AssertExpectations(t) + mbi.AssertExpectations(t) +} + +func TestAddContractListenerGetEventsFail(t *testing.T) { + cm := newTestContractManager() + + ctx := context.Background() + + mdi := cm.database.(*databasemocks.Plugin) + mdi.On("GetContractListeners", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool { + fi, _ := f.Finalize() + return fi.Skip == 0 && fi.Limit == 50 + })).Return([]*core.ContractListener{ + {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"}, + {Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"}, + }, nil, nil).Once() + mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything). + Return(nil, nil, fmt.Errorf("pop")).Once() + + mbi := cm.blockchain.(*blockchainmocks.Plugin) + mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil) + mbi.On("GetContractListenerStatus", ctx, "ns1", "23456", true).Return(false, nil, core.ContractListenerStatusUnknown, nil) err := cm.verifyListeners(ctx) assert.Regexp(t, "pop", err) @@ -1386,7 +1422,7 @@ func TestAddContractListenerBlockchainFail(t *testing.T) { mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil) mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) - mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(fmt.Errorf("pop")) + mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(fmt.Errorf("pop")) _, err := cm.AddContractListener(context.Background(), sub) assert.EqualError(t, err, "pop") @@ -1423,7 +1459,7 @@ func TestAddContractListenerUpsertSubFail(t *testing.T) { mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil) mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed") mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) - mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil) + mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil) mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(fmt.Errorf("pop")) _, err := cm.AddContractListener(context.Background(), sub) @@ -1464,7 +1500,7 @@ func TestAddContractAPIListener(t *testing.T) { mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil) mbi.On("AddContractListener", context.Background(), mock.MatchedBy(func(l *core.ContractListener) bool { return *l.Interface.ID == *interfaceID && l.Topic == "test-topic" - })).Return(nil) + }), "").Return(nil) mdi.On("InsertContractListener", context.Background(), mock.MatchedBy(func(l *core.ContractListener) bool { return *l.Interface.ID == *interfaceID && l.Event.Name == "changed" && l.Topic == "test-topic" })).Return(nil) diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index b0c7e1a61..811e56141 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -155,7 +155,23 @@ func (mm *multipartyManager) configureContractCommon(ctx context.Context, migrat } } - subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace, current) + // For the case that we're establishing a listener from "latest" we obtain the protocol ID + // of the latest event confirmed from the blockchain for a given subscription. + // This protocolID should be parsed and used by the blockchain plugin if SubOptsFirstEventNewest + // is passed through, and the listener does not exist. + fb := database.BlockchainEventQueryFactory.NewFilter(ctx).Sort("-protocolid").Limit(1) + latestEvents, _, err := mm.database.GetBlockchainEvents(ctx, mm.namespace.Name, fb.Eq( + "listener", nil, + )) + if err != nil { + return err + } + lastProtocolID := "" + if len(latestEvents) > 0 { + lastProtocolID = latestEvents[0].ProtocolID + } + + subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace, current, lastProtocolID) if err == nil { active.Location = current.Location active.FirstEvent = current.FirstEvent diff --git a/internal/multiparty/manager_test.go b/internal/multiparty/manager_test.go index 8dfc1218c..dc9392c8c 100644 --- a/internal/multiparty/manager_test.go +++ b/internal/multiparty/manager_test.go @@ -19,8 +19,10 @@ package multiparty import ( "context" "fmt" + "strings" "testing" + "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/mocks/blockchainmocks" @@ -115,8 +117,15 @@ func TestConfigureContract(t *testing.T) { defer mp.cleanup(t) mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) - mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) + mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, "001/002/003").Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool { + fi, err := f.Finalize() + assert.NoError(t, err) + return fi.Limit == 1 && strings.Contains(fi.String(), "listener") + })).Return([]*core.BlockchainEvent{ + {Namespace: "ns1", ID: fftypes.NewUUID(), ProtocolID: "001/002/003"}, + }, nil, nil).Once() mp.multipartyManager.config.Contracts = []blockchain.MultipartyContract{{ FirstEvent: "0", @@ -141,6 +150,7 @@ func TestConfigureContractLocationChanged(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{ Active: &core.MultipartyContract{ @@ -168,6 +178,7 @@ func TestConfigureContractDeprecatedConfig(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() err := mp.ConfigureContract(context.Background()) @@ -263,6 +274,7 @@ func TestSubmitNetworkAction(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(txid, nil) mp.mbi.On("Name").Return("ut") mp.mom.On("AddOrReuseOperation", context.Background(), mock.MatchedBy(func(op *core.Operation) bool { @@ -306,6 +318,7 @@ func TestSubmitNetworkActionTXFail(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(nil, fmt.Errorf("pop")) err := mp.ConfigureContract(context.Background()) @@ -317,6 +330,32 @@ func TestSubmitNetworkActionTXFail(t *testing.T) { mp.mth.AssertExpectations(t) } +func TestConfigureContractLookupBlockchainEventFail(t *testing.T) { + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + + mp := newTestMultipartyManager() + defer mp.cleanup(t) + + mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{ + Active: &core.MultipartyContract{Index: 0}, + } + mp.multipartyManager.config.Contracts = []blockchain.MultipartyContract{{ + FirstEvent: "0", + Location: location, + }} + + mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once() + + err := mp.ConfigureContract(context.Background()) + assert.EqualError(t, err, "pop") + + mp.mbi.AssertExpectations(t) + mp.mth.AssertExpectations(t) +} + func TestSubmitNetworkActionOpFail(t *testing.T) { location := fftypes.JSONAnyPtr(fftypes.JSONObject{ "address": "0x123", @@ -337,6 +376,7 @@ func TestSubmitNetworkActionOpFail(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(txid, nil) mp.mbi.On("Name").Return("ut") mp.mom.On("AddOrReuseOperation", context.Background(), mock.Anything).Return(fmt.Errorf("pop")) @@ -362,6 +402,7 @@ func TestSubmitNetworkActionBadType(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{ Active: &core.MultipartyContract{Index: 0}, @@ -623,6 +664,7 @@ func TestGetNetworkVersion(t *testing.T) { mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil) mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once() mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{ Active: &core.MultipartyContract{Index: 0}, @@ -651,6 +693,7 @@ func TestConfgureAndTerminateContract(t *testing.T) { mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil) mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(nil) + mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil) mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{ Active: &core.MultipartyContract{Index: 0}, diff --git a/mocks/apiservermocks/ffi_swagger_gen.go b/mocks/apiservermocks/ffi_swagger_gen.go index 0751f3f27..08a9311f5 100644 --- a/mocks/apiservermocks/ffi_swagger_gen.go +++ b/mocks/apiservermocks/ffi_swagger_gen.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package apiservermocks diff --git a/mocks/apiservermocks/server.go b/mocks/apiservermocks/server.go index 97593c40d..71247dfb9 100644 --- a/mocks/apiservermocks/server.go +++ b/mocks/apiservermocks/server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package apiservermocks diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 8b95588cc..3f2788b22 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package assetmocks diff --git a/mocks/batchmocks/manager.go b/mocks/batchmocks/manager.go index 9054cb661..a8ceb379e 100644 --- a/mocks/batchmocks/manager.go +++ b/mocks/batchmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package batchmocks diff --git a/mocks/blockchaincommonmocks/firefly_subscriptions.go b/mocks/blockchaincommonmocks/firefly_subscriptions.go index a41334e63..835d36edf 100644 --- a/mocks/blockchaincommonmocks/firefly_subscriptions.go +++ b/mocks/blockchaincommonmocks/firefly_subscriptions.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package blockchaincommonmocks diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index 0ee5cf387..e4cd093a9 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package blockchainmocks diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index 34a540874..108b752e9 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package blockchainmocks @@ -24,17 +24,17 @@ type Plugin struct { mock.Mock } -// AddContractListener provides a mock function with given fields: ctx, subscription -func (_m *Plugin) AddContractListener(ctx context.Context, subscription *core.ContractListener) error { - ret := _m.Called(ctx, subscription) +// AddContractListener provides a mock function with given fields: ctx, subscription, lastProtocolID +func (_m *Plugin) AddContractListener(ctx context.Context, subscription *core.ContractListener, lastProtocolID string) error { + ret := _m.Called(ctx, subscription, lastProtocolID) if len(ret) == 0 { panic("no return value specified for AddContractListener") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.ContractListener) error); ok { - r0 = rf(ctx, subscription) + if rf, ok := ret.Get(0).(func(context.Context, *core.ContractListener, string) error); ok { + r0 = rf(ctx, subscription, lastProtocolID) } else { r0 = ret.Error(0) } @@ -42,9 +42,9 @@ func (_m *Plugin) AddContractListener(ctx context.Context, subscription *core.Co return r0 } -// AddFireflySubscription provides a mock function with given fields: ctx, namespace, contract -func (_m *Plugin) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { - ret := _m.Called(ctx, namespace, contract) +// AddFireflySubscription provides a mock function with given fields: ctx, namespace, contract, lastProtocolID +func (_m *Plugin) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) { + ret := _m.Called(ctx, namespace, contract, lastProtocolID) if len(ret) == 0 { panic("no return value specified for AddFireflySubscription") @@ -52,17 +52,17 @@ func (_m *Plugin) AddFireflySubscription(ctx context.Context, namespace *core.Na var r0 string var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract) (string, error)); ok { - return rf(ctx, namespace, contract) + if rf, ok := ret.Get(0).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract, string) (string, error)); ok { + return rf(ctx, namespace, contract, lastProtocolID) } - if rf, ok := ret.Get(0).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract) string); ok { - r0 = rf(ctx, namespace, contract) + if rf, ok := ret.Get(0).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract, string) string); ok { + r0 = rf(ctx, namespace, contract, lastProtocolID) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract) error); ok { - r1 = rf(ctx, namespace, contract) + if rf, ok := ret.Get(1).(func(context.Context, *core.Namespace, *blockchain.MultipartyContract, string) error); ok { + r1 = rf(ctx, namespace, contract, lastProtocolID) } else { r1 = ret.Error(1) } diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 4d7158747..957f64e75 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package broadcastmocks diff --git a/mocks/cachemocks/manager.go b/mocks/cachemocks/manager.go index dee91e14f..ed923f155 100644 --- a/mocks/cachemocks/manager.go +++ b/mocks/cachemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package cachemocks diff --git a/mocks/contractmocks/manager.go b/mocks/contractmocks/manager.go index 3d53ea10e..dca991262 100644 --- a/mocks/contractmocks/manager.go +++ b/mocks/contractmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package contractmocks diff --git a/mocks/coremocks/operation_callbacks.go b/mocks/coremocks/operation_callbacks.go index f404e0506..0e6acabc5 100644 --- a/mocks/coremocks/operation_callbacks.go +++ b/mocks/coremocks/operation_callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package coremocks diff --git a/mocks/databasemocks/callbacks.go b/mocks/databasemocks/callbacks.go index 5b936ba7c..4bb148b57 100644 --- a/mocks/databasemocks/callbacks.go +++ b/mocks/databasemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package databasemocks diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 6cb9f1b0f..7aa75a825 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package databasemocks diff --git a/mocks/dataexchangemocks/callbacks.go b/mocks/dataexchangemocks/callbacks.go index bdfa4370c..e39b409a0 100644 --- a/mocks/dataexchangemocks/callbacks.go +++ b/mocks/dataexchangemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/dx_event.go b/mocks/dataexchangemocks/dx_event.go index c0cdcd3f7..2ee5e469f 100644 --- a/mocks/dataexchangemocks/dx_event.go +++ b/mocks/dataexchangemocks/dx_event.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index c1671088c..ecd545651 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index e4395a3b7..ef595ebd6 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package datamocks diff --git a/mocks/definitionsmocks/handler.go b/mocks/definitionsmocks/handler.go index 3c226e186..c4d2ba66f 100644 --- a/mocks/definitionsmocks/handler.go +++ b/mocks/definitionsmocks/handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package definitionsmocks diff --git a/mocks/definitionsmocks/sender.go b/mocks/definitionsmocks/sender.go index cbf22ec0c..23e487e47 100644 --- a/mocks/definitionsmocks/sender.go +++ b/mocks/definitionsmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package definitionsmocks diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index 13afc96fb..a5fad4a76 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package eventmocks diff --git a/mocks/eventsmocks/callbacks.go b/mocks/eventsmocks/callbacks.go index 676e06101..f4ec135b9 100644 --- a/mocks/eventsmocks/callbacks.go +++ b/mocks/eventsmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package eventsmocks diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index 453592403..2efc0a242 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package eventsmocks diff --git a/mocks/identitymanagermocks/manager.go b/mocks/identitymanagermocks/manager.go index 2eb6b8e61..51b648ea9 100644 --- a/mocks/identitymanagermocks/manager.go +++ b/mocks/identitymanagermocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package identitymanagermocks diff --git a/mocks/identitymocks/callbacks.go b/mocks/identitymocks/callbacks.go index 2f90520c7..f3f9946f8 100644 --- a/mocks/identitymocks/callbacks.go +++ b/mocks/identitymocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package identitymocks diff --git a/mocks/identitymocks/plugin.go b/mocks/identitymocks/plugin.go index 62ec273ba..bdb0e52b8 100644 --- a/mocks/identitymocks/plugin.go +++ b/mocks/identitymocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package identitymocks diff --git a/mocks/metricsmocks/manager.go b/mocks/metricsmocks/manager.go index f1c968f47..33a5e7bd7 100644 --- a/mocks/metricsmocks/manager.go +++ b/mocks/metricsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package metricsmocks diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 74885e6f6..65cf1ea11 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package multipartymocks diff --git a/mocks/namespacemocks/manager.go b/mocks/namespacemocks/manager.go index 581956692..fa6e9c5b3 100644 --- a/mocks/namespacemocks/manager.go +++ b/mocks/namespacemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package namespacemocks diff --git a/mocks/networkmapmocks/manager.go b/mocks/networkmapmocks/manager.go index 015a8895a..275128785 100644 --- a/mocks/networkmapmocks/manager.go +++ b/mocks/networkmapmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package networkmapmocks diff --git a/mocks/operationmocks/manager.go b/mocks/operationmocks/manager.go index 4c5e99c4d..98a2a22fe 100644 --- a/mocks/operationmocks/manager.go +++ b/mocks/operationmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package operationmocks diff --git a/mocks/orchestratormocks/orchestrator.go b/mocks/orchestratormocks/orchestrator.go index 7fd34c8ad..6f94f7890 100644 --- a/mocks/orchestratormocks/orchestrator.go +++ b/mocks/orchestratormocks/orchestrator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package orchestratormocks diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index 71859e10d..8988ca585 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package privatemessagingmocks diff --git a/mocks/shareddownloadmocks/callbacks.go b/mocks/shareddownloadmocks/callbacks.go index 5a1d25632..26bcd708f 100644 --- a/mocks/shareddownloadmocks/callbacks.go +++ b/mocks/shareddownloadmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package shareddownloadmocks diff --git a/mocks/shareddownloadmocks/manager.go b/mocks/shareddownloadmocks/manager.go index e24548ac3..33226238f 100644 --- a/mocks/shareddownloadmocks/manager.go +++ b/mocks/shareddownloadmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package shareddownloadmocks diff --git a/mocks/sharedstoragemocks/callbacks.go b/mocks/sharedstoragemocks/callbacks.go index 3a8938155..6296a2b7c 100644 --- a/mocks/sharedstoragemocks/callbacks.go +++ b/mocks/sharedstoragemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package sharedstoragemocks diff --git a/mocks/sharedstoragemocks/plugin.go b/mocks/sharedstoragemocks/plugin.go index e1d8e4cb7..a48e27423 100644 --- a/mocks/sharedstoragemocks/plugin.go +++ b/mocks/sharedstoragemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package sharedstoragemocks diff --git a/mocks/spieventsmocks/manager.go b/mocks/spieventsmocks/manager.go index 405c4a27e..a61707c9a 100644 --- a/mocks/spieventsmocks/manager.go +++ b/mocks/spieventsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package spieventsmocks diff --git a/mocks/syncasyncmocks/bridge.go b/mocks/syncasyncmocks/bridge.go index 2e67785cd..bce0a3663 100644 --- a/mocks/syncasyncmocks/bridge.go +++ b/mocks/syncasyncmocks/bridge.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/syncasyncmocks/sender.go b/mocks/syncasyncmocks/sender.go index 879ccba37..fd4bafa34 100644 --- a/mocks/syncasyncmocks/sender.go +++ b/mocks/syncasyncmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/systemeventmocks/event_interface.go b/mocks/systemeventmocks/event_interface.go index dd04220c4..dd0d4398e 100644 --- a/mocks/systemeventmocks/event_interface.go +++ b/mocks/systemeventmocks/event_interface.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package systemeventmocks diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 5929ef0ed..36a362a34 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package tokenmocks diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index 79e641537..b3292e799 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package tokenmocks diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index 44af5281b..4fbbc9193 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package txcommonmocks diff --git a/mocks/txwritermocks/writer.go b/mocks/txwritermocks/writer.go index 186bf8e4f..5358086e0 100644 --- a/mocks/txwritermocks/writer.go +++ b/mocks/txwritermocks/writer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package txwritermocks diff --git a/mocks/websocketsmocks/web_sockets_namespaced.go b/mocks/websocketsmocks/web_sockets_namespaced.go index ee93753cc..2a036a3b1 100644 --- a/mocks/websocketsmocks/web_sockets_namespaced.go +++ b/mocks/websocketsmocks/web_sockets_namespaced.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package websocketsmocks diff --git a/mocks/wsmocks/ws_client.go b/mocks/wsmocks/ws_client.go index 77712254f..fccb7288b 100644 --- a/mocks/wsmocks/ws_client.go +++ b/mocks/wsmocks/ws_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.42.1. DO NOT EDIT. +// Code generated by mockery v2.40.2. DO NOT EDIT. package wsmocks diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index ea80a32f5..7b022fff5 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -97,7 +97,7 @@ type Plugin interface { QueryContract(ctx context.Context, signingKey string, location *fftypes.JSONAny, parsedMethod interface{}, input map[string]interface{}, options map[string]interface{}) (interface{}, error) // AddContractListener adds a new subscription to a user-specified contract and event - AddContractListener(ctx context.Context, subscription *core.ContractListener) error + AddContractListener(ctx context.Context, subscription *core.ContractListener, lastProtocolID string) error // DeleteContractListener deletes a previously-created subscription DeleteContractListener(ctx context.Context, subscription *core.ContractListener, okNotFound bool) error @@ -127,7 +127,7 @@ type Plugin interface { GetAndConvertDeprecatedContractConfig(ctx context.Context) (location *fftypes.JSONAny, fromBlock string, err error) // AddFireflySubscription creates a FireFly BatchPin subscription for the provided location - AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *MultipartyContract) (subID string, err error) + AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *MultipartyContract, lastProtocolID string) (subID string, err error) // RemoveFireFlySubscription removes the provided FireFly subscription RemoveFireflySubscription(ctx context.Context, subID string) From 752c510768adcb16fc7f3636a250898916078311 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Mon, 24 Jun 2024 10:54:25 -0400 Subject: [PATCH 2/3] Update all blockchain connectors to handle lastProtocolID Signed-off-by: Peter Broadhurst --- internal/blockchain/ethereum/ethereum.go | 8 ++-- internal/blockchain/ethereum/ethereum_test.go | 44 +++++++++++-------- internal/blockchain/ethereum/eventstream.go | 25 ++++++++--- internal/blockchain/fabric/eventstream.go | 25 +++++++++-- internal/blockchain/fabric/fabric.go | 8 ++-- internal/blockchain/fabric/fabric_test.go | 35 ++++++++------- internal/blockchain/tezos/tezos.go | 12 ++++- internal/blockchain/tezos/tezos_test.go | 18 ++++---- internal/multiparty/manager.go | 2 +- 9 files changed, 112 insertions(+), 65 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 7e9dd73d2..da9ca7809 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -271,7 +271,7 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities { return e.capabilities } -func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) { ethLocation, err := e.parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -286,7 +286,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N if !ok { return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") } - sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI) + sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI, lastProtocolID) if err != nil { return "", err @@ -874,7 +874,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio return result, err } -func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) { +func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) { var location *Location namespace := listener.Namespace if listener.Location != nil { @@ -893,7 +893,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr if listener.Options != nil { firstEvent = listener.Options.FirstEvent } - result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi) + result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID) if err != nil { return err } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 83a624056..01a24f7d4 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -906,7 +906,7 @@ func TestInitAllExistingStreams(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -964,7 +964,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -1022,7 +1022,7 @@ func TestInitAllExistingStreamsOld(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -1080,7 +1080,7 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10416", err) } @@ -2027,7 +2027,7 @@ func TestAddSubscription(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -2062,7 +2062,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -2097,7 +2097,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10311", err) } @@ -2118,7 +2118,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -2147,7 +2147,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10111", err) assert.Regexp(t, "pop", err) @@ -4005,7 +4005,7 @@ func TestAddSubBadLocation(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := e.AddFireflySubscription(e.ctx, ns, contract) + _, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10310", err) } @@ -4025,9 +4025,15 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{})) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", - httpmock.NewJsonResponderOrPanic(200, subscription{ - ID: "sub1", - })) + func(r *http.Request) (*http.Response, error) { + var s subscription + err := json.NewDecoder(r.Body).Decode(&s) + assert.NoError(t, err) + assert.Equal(t, "19", s.FromBlock) + return httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub1", + })(r) + }) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(2)) utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -4055,7 +4061,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := e.AddFireflySubscription(e.ctx, ns, contract) + subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "000000000020/000000/000000") assert.NoError(t, err) assert.NotNil(t, e.subs.GetSubscription("sub1")) @@ -4103,7 +4109,7 @@ func TestAddFireflySubscriptionV1(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.NotNil(t, e.subs.GetSubscription("sub1")) } @@ -4147,7 +4153,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10465", err) } @@ -4189,7 +4195,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } @@ -4231,7 +4237,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } @@ -4273,7 +4279,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 498f3117d..f3853e5ea 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -21,6 +21,8 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "strconv" + "strings" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly-common/pkg/ffresty" @@ -201,13 +203,26 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) { +func latestOrLastBlock(protocolID string) string { + if len(protocolID) > 0 { + blockStr := strings.Split(protocolID, "/")[0] + blockNumber, err := strconv.ParseUint(blockStr, 10, 64) + if err == nil { + // We jump back on block from the last event, to minimize re-delivery while ensuring + // we get all events since the last delivered (including subsequent events in the same block) + return strconv.FormatUint(blockNumber-1, 10) + } + } + return "latest" +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) { // Map FireFly "firstEvent" values to Ethereum "fromBlock" values switch firstEvent { case string(core.SubOptsFirstEventOldest): firstEvent = "0" case string(core.SubOptsFirstEventNewest): - firstEvent = "latest" + firstEvent = latestOrLastBlock(lastProtocolID) } sub := subscription{ Name: subName, @@ -244,7 +259,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry, lastProtocolID string) (sub *subscription, err error) { // Include a hash of the instance path in the subscription, so if we ever point at a different // contract configuration, we re-subscribe from block 0. // We don't need full strength hashing, so just use the first 16 chars for readability. @@ -286,7 +301,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace name = v1Name } location := &Location{Address: instancePath} - if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil { + if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, lastProtocolID); err != nil { return nil, err } log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 49c2c2ec5..04d15a680 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -19,6 +19,8 @@ package fabric import ( "context" "fmt" + "strconv" + "strings" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly-common/pkg/ffresty" @@ -177,10 +179,25 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent string) (*subscription, error) { +func newestOrLastBlock(protocolID string) string { + if len(protocolID) > 0 { + blockStr := strings.Split(protocolID, "/")[0] + blockNumber, err := strconv.ParseUint(blockStr, 10, 64) + if err == nil { + // We jump back on block from the last event, to minimize re-delivery while ensuring + // we get all events since the last delivered (including subsequent events in the same block) + return strconv.FormatUint(blockNumber-1, 10) + } + } + return "newest" +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { // Map FireFly "firstEvent" values to Fabric "fromBlock" values if firstEvent == string(core.SubOptsFirstEventOldest) { firstEvent = "0" + } else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) { + firstEvent = newestOrLastBlock(lastProtocolID) } sub := subscription{ Name: name, @@ -221,7 +238,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event string) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event, lastProtocolID string) (sub *subscription, err error) { existingSubs, err := s.getSubscriptions(ctx) if err != nil { return nil, err @@ -250,7 +267,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace if version == 1 { name = v1Name } - if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent); err != nil { + if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent, lastProtocolID); err != nil { return nil, err } log.L(ctx).Infof("%s subscription: %s", event, sub.ID) diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 54cdf8b29..b0b4f1cf1 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -411,7 +411,7 @@ func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsT return nil } -func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) { fabricOnChainLocation, err := parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -437,7 +437,7 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Nam if !ok { return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") } - sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent) + sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent, lastProtocolID) if err != nil { return "", err } @@ -925,7 +925,7 @@ func encodeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, return result, err } -func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener) error { +func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) error { namespace := listener.Namespace location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -933,7 +933,7 @@ func (f *Fabric) AddContractListener(ctx context.Context, listener *core.Contrac } subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID) - result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent) + result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent, lastProtocolID) if err != nil { return err } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 1b315e453..1c5cc7550 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -442,7 +442,7 @@ func TestInitAllExistingStreams(t *testing.T) { <-toServer - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -500,7 +500,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) { <-toServer - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -525,6 +525,7 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) { json.NewDecoder(req.Body).Decode(&body) assert.Equal(t, "firefly", body["channel"]) assert.Equal(t, nil, body["chaincode"]) + assert.Equal(t, "9", body["fromBlock"]) return httpmock.NewJsonResponderOrPanic(200, body)(req) }) @@ -554,7 +555,7 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") assert.NoError(t, err) } @@ -604,7 +605,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10465", err) } @@ -646,7 +647,7 @@ func TestAddFireflySubscriptionBadOptions(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -687,7 +688,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -729,7 +730,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -784,7 +785,7 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := e.AddFireflySubscription(e.ctx, ns, contract) + subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -846,7 +847,7 @@ func TestAddFireflySubscriptionInvalidSubName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10416", err) } @@ -860,7 +861,7 @@ func TestAddFFSubscriptionBadLocation(t *testing.T) { FirstEvent: "oldest", } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := e.AddFireflySubscription(e.ctx, ns, contract) + _, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "F10310", err) } @@ -1091,7 +1092,7 @@ func TestSubQueryCreateError(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10284.*pop", err) } @@ -1139,7 +1140,7 @@ func TestSubQueryCreate(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) } @@ -1914,7 +1915,7 @@ func TestAddSubscription(t *testing.T) { return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req) }) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -1948,7 +1949,7 @@ func TestAddSubscriptionNoChannel(t *testing.T) { return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req) }) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310.*channel", err) } @@ -1971,7 +1972,7 @@ func TestAddSubscriptionNoLocation(t *testing.T) { }, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310.*channel", err) } @@ -1992,7 +1993,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -2022,7 +2023,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10284.*pop", err) } diff --git a/internal/blockchain/tezos/tezos.go b/internal/blockchain/tezos/tezos.go index e9b1a2383..9f2225e5b 100644 --- a/internal/blockchain/tezos/tezos.go +++ b/internal/blockchain/tezos/tezos.go @@ -261,7 +261,11 @@ func (t *Tezos) Capabilities() *blockchain.Capabilities { return t.capabilities } -func (t *Tezos) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (t *Tezos) AddFireflySubscription(ctx context.Context, + namespace *core.Namespace, + contract *blockchain.MultipartyContract, + _ string, // Tezos lexicographically sortable protocol IDs for not yet implemented for events +) (string, error) { tezosLocation, err := t.parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -406,7 +410,11 @@ func (t *Tezos) NormalizeContractLocation(ctx context.Context, ntype blockchain. return t.encodeContractLocation(ctx, parsed) } -func (t *Tezos) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) { +func (t *Tezos) AddContractListener( + ctx context.Context, + listener *core.ContractListener, + _ string, // Tezos lexicographically sortable protocol IDs for not yet implemented for events +) (err error) { var location *Location if listener.Location != nil { location, err = t.parseContractLocation(ctx, listener.Location) diff --git a/internal/blockchain/tezos/tezos_test.go b/internal/blockchain/tezos/tezos_test.go index 2a55d109f..ee9c3dd8a 100644 --- a/internal/blockchain/tezos/tezos_test.go +++ b/internal/blockchain/tezos/tezos_test.go @@ -548,7 +548,7 @@ func TestInitAllExistingStreams(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -827,7 +827,7 @@ func TestAddSubscription(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -856,7 +856,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -877,7 +877,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -905,7 +905,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10283.*pop", err) } @@ -1549,7 +1549,7 @@ func TestAddSubBadLocation(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err := tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10310", err) } @@ -1597,7 +1597,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := tz.AddFireflySubscription(tz.ctx, ns, contract) + subID, err := tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.NoError(t, err) assert.NotNil(t, tz.subs.GetSubscription("sub1")) @@ -1641,7 +1641,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10283", err) } @@ -1681,7 +1681,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10283", err) } diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index 811e56141..5ac4ac35b 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // From 8dd5a0d2899a0b002d080a60f15ab64770ef1be0 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 26 Jun 2024 21:43:15 -0400 Subject: [PATCH 3/3] Avoid replaying entire chain for zero or explicit block catchup Signed-off-by: Peter Broadhurst --- internal/blockchain/ethereum/eventstream.go | 54 +++++++++++---- .../blockchain/ethereum/eventstream_test.go | 69 +++++++++++++++++++ internal/blockchain/fabric/eventstream.go | 54 +++++++++++---- .../blockchain/fabric/eventstream_test.go | 69 +++++++++++++++++++ internal/coremsgs/en_error_messages.go | 2 + 5 files changed, 221 insertions(+), 27 deletions(-) create mode 100644 internal/blockchain/ethereum/eventstream_test.go create mode 100644 internal/blockchain/fabric/eventstream_test.go diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index f3853e5ea..e913139f4 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -203,31 +203,57 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func latestOrLastBlock(protocolID string) string { - if len(protocolID) > 0 { - blockStr := strings.Split(protocolID, "/")[0] - blockNumber, err := strconv.ParseUint(blockStr, 10, 64) - if err == nil { +func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) { + // Parse the lastProtocolID if supplied + var blockBeforeNewestEvent *uint64 + if len(lastProtocolID) > 0 { + blockStr := strings.Split(lastProtocolID, "/")[0] + parsedUint, err := strconv.ParseUint(blockStr, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID) + } + if parsedUint > 0 { // We jump back on block from the last event, to minimize re-delivery while ensuring // we get all events since the last delivered (including subsequent events in the same block) - return strconv.FormatUint(blockNumber-1, 10) + parsedUint-- + blockBeforeNewestEvent = &parsedUint + } + } + + // If the user requested newest, then we use the last block number if we have one, + // or we pass the request for newest down to the connector + if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" { + if blockBeforeNewestEvent != nil { + return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil } + return "latest", nil + } + + // Otherwise we expect to be able to parse the block, with "oldest" being the same as "0" + if firstEvent == string(core.SubOptsFirstEventOldest) { + firstEvent = "0" } - return "latest" + blockNumber, err := strconv.ParseUint(firstEvent, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent) + } + // If the last event is already dispatched after this block, recreate the listener from that block + if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber { + blockNumber = *blockBeforeNewestEvent + } + return strconv.FormatUint(blockNumber, 10), nil } func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) { - // Map FireFly "firstEvent" values to Ethereum "fromBlock" values - switch firstEvent { - case string(core.SubOptsFirstEventOldest): - firstEvent = "0" - case string(core.SubOptsFirstEventNewest): - firstEvent = latestOrLastBlock(lastProtocolID) + fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID) + if err != nil { + return nil, err } + sub := subscription{ Name: subName, Stream: stream, - FromBlock: firstEvent, + FromBlock: fromBlock, EthCompatEvent: abi, } diff --git a/internal/blockchain/ethereum/eventstream_test.go b/internal/blockchain/ethereum/eventstream_test.go new file mode 100644 index 000000000..dfb28f872 --- /dev/null +++ b/internal/blockchain/ethereum/eventstream_test.go @@ -0,0 +1,69 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethereum + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCreateSubscriptionBadBlock(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + _, err := e.streams.createSubscription(context.Background(), nil, "", "", "wrongness", nil, "") + assert.Regexp(t, "FF10473", err) +} + +func TestResolveFromBlockCombinations(t *testing.T) { + + ctx := context.Background() + + fromBlock, err := resolveFromBlock(ctx, "", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "latest", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "newest", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "") + assert.Equal(t, "0", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/000000/000050") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/000000/000050") + assert.Equal(t, "20", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "", "000000000010/000000/000050") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + _, err = resolveFromBlock(ctx, "", "wrong") + assert.Regexp(t, "FF10472", err) + +} diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 04d15a680..3b38142b4 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -179,26 +179,54 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func newestOrLastBlock(protocolID string) string { - if len(protocolID) > 0 { - blockStr := strings.Split(protocolID, "/")[0] - blockNumber, err := strconv.ParseUint(blockStr, 10, 64) - if err == nil { +func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) { + // Parse the lastProtocolID if supplied + var blockBeforeNewestEvent *uint64 + if len(lastProtocolID) > 0 { + blockStr := strings.Split(lastProtocolID, "/")[0] + parsedUint, err := strconv.ParseUint(blockStr, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID) + } + if parsedUint > 0 { // We jump back on block from the last event, to minimize re-delivery while ensuring // we get all events since the last delivered (including subsequent events in the same block) - return strconv.FormatUint(blockNumber-1, 10) + parsedUint-- + blockBeforeNewestEvent = &parsedUint } } - return "newest" -} -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { - // Map FireFly "firstEvent" values to Fabric "fromBlock" values + // If the user requested newest, then we use the last block number if we have one, + // or we pass the request for newest down to the connector + if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" { + if blockBeforeNewestEvent != nil { + return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil + } + return "newest", nil + } + + // Otherwise we expect to be able to parse the block, with "oldest" being the same as "0" if firstEvent == string(core.SubOptsFirstEventOldest) { firstEvent = "0" - } else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) { - firstEvent = newestOrLastBlock(lastProtocolID) } + blockNumber, err := strconv.ParseUint(firstEvent, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent) + } + // If the last event is already dispatched after this block, recreate the listener from that block + if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber { + blockNumber = *blockBeforeNewestEvent + } + return strconv.FormatUint(blockNumber, 10), nil +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { + + fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID) + if err != nil { + return nil, err + } + sub := subscription{ Name: name, Channel: location.Channel, @@ -207,7 +235,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati Filter: eventFilter{ EventFilter: event, }, - FromBlock: firstEvent, + FromBlock: fromBlock, } if location.Chaincode != "" { diff --git a/internal/blockchain/fabric/eventstream_test.go b/internal/blockchain/fabric/eventstream_test.go new file mode 100644 index 000000000..b5a15eda4 --- /dev/null +++ b/internal/blockchain/fabric/eventstream_test.go @@ -0,0 +1,69 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fabric + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCreateSubscriptionBadBlock(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + _, err := e.streams.createSubscription(context.Background(), nil, "", "", "", "wrongness", "") + assert.Regexp(t, "FF10473", err) +} + +func TestResolveFromBlockCombinations(t *testing.T) { + + ctx := context.Background() + + fromBlock, err := resolveFromBlock(ctx, "", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "latest", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "newest", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "") + assert.Equal(t, "0", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "20", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + _, err = resolveFromBlock(ctx, "", "wrong") + assert.Regexp(t, "FF10472", err) + +} diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index efb2e9db2..aeaea5b0d 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -310,4 +310,6 @@ var ( MsgNoRegistrationMessageData = ffe("FF10469", "Unable to check message registration data for org %s", 500) MsgUnexpectedRegistrationType = ffe("FF10470", "Unexpected type checking registration status: %s", 500) MsgUnableToParseRegistrationData = ffe("FF10471", "Unable to parse registration message data: %s", 500) + MsgInvalidLastEventProtocolID = ffe("FF10472", "Unable to parse protocol ID of previous event: %s", 500) + MsgInvalidFromBlockNumber = ffe("FF10473", "Unable to parse block number: %s", 500) )