Skip to content

Commit

Permalink
Lazy-start LogPoller from ContractReader, stop if started on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Feb 7, 2025
1 parent b0ef82e commit c2088a9
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 3 deletions.
4 changes: 3 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ packages:
dir: "pkg/solana/logpoller"
filename: mock_orm.go
mockname: MockORM

github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader:
interfaces:
EventsReader:
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-solana
go 1.23.3

require (
github.com/cometbft/cometbft v0.37.5
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/gagliardetto/binary v0.8.0
github.com/gagliardetto/gofuzz v1.2.2
Expand Down Expand Up @@ -50,6 +51,8 @@ require (
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,13 @@ github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
4 changes: 4 additions & 0 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

type LogPoller interface {
Start(context.Context) error
Ready() error
Close() error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, name string) error
Expand Down Expand Up @@ -547,6 +548,9 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping multinode")
closeAll = append(closeAll, c.multiNode, c.txSender)
}
if c.lp.Ready() == nil {
c.lp.Close()
}
return services.CloseAll(closeAll...)
})
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/solana/chainreader/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
)

type EventsReader interface {
Start(ctx context.Context) error
Ready() error
RegisterFilter(context.Context, logpoller.Filter) error
FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)
}
Expand Down Expand Up @@ -109,13 +111,24 @@ func (s *ContractReaderService) Name() string {
// and error.
func (s *ContractReaderService) Start(ctx context.Context) error {
return s.StartOnce(ServiceName, func() error {
if len(s.filters) == 0 {
// No dependency on EventReader
return nil
}
if s.reader.Ready() != nil {
// Start EventReader if it hasn't already been
// Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it
err := s.reader.Start(ctx)
if err != nil {
return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err)
}
}
// registering filters needs a context so we should be able to use the start function context.
for _, filter := range s.filters {
if err := s.reader.RegisterFilter(ctx, filter); err != nil {
return err
}
}

return nil
})
}
Expand Down Expand Up @@ -457,7 +470,7 @@ func toLPFilter(
Address: logpoller.PublicKey(address),
EventName: f.EventName,
EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]),
SubkeyPaths: logpoller.SubKeyPaths(subKeyPaths),
SubkeyPaths: subKeyPaths,
Retention: f.Retention,
MaxLogsKept: f.MaxLogsKept,
}
Expand Down
97 changes: 97 additions & 0 deletions pkg/solana/chainreader/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/service"
"github.com/gagliardetto/solana-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"
Expand All @@ -30,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader/mocks"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
Expand Down Expand Up @@ -85,6 +88,100 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
require.Error(t, svc.Close())
}

func TestSolanaChainReaderService_Start(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
lggr := logger.Test(t)
rpcClient := new(mockedRPCClient)
pk := solana.NewWallet().PublicKey()

accountReadDef := config.ReadDefinition{
ChainSpecificName: "myAccount",
ReadType: config.Account,
}
eventReadDef := config.ReadDefinition{
ChainSpecificName: "myEvent",
ReadType: config.Event,
PollingFilter: &config.PollingFilter{EventName: "myEventSig.........."},
}

testCases := []struct {
Name string
ReadDef config.ReadDefinition
StartError error
RegisterFilterError error
}{
{Name: "no event reads", ReadDef: accountReadDef},
{Name: "already started", ReadDef: eventReadDef},
{Name: "successful start", ReadDef: eventReadDef},
{Name: "unsucessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")},
{Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")},
}

boolType := codec.IdlType{}
boolType.UnmarshalJSON([]byte(codec.IdlTypeBool))

for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
cfg := config.ContractReader{
map[string]config.ChainContractReader{
"myChainReader": {
IDL: codec.IDL{
Accounts: []codec.IdlTypeDef{{"myAccount",
codec.IdlTypeDefTy{
Kind: codec.IdlTypeDefTyKindStruct,
Fields: &[]codec.IdlField{}}}},
Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}},
},
ContractAddress: pk,
Reads: map[string]config.ReadDefinition{
"myRead": tt.ReadDef},
},
},
}
er := mocks.NewEventsReader(t)
svc, err := chainreader.NewContractReaderService(
lggr,
rpcClient,
cfg, er,
)
require.NoError(t, err)

er.On("Ready").Maybe().Return(func() error {
if tt.Name == "already started" {
return nil
}
return service.ErrNotStarted
}())
er.On("Start", mock.Anything).Maybe().Return(tt.StartError)
er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError)
err = svc.Start(ctx)
if tt.StartError != nil || tt.RegisterFilterError != nil {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}

var expectedReadyCalls, expectedStartCalls, expectedRegisterFilterCalls int
if tt.ReadDef.ReadType == config.Event {
expectedStartCalls = 1
expectedReadyCalls = 1
expectedRegisterFilterCalls = 1
}
er.AssertNumberOfCalls(t, "Ready", expectedReadyCalls)
if tt.Name == "already started" {
expectedStartCalls = 0
}
er.AssertNumberOfCalls(t, "Start", expectedStartCalls)
if tt.StartError != nil {
expectedRegisterFilterCalls = 0
}
er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls)
})
}
}

func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
ctx := tests.Context(t)

Expand Down

0 comments on commit c2088a9

Please sign in to comment.