diff --git a/.mockery.yaml b/.mockery.yaml index 3f81f53cc..2c6fb03a9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -60,4 +60,6 @@ packages: dir: "pkg/solana/logpoller" filename: mock_orm.go mockname: MockORM - + github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader: + interfaces: + EventsReader: diff --git a/go.mod b/go.mod index 1b214af7e..2b30e73dd 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-solana go 1.23.3 require ( + github.com/cometbft/cometbft v0.37.5 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/gagliardetto/binary v0.8.0 github.com/gagliardetto/gofuzz v1.2.2 @@ -50,6 +51,8 @@ require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect + github.com/go-kit/log v0.2.1 // indirect + github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect diff --git a/go.sum b/go.sum index 858e1ef7a..ad9ac66bf 100644 --- a/go.sum +++ b/go.sum @@ -168,9 +168,13 @@ github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 0202697b1..c02e90905 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -38,6 +38,7 @@ import ( type LogPoller interface { Start(context.Context) error + Ready() error Close() error RegisterFilter(ctx context.Context, filter logpoller.Filter) error UnregisterFilter(ctx context.Context, name string) error @@ -547,6 +548,9 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping multinode") closeAll = append(closeAll, c.multiNode, c.txSender) } + if c.lp.Ready() == nil { + c.lp.Close() + } return services.CloseAll(closeAll...) }) } diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 4eb17ef44..5199fc8da 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -25,6 +25,8 @@ import ( ) type EventsReader interface { + Start(ctx context.Context) error + Ready() error RegisterFilter(context.Context, logpoller.Filter) error FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error) } @@ -109,13 +111,24 @@ func (s *ContractReaderService) Name() string { // and error. func (s *ContractReaderService) Start(ctx context.Context) error { return s.StartOnce(ServiceName, func() error { + if len(s.filters) == 0 { + // No dependency on EventReader + return nil + } + if s.reader.Ready() != nil { + // Start EventReader if it hasn't already been + // Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it + err := s.reader.Start(ctx) + if err != nil { + return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err) + } + } // registering filters needs a context so we should be able to use the start function context. for _, filter := range s.filters { if err := s.reader.RegisterFilter(ctx, filter); err != nil { return err } } - return nil }) } @@ -457,7 +470,7 @@ func toLPFilter( Address: logpoller.PublicKey(address), EventName: f.EventName, EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]), - SubkeyPaths: logpoller.SubKeyPaths(subKeyPaths), + SubkeyPaths: subKeyPaths, Retention: f.Retention, MaxLogsKept: f.MaxLogsKept, } diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 987b3d256..a8e64de7f 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -13,8 +13,10 @@ import ( "testing" "time" + "github.com/cometbft/cometbft/libs/service" "github.com/gagliardetto/solana-go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/smartcontractkit/libocr/commontypes" @@ -30,6 +32,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader/mocks" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" @@ -85,6 +88,100 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { require.Error(t, svc.Close()) } +func TestSolanaChainReaderService_Start(t *testing.T) { + t.Parallel() + + ctx := tests.Context(t) + lggr := logger.Test(t) + rpcClient := new(mockedRPCClient) + pk := solana.NewWallet().PublicKey() + + accountReadDef := config.ReadDefinition{ + ChainSpecificName: "myAccount", + ReadType: config.Account, + } + eventReadDef := config.ReadDefinition{ + ChainSpecificName: "myEvent", + ReadType: config.Event, + PollingFilter: &config.PollingFilter{EventName: "myEventSig.........."}, + } + + testCases := []struct { + Name string + ReadDef config.ReadDefinition + StartError error + RegisterFilterError error + }{ + {Name: "no event reads", ReadDef: accountReadDef}, + {Name: "already started", ReadDef: eventReadDef}, + {Name: "successful start", ReadDef: eventReadDef}, + {Name: "unsucessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, + {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")}, + } + + boolType := codec.IdlType{} + boolType.UnmarshalJSON([]byte(codec.IdlTypeBool)) + + for _, tt := range testCases { + t.Run(tt.Name, func(t *testing.T) { + cfg := config.ContractReader{ + map[string]config.ChainContractReader{ + "myChainReader": { + IDL: codec.IDL{ + Accounts: []codec.IdlTypeDef{{"myAccount", + codec.IdlTypeDefTy{ + Kind: codec.IdlTypeDefTyKindStruct, + Fields: &[]codec.IdlField{}}}}, + Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}}, + }, + ContractAddress: pk, + Reads: map[string]config.ReadDefinition{ + "myRead": tt.ReadDef}, + }, + }, + } + er := mocks.NewEventsReader(t) + svc, err := chainreader.NewContractReaderService( + lggr, + rpcClient, + cfg, er, + ) + require.NoError(t, err) + + er.On("Ready").Maybe().Return(func() error { + if tt.Name == "already started" { + return nil + } + return service.ErrNotStarted + }()) + er.On("Start", mock.Anything).Maybe().Return(tt.StartError) + er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError) + err = svc.Start(ctx) + if tt.StartError != nil || tt.RegisterFilterError != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + var expectedReadyCalls, expectedStartCalls, expectedRegisterFilterCalls int + if tt.ReadDef.ReadType == config.Event { + expectedStartCalls = 1 + expectedReadyCalls = 1 + expectedRegisterFilterCalls = 1 + } + er.AssertNumberOfCalls(t, "Ready", expectedReadyCalls) + if tt.Name == "already started" { + expectedStartCalls = 0 + } + er.AssertNumberOfCalls(t, "Start", expectedStartCalls) + if tt.StartError != nil { + expectedRegisterFilterCalls = 0 + } + er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls) + }) + } +} + func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { ctx := tests.Context(t)