Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logpoller.FilteredLogs and support for filtering on subkey #1020

Merged
merged 7 commits into from
Jan 27, 2025
1 change: 0 additions & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L
bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
}

// TODO: import typeProvider function from codec package and pass to constructor
ch.lp = logpoller.New(logger.Sugared(logger.Named(lggr, "LogPoller")), logpoller.NewORM(ch.ID(), ds, lggr), ch.multiClient)
ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
Expand Down
6 changes: 3 additions & 3 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (fl *filters) PruneFilters(ctx context.Context) error {
// that matches filter.EventSig signature will be captured starting from filter.StartingBlock.
// The filter may be unregistered later by filter.Name.
// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if
// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter.
// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubKeyPaths) does not match original filter.
// Otherwise, updates remaining fields and schedules backfill.
// Warnings/debug information is keyed by filter name.
func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {
Expand Down Expand Up @@ -446,8 +446,8 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
return nil
}

// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subkeyPath. It uses the decoder
// associated with that filter to decode the event and extract the subkey value from the specified subKeyPath.
// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subKeyPath. It uses the decoder
// associated with that filter to decode the event and extract the subKey value from the specified subKeyPath.
// WARNING: not thread safe, should only be called while fl.filtersMutex is held and after filters have been loaded.
func (fl *filters) DecodeSubKey(ctx context.Context, lggr logger.SugaredLogger, raw []byte, ID int64, subKeyPath []string) (any, error) {
filter, ok := fl.filtersByID[ID]
Expand Down
4 changes: 2 additions & 2 deletions pkg/solana/logpoller/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func TestFilters_RegisterFilter(t *testing.T) {
},
},
{
Name: "SubkeyPaths",
Name: "SubKeyPaths",
ModifyField: func(f *Filter) {
f.SubkeyPaths = [][]string{{uuid.NewString()}}
f.SubKeyPaths = [][]string{{uuid.NewString()}}
},
},
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
)

