diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 8e69cf2ff..5f5f63646 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -325,7 +325,6 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() }) } - // TODO: import typeProvider function from codec package and pass to constructor ch.lp = logpoller.New(logger.Sugared(logger.Named(lggr, "LogPoller")), logpoller.NewORM(ch.ID(), ds, lggr), ch.multiClient) ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr) ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 975f6853b..c23ffc662 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -85,7 +85,7 @@ func (fl *filters) PruneFilters(ctx context.Context) error { // that matches filter.EventSig signature will be captured starting from filter.StartingBlock. // The filter may be unregistered later by filter.Name. // In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if -// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. +// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubKeyPaths) does not match original filter. // Otherwise, updates remaining fields and schedules backfill. // Warnings/debug information is keyed by filter name. func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { @@ -446,8 +446,8 @@ func (fl *filters) LoadFilters(ctx context.Context) error { return nil } -// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subkeyPath. It uses the decoder -// associated with that filter to decode the event and extract the subkey value from the specified subKeyPath. +// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subKeyPath. It uses the decoder +// associated with that filter to decode the event and extract the subKey value from the specified subKeyPath. // WARNING: not thread safe, should only be called while fl.filtersMutex is held and after filters have been loaded. func (fl *filters) DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error) { filter, ok := fl.filtersByID[ID] diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 086b911e4..5187118ad 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -105,9 +105,9 @@ func TestFilters_RegisterFilter(t *testing.T) { }, }, { - Name: "SubkeyPaths", + Name: "SubKeyPaths", ModifyField: func(f *Filter) { - f.SubkeyPaths = [][]string{{uuid.NewString()}} + f.SubKeyPaths = [][]string{{uuid.NewString()}} }, }, } diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index f7f058eb7..b06e72d8b 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" ) var ( @@ -31,6 +32,7 @@ type ORM interface { GetLatestBlock(ctx context.Context) (int64, error) InsertLogs(context.Context, []Log) (err error) SelectSeqNums(ctx context.Context) (map[int64]int64, error) + FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type logsLoader interface { @@ -157,8 +159,8 @@ func (lp *Service) Process(ctx context.Context, programEvent ProgramEvent) (err return err } - log.SubkeyValues = make([]IndexedValue, 0, len(filter.SubkeyPaths)) - for _, path := range filter.SubkeyPaths { + log.SubKeyValues = make([]IndexedValue, 0, len(filter.SubKeyPaths)) + for _, path := range filter.SubKeyPaths { subKeyVal, decodeSubKeyErr := lp.filters.DecodeSubKey(ctx, lp.lggr, log.Data, filter.ID, path) if decodeSubKeyErr != nil { return decodeSubKeyErr @@ -167,7 +169,7 @@ func (lp *Service) Process(ctx context.Context, programEvent ProgramEvent) (err if newIndexedValErr != nil { return newIndexedValErr } - log.SubkeyValues = append(log.SubkeyValues, indexedVal) + log.SubKeyValues = append(log.SubKeyValues, indexedVal) } log.SequenceNum = lp.filters.IncrementSeqNum(filter.ID) @@ -376,3 +378,7 @@ func (lp *Service) backgroundWorkerRun(ctx context.Context) { lp.lggr.Errorw("Failed to prune filters", "err", err) } } + +func (lp *Service) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { + return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName) +} diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index d87403266..796a9709b 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -255,9 +255,9 @@ func TestProcess(t *testing.T) { A int64 B string }{55, "hello"} - subkeyValA, err := newIndexedValue(event.A) + subKeyValA, err := newIndexedValue(event.A) require.NoError(t, err) - subkeyValB, err := newIndexedValue(event.B) + subKeyValB, err := newIndexedValue(event.B) require.NoError(t, err) filterID := rand.Int63() @@ -271,7 +271,7 @@ func TestProcess(t *testing.T) { expectedLog.LogIndex, err = makeLogIndex(txIndex, txLogIndex) require.NoError(t, err) expectedLog.SequenceNum = 1 - expectedLog.SubkeyValues = []IndexedValue{subkeyValA, subkeyValB} + expectedLog.SubKeyValues = []IndexedValue{subKeyValA, subKeyValB} expectedLog.Data, err = bin.MarshalBorsh(&event) require.NoError(t, err) @@ -325,7 +325,7 @@ func TestProcess(t *testing.T) { Address: addr, EventSig: eventSig, EventIdl: idl, - SubkeyPaths: [][]string{{"A"}, {"B"}}, + SubKeyPaths: [][]string{{"A"}, {"B"}}, } orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{filter}, nil).Once() orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{}, nil).Once() diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go index 3b3546266..855683ae3 100644 --- a/pkg/solana/logpoller/mock_orm.go +++ b/pkg/solana/logpoller/mock_orm.go @@ -5,6 +5,7 @@ package logpoller import ( context "context" + query "github.com/smartcontractkit/chainlink-common/pkg/types/query" mock "github.com/stretchr/testify/mock" ) @@ -113,6 +114,67 @@ func (_c *MockORM_DeleteFilters_Call) RunAndReturn(run func(context.Context, map return _c } +// FilteredLogs provides a mock function with given fields: ctx, queryFilter, limitAndSort, queryName +func (_m *MockORM) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { + ret := _m.Called(ctx, queryFilter, limitAndSort, queryName) + + if len(ret) == 0 { + panic("no return value specified for FilteredLogs") + } + + var r0 []Log + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) ([]Log, error)); ok { + return rf(ctx, queryFilter, limitAndSort, queryName) + } + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) []Log); ok { + r0 = rf(ctx, queryFilter, limitAndSort, queryName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]Log) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []query.Expression, query.LimitAndSort, string) error); ok { + r1 = rf(ctx, queryFilter, limitAndSort, queryName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockORM_FilteredLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FilteredLogs' +type MockORM_FilteredLogs_Call struct { + *mock.Call +} + +// FilteredLogs is a helper method to define mock.On call +// - ctx context.Context +// - queryFilter []query.Expression +// - limitAndSort query.LimitAndSort +// - queryName string +func (_e *MockORM_Expecter) FilteredLogs(ctx interface{}, queryFilter interface{}, limitAndSort interface{}, queryName interface{}) *MockORM_FilteredLogs_Call { + return &MockORM_FilteredLogs_Call{Call: _e.mock.On("FilteredLogs", ctx, queryFilter, limitAndSort, queryName)} +} + +func (_c *MockORM_FilteredLogs_Call) Run(run func(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string)) *MockORM_FilteredLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]query.Expression), args[2].(query.LimitAndSort), args[3].(string)) + }) + return _c +} + +func (_c *MockORM_FilteredLogs_Call) Return(_a0 []Log, _a1 error) *MockORM_FilteredLogs_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockORM_FilteredLogs_Call) RunAndReturn(run func(context.Context, []query.Expression, query.LimitAndSort, string) ([]Log, error)) *MockORM_FilteredLogs_Call { + _c.Call.Return(run) + return _c +} + // GetLatestBlock provides a mock function with given fields: ctx func (_m *MockORM) GetLatestBlock(ctx context.Context) (int64, error) { ret := _m.Called(ctx) diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index 0e7768d0f..b0e8cd195 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -14,7 +14,7 @@ type Filter struct { EventSig EventSignature StartingBlock int64 EventIdl EventIdl - SubkeyPaths SubkeyPaths + SubKeyPaths SubKeyPaths Retention time.Duration MaxLogsKept int64 IsDeleted bool // only for internal usage. Values set externally are ignored. @@ -23,7 +23,7 @@ type Filter struct { func (f Filter) MatchSameLogs(other Filter) bool { return f.Address == other.Address && f.EventSig == other.EventSig && - f.EventIdl.Equal(other.EventIdl) && f.SubkeyPaths.Equal(other.SubkeyPaths) + f.EventIdl.Equal(other.EventIdl) && f.SubKeyPaths.Equal(other.SubKeyPaths) } // DiscriminatorRawBytes returns raw discriminator bytes as a string, this string is not base64 encoded and is always len of discriminator which is 8. @@ -41,7 +41,7 @@ type Log struct { BlockTimestamp time.Time Address PublicKey EventSig EventSignature - SubkeyValues []IndexedValue + SubKeyValues []IndexedValue TxHash Signature Data []byte CreatedAt time.Time diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 417b2e6ba..7476cb111 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -53,7 +53,7 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err withEventSig(filter.EventSig). withStartingBlock(filter.StartingBlock). withEventIDL(filter.EventIdl). - withSubkeyPaths(filter.SubkeyPaths). + withSubKeyPaths(filter.SubKeyPaths). withIsBackfilled(filter.IsBackfilled). toArgs() if err != nil { diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 362333d27..9746a5369 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -34,7 +34,7 @@ func TestLogPollerFilters(t *testing.T) { EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIdl: EventIdl{}, - SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}), + SubKeyPaths: SubKeyPaths([][]string{{"a", "b"}, {"c"}}), Retention: 1000, MaxLogsKept: 3, }, @@ -44,7 +44,7 @@ func TestLogPollerFilters(t *testing.T) { EventName: "event", EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, - SubkeyPaths: SubkeyPaths([][]string{}), + SubKeyPaths: SubKeyPaths([][]string{}), Retention: 1000, MaxLogsKept: 3, }, @@ -54,7 +54,7 @@ func TestLogPollerFilters(t *testing.T) { EventName: "event", EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, - SubkeyPaths: nil, + SubKeyPaths: nil, Retention: 1000, MaxLogsKept: 3, }, @@ -254,8 +254,31 @@ func newRandomFilter(t *testing.T) Filter { EventName: "event", EventSig: newRandomEventSignature(t), StartingBlock: 1, - SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, + SubKeyPaths: [][]string{{"a", "b"}, {"c"}}, Retention: 1000, MaxLogsKept: 3, } } + +func newRandomLog(t *testing.T, filterID int64, chainID string) Log { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + data := []byte("solana is fun") + signature, err := privateKey.Sign(data) + require.NoError(t, err) + return Log{ + FilterID: filterID, + ChainID: chainID, + LogIndex: rand.Int63n(1000), + BlockHash: Hash(pubKey), + BlockNumber: rand.Int63n(1000000), + BlockTimestamp: time.Unix(1731590113, 0), + Address: PublicKey(pubKey), + EventSig: EventSignature{3, 2, 1}, + SubKeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, + TxHash: Signature(signature), + Data: data, + SequenceNum: rand.Int63n(500), + } +} diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index fcb3a8da9..78a74d17d 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -15,13 +15,16 @@ import ( ) const ( - blockFieldName = "block_number" - chainIDFieldName = "chain_id" - timestampFieldName = "block_timestamp" - txHashFieldName = "tx_hash" - addressFieldName = "address" - eventSigFieldName = "event_sig" - defaultSort = "block_number ASC, log_index ASC" + blockFieldName = "block_number" + chainIDFieldName = "chain_id" + timestampFieldName = "block_timestamp" + txHashFieldName = "tx_hash" + addressFieldName = "address" + eventSigFieldName = "event_sig" + defaultSort = "block_number ASC, log_index ASC" + subKeyValuesFieldName = "subkey_values" + subKeyValueArg = "subkey_value" + subKeyIndexArgName = "subkey_index" ) var ( @@ -54,6 +57,80 @@ var _ primitives.Visitor = (*pgDSLParser)(nil) func (v *pgDSLParser) Comparator(_ primitives.Comparator) {} +type IndexedValueComparator struct { + Value IndexedValue + Operator primitives.ComparisonOperator +} + +type eventBySubKeyFilter struct { + SubKeyIndex uint64 + ValueComparers []IndexedValueComparator +} + +func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) { + switch v := visitor.(type) { + case *pgDSLParser: + v.VisitEventSubKeysByValueFilter(f) + } +} + +func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) { + var indexedValueComparators []IndexedValueComparator + for _, cmp := range valueComparers { + iVal, err := newIndexedValue(cmp.Value) + if err != nil { + return query.Expression{}, err + } + iValCmp := IndexedValueComparator{ + Value: iVal, + Operator: cmp.Operator, + } + indexedValueComparators = append(indexedValueComparators, iValCmp) + } + return query.Expression{ + Primitive: &eventBySubKeyFilter{ + SubKeyIndex: subKeyIndex, + ValueComparers: indexedValueComparators, + }, + }, nil +} + +func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) { + if len(p.ValueComparers) > 0 { + if p.SubKeyIndex > 3 { // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding more db indexes + v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex) + return + } + + // Add 1 since postgresql arrays are 1-indexed. + subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1) + + comps := make([]string, len(p.ValueComparers)) + for idx, comp := range p.ValueComparers { + comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s") + if v.err != nil { + return + } + } + + v.expression = strings.Join(comps, " AND ") + } +} + +func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) { + cmp, err := cmpOpToString(comp.Operator) + if err != nil { + return "", err + } + + return fmt.Sprintf( + pattern, + subfield, + cmp, + args.withIndexedField(field, comp.Value), + ), nil +} + func (v *pgDSLParser) Block(prim primitives.Block) { cmp, err := cmpOpToString(prim.Operator) if err != nil { diff --git a/pkg/solana/logpoller/parser_test.go b/pkg/solana/logpoller/parser_test.go index 96d2d8656..31588d8d8 100644 --- a/pkg/solana/logpoller/parser_test.go +++ b/pkg/solana/logpoller/parser_test.go @@ -198,6 +198,47 @@ func TestDSLParser(t *testing.T) { }) }) + t.Run("query for event topic", func(t *testing.T) { + t.Parallel() + + subKeyFilter, err := NewEventBySubKeyFilter(2, []primitives.ValueComparator{ + {Value: 4, Operator: primitives.Gt}, + {Value: 7, Operator: primitives.Lt}, + }) + require.NoError(t, err) + + parser := &pgDSLParser{} + expressions := []query.Expression{subKeyFilter} + limiter := query.LimitAndSort{} + + result, args, err := parser.buildQuery(chainID, expressions, limiter) + require.NoError(t, err) + expectedQuery := logsQuery( + " WHERE chain_id = :chain_id " + + "AND subkey_values[:subkey_index_0] > :subkey_value_0 AND subkey_values[:subkey_index_0] < :subkey_value_1 ORDER BY " + defaultSort) + + var iValLower, iValUpper IndexedValue + iValLower, err = newIndexedValue(4) + require.NoError(t, err) + iValUpper, err = newIndexedValue(7) + require.NoError(t, err) + + expectedArgs := map[string]any{ + "chain_id": chainID, + "subkey_index_0": uint64(3), + "subkey_value_0": iValLower, + "subkey_value_1": iValUpper, + } + + require.NoError(t, err) + assert.Equal(t, expectedQuery, result) + + var m map[string]any + m, err = args.toArgs() + require.NoError(t, err) + assert.Equal(t, expectedArgs, m) + }) + // nested query -> a & (b || c) t.Run("nested query", func(t *testing.T) { t.Parallel() diff --git a/pkg/solana/logpoller/query.go b/pkg/solana/logpoller/query.go index 4c7844183..59f8db1b8 100644 --- a/pkg/solana/logpoller/query.go +++ b/pkg/solana/logpoller/query.go @@ -90,9 +90,9 @@ func (q *queryArgs) withEventIDL(eventIdl EventIdl) *queryArgs { return q.withField("event_idl", eventIdl) } -// withSubkeyPaths sets the SubkeyPaths field in queryArgs. -func (q *queryArgs) withSubkeyPaths(subkeyPaths [][]string) *queryArgs { - return q.withField("subkey_paths", subkeyPaths) +// withSubKeyPaths sets the SubKeyPaths field in queryArgs. +func (q *queryArgs) withSubKeyPaths(subKeyPaths [][]string) *queryArgs { + return q.withField("subkey_paths", subKeyPaths) } // withRetention sets the Retention field in queryArgs. diff --git a/pkg/solana/logpoller/test_helpers.go b/pkg/solana/logpoller/test_helpers.go index 8f133bab2..e0ef69139 100644 --- a/pkg/solana/logpoller/test_helpers.go +++ b/pkg/solana/logpoller/test_helpers.go @@ -39,7 +39,7 @@ func newRandomLog(t *testing.T, filterID int64, chainID string, eventName string BlockTimestamp: time.Unix(1731590113, 0).UTC(), Address: PublicKey(pubKey), EventSig: EventSignature(codec.NewDiscriminatorHashPrefix(eventName, false)), - SubkeyValues: []IndexedValue{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, + SubKeyValues: []IndexedValue{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, TxHash: Signature(signature), Data: data, SequenceNum: rand.Int63n(500), diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index fcd38a312..c8efe68cc 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -93,17 +93,17 @@ func scanFixedLengthArray(name string, maxLength int, src interface{}, dest []by return nil } -type SubkeyPaths [][]string +type SubKeyPaths [][]string -func (p SubkeyPaths) Value() (driver.Value, error) { +func (p SubKeyPaths) Value() (driver.Value, error) { return json.Marshal([][]string(p)) } -func (p *SubkeyPaths) Scan(src interface{}) error { - return scanJSON("SubkeyPaths", p, src) +func (p *SubKeyPaths) Scan(src interface{}) error { + return scanJSON("SubKeyPaths", p, src) } -func (p SubkeyPaths) Equal(o SubkeyPaths) bool { +func (p SubKeyPaths) Equal(o SubKeyPaths) bool { return slices.EqualFunc(p, o, slices.Equal) }