var (
Expand All @@ -31,6 +32,7 @@ type ORM interface {
GetLatestBlock(ctx context.Context) (int64, error)
InsertLogs(context.Context, []Log) (err error)
SelectSeqNums(ctx context.Context) (map[int64]int64, error)
FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type logsLoader interface {
Expand Down Expand Up @@ -157,8 +159,8 @@ func (lp *Service) Process(ctx context.Context, programEvent ProgramEvent) (err
return err
}

log.SubkeyValues = make([]IndexedValue, 0, len(filter.SubkeyPaths))
for _, path := range filter.SubkeyPaths {
log.SubKeyValues = make([]IndexedValue, 0, len(filter.SubKeyPaths))
for _, path := range filter.SubKeyPaths {
subKeyVal, decodeSubKeyErr := lp.filters.DecodeSubKey(ctx, lp.lggr, log.Data, filter.ID, path)
if decodeSubKeyErr != nil {
return decodeSubKeyErr
Expand All @@ -167,7 +169,7 @@ func (lp *Service) Process(ctx context.Context, programEvent ProgramEvent) (err
if newIndexedValErr != nil {
return newIndexedValErr
}
log.SubkeyValues = append(log.SubkeyValues, indexedVal)
log.SubKeyValues = append(log.SubKeyValues, indexedVal)
}

log.SequenceNum = lp.filters.IncrementSeqNum(filter.ID)
Expand Down Expand Up @@ -376,3 +378,7 @@ func (lp *Service) backgroundWorkerRun(ctx context.Context) {
lp.lggr.Errorw("Failed to prune filters", "err", err)
}
}

func (lp *Service) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}
8 changes: 4 additions & 4 deletions pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ func TestProcess(t *testing.T) {
A int64
B string
}{55, "hello"}
subkeyValA, err := newIndexedValue(event.A)
subKeyValA, err := newIndexedValue(event.A)
require.NoError(t, err)
subkeyValB, err := newIndexedValue(event.B)
subKeyValB, err := newIndexedValue(event.B)
require.NoError(t, err)

filterID := rand.Int63()
Expand All @@ -271,7 +271,7 @@ func TestProcess(t *testing.T) {
expectedLog.LogIndex, err = makeLogIndex(txIndex, txLogIndex)
require.NoError(t, err)
expectedLog.SequenceNum = 1
expectedLog.SubkeyValues = []IndexedValue{subkeyValA, subkeyValB}
expectedLog.SubKeyValues = []IndexedValue{subKeyValA, subKeyValB}

expectedLog.Data, err = bin.MarshalBorsh(&event)
require.NoError(t, err)
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestProcess(t *testing.T) {
Address: addr,
EventSig: eventSig,
EventIdl: idl,
SubkeyPaths: [][]string{{"A"}, {"B"}},
SubKeyPaths: [][]string{{"A"}, {"B"}},
}
orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{filter}, nil).Once()
orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{}, nil).Once()
Expand Down
62 changes: 62 additions & 0 deletions pkg/solana/logpoller/mock_orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/solana/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Filter struct {
EventSig EventSignature
StartingBlock int64
EventIdl EventIdl
SubkeyPaths SubkeyPaths
SubKeyPaths SubKeyPaths
Retention time.Duration
MaxLogsKept int64
IsDeleted bool // only for internal usage. Values set externally are ignored.
Expand All @@ -23,7 +23,7 @@ type Filter struct {

func (f Filter) MatchSameLogs(other Filter) bool {
return f.Address == other.Address && f.EventSig == other.EventSig &&
f.EventIdl.Equal(other.EventIdl) && f.SubkeyPaths.Equal(other.SubkeyPaths)
f.EventIdl.Equal(other.EventIdl) && f.SubKeyPaths.Equal(other.SubKeyPaths)
}

// DiscriminatorRawBytes returns raw discriminator bytes as a string, this string is not base64 encoded and is always len of discriminator which is 8.
Expand All @@ -41,7 +41,7 @@ type Log struct {
BlockTimestamp time.Time
Address PublicKey
EventSig EventSignature
SubkeyValues []IndexedValue
SubKeyValues []IndexedValue
TxHash Signature
Data []byte
CreatedAt time.Time
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err
withEventSig(filter.EventSig).
withStartingBlock(filter.StartingBlock).
withEventIDL(filter.EventIdl).
withSubkeyPaths(filter.SubkeyPaths).
withSubKeyPaths(filter.SubKeyPaths).
withIsBackfilled(filter.IsBackfilled).
toArgs()
if err != nil {
Expand Down
31 changes: 27 additions & 4 deletions pkg/solana/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestLogPollerFilters(t *testing.T) {
EventSig: EventSignature{1, 2, 3},
StartingBlock: 1,
EventIdl: EventIdl{},
SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}),
SubKeyPaths: SubKeyPaths([][]string{{"a", "b"}, {"c"}}),
Retention: 1000,
MaxLogsKept: 3,
},
Expand All @@ -44,7 +44,7 @@ func TestLogPollerFilters(t *testing.T) {
EventName: "event",
EventSig: EventSignature{1, 2, 3},
StartingBlock: 1,
SubkeyPaths: SubkeyPaths([][]string{}),
SubKeyPaths: SubKeyPaths([][]string{}),
Retention: 1000,
MaxLogsKept: 3,
},
Expand All @@ -54,7 +54,7 @@ func TestLogPollerFilters(t *testing.T) {
EventName: "event",
EventSig: EventSignature{1, 2, 3},
StartingBlock: 1,
SubkeyPaths: nil,
SubKeyPaths: nil,
Retention: 1000,
MaxLogsKept: 3,
},
Expand Down Expand Up @@ -254,8 +254,31 @@ func newRandomFilter(t *testing.T) Filter {
EventName: "event",
EventSig: newRandomEventSignature(t),
StartingBlock: 1,
SubkeyPaths: [][]string{{"a", "b"}, {"c"}},
SubKeyPaths: [][]string{{"a", "b"}, {"c"}},
Retention: 1000,
MaxLogsKept: 3,
}
}

func newRandomLog(t *testing.T, filterID int64, chainID string) Log {
privateKey, err := solana.NewRandomPrivateKey()
require.NoError(t, err)
pubKey := privateKey.PublicKey()
data := []byte("solana is fun")
signature, err := privateKey.Sign(data)
require.NoError(t, err)
return Log{
FilterID: filterID,
ChainID: chainID,
LogIndex: rand.Int63n(1000),
BlockHash: Hash(pubKey),
BlockNumber: rand.Int63n(1000000),
BlockTimestamp: time.Unix(1731590113, 0),
Address: PublicKey(pubKey),
EventSig: EventSignature{3, 2, 1},
SubKeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()},
TxHash: Signature(signature),
Data: data,
SequenceNum: rand.Int63n(500),
}
}
91 changes: 84 additions & 7 deletions pkg/solana/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ import (
)

const (
blockFieldName = "block_number"
chainIDFieldName = "chain_id"
timestampFieldName = "block_timestamp"
txHashFieldName = "tx_hash"
addressFieldName = "address"
eventSigFieldName = "event_sig"
defaultSort = "block_number ASC, log_index ASC"
blockFieldName = "block_number"
chainIDFieldName = "chain_id"
timestampFieldName = "block_timestamp"
txHashFieldName = "tx_hash"
addressFieldName = "address"
eventSigFieldName = "event_sig"
defaultSort = "block_number ASC, log_index ASC"
subKeyValuesFieldName = "subkey_values"
subKeyValueArg = "subkey_value"
subKeyIndexArgName = "subkey_index"
)

var (
Expand Down Expand Up @@ -54,6 +57,80 @@ var _ primitives.Visitor = (*pgDSLParser)(nil)

func (v *pgDSLParser) Comparator(_ primitives.Comparator) {}

type IndexedValueComparator struct {
Value IndexedValue
Operator primitives.ComparisonOperator
}

type eventBySubKeyFilter struct {
SubKeyIndex uint64
ValueComparers []IndexedValueComparator
}

func (f *eventBySubKeyFilter) Accept(visitor primitives.Visitor) {
switch v := visitor.(type) {
case *pgDSLParser:
v.VisitEventSubKeysByValueFilter(f)
}
}

func NewEventBySubKeyFilter(subKeyIndex uint64, valueComparers []primitives.ValueComparator) (query.Expression, error) {
var indexedValueComparators []IndexedValueComparator
for _, cmp := range valueComparers {
iVal, err := newIndexedValue(cmp.Value)
if err != nil {
return query.Expression{}, err
}
iValCmp := IndexedValueComparator{
Value: iVal,
Operator: cmp.Operator,
}
indexedValueComparators = append(indexedValueComparators, iValCmp)
}
return query.Expression{
Primitive: &eventBySubKeyFilter{
SubKeyIndex: subKeyIndex,
ValueComparers: indexedValueComparators,
},
}, nil
}

func (v *pgDSLParser) VisitEventSubKeysByValueFilter(p *eventBySubKeyFilter) {
if len(p.ValueComparers) > 0 {
if p.SubKeyIndex > 3 { // For now, maximum # of fields that can be indexed is 4--we can increase this if needed by adding more db indexes
v.err = fmt.Errorf("invalid subKey index: %d", p.SubKeyIndex)
return
}

// Add 1 since postgresql arrays are 1-indexed.
subKeyIdx := v.args.withIndexedField(subKeyIndexArgName, p.SubKeyIndex+1)

comps := make([]string, len(p.ValueComparers))
for idx, comp := range p.ValueComparers {
comps[idx], v.err = makeComp(comp, v.args, subKeyValueArg, subKeyIdx, subKeyValuesFieldName+"[:%s] %s :%s")
if v.err != nil {
return
}
}

v.expression = strings.Join(comps, " AND ")
}
}

func makeComp(comp IndexedValueComparator, args *queryArgs, field, subfield, pattern string) (string, error) {
cmp, err := cmpOpToString(comp.Operator)
if err != nil {
return "", err
}

return fmt.Sprintf(
pattern,
subfield,
cmp,
args.withIndexedField(field, comp.Value),
), nil
}

func (v *pgDSLParser) Block(prim primitives.Block) {
cmp, err := cmpOpToString(prim.Operator)
if err != nil {
Expand Down
Loading
Loading