Skip to content

Commit

Permalink
feat(events): adds duplicate cache + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mattevans committed Jan 9, 2025
1 parent a9286ea commit 51d8dce
Show file tree
Hide file tree
Showing 25 changed files with 1,012 additions and 25 deletions.
21 changes: 18 additions & 3 deletions cmd/sentry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/ethpandaops/contributoor/internal/clockdrift"
"github.com/ethpandaops/contributoor/internal/events"
"github.com/ethpandaops/contributoor/internal/sinks"
"github.com/ethpandaops/contributoor/pkg/config/v1"
"github.com/ethpandaops/contributoor/pkg/ethereum"
Expand All @@ -29,6 +30,7 @@ type contributoor struct {
beaconNode *ethereum.BeaconNode
clockDrift clockdrift.ClockDrift
sinks []sinks.ContributoorSink
cache *events.DuplicateCache
metricsServer *http.Server
pprofServer *http.Server
}
Expand Down Expand Up @@ -82,6 +84,10 @@ func main() {
return err
}

if err := s.initCache(); err != nil {
return err
}

if err := s.initBeaconNode(); err != nil {
return err
}
Expand All @@ -100,13 +106,13 @@ func main() {
}()

if err := s.start(ctx); err != nil {
// Cancel context to trigger cleanup
// Cancel context to trigger cleanup.
cancel()

// Wait for cleanup to complete
// Wait for cleanup to complete.
<-done

// Only return the error if it's not due to context cancellation
// Only return the error if it's not due to context cancellation.
if err != context.Canceled {
return err
}
Expand Down Expand Up @@ -150,6 +156,8 @@ func (s *contributoor) start(ctx context.Context) error {
return err
}

s.cache.Start()

return s.beaconNode.Start(ctx)
}

Expand Down Expand Up @@ -279,6 +287,12 @@ func (s *contributoor) initSinks(ctx context.Context, debug bool) error {
return nil
}

func (s *contributoor) initCache() error {
s.cache = events.NewDuplicateCache()

return nil
}

func (s *contributoor) initBeaconNode() error {
b, err := ethereum.NewBeaconNode(
s.log,
Expand All @@ -289,6 +303,7 @@ func (s *contributoor) initBeaconNode() error {
s.name,
s.sinks,
s.clockDrift,
s.cache,
&ethereum.Options{},
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ require (
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8Rv
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
63 changes: 63 additions & 0 deletions internal/events/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package events

import (
"time"

"github.com/jellydator/ttlcache/v3"
)

type DuplicateCache struct {
BeaconETHV1EventsAttestation *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsBlock *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsChainReorg *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsFinalizedCheckpoint *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsHead *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsVoluntaryExit *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsContributionAndProof *ttlcache.Cache[string, time.Time]
BeaconETHV1EventsBlobSidecar *ttlcache.Cache[string, time.Time]
}

const (
// best to keep this > 1 epoch as some clients may send the same attestation on new epoch.
TTL = 7 * time.Minute
)

func NewDuplicateCache() *DuplicateCache {
return &DuplicateCache{
BeaconETHV1EventsAttestation: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsBlock: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsChainReorg: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsFinalizedCheckpoint: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsHead: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsVoluntaryExit: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsContributionAndProof: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
BeaconETHV1EventsBlobSidecar: ttlcache.New(
ttlcache.WithTTL[string, time.Time](TTL),
),
}
}

func (d *DuplicateCache) Start() {
go d.BeaconETHV1EventsAttestation.Start()
go d.BeaconETHV1EventsBlock.Start()
go d.BeaconETHV1EventsChainReorg.Start()
go d.BeaconETHV1EventsFinalizedCheckpoint.Start()
go d.BeaconETHV1EventsHead.Start()
go d.BeaconETHV1EventsVoluntaryExit.Start()
go d.BeaconETHV1EventsContributionAndProof.Start()
go d.BeaconETHV1EventsBlobSidecar.Start()
}
5 changes: 5 additions & 0 deletions internal/events/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"context"
"time"

"github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -28,6 +29,8 @@ type BeaconDataProvider interface {
committeeIndex phase0.CommitteeIndex,
position uint64,
) (phase0.ValidatorIndex, error)
// Synced returns true if the beacon node is synced.
Synced(ctx context.Context) error
}

// Event is the interface that all events must implement.
Expand All @@ -42,6 +45,8 @@ type Event interface {
Data() interface{}
// Decorated returns the decorated event.
Decorated() *xatu.DecoratedEvent
// Ignore returns true if the event should be ignored.
Ignore(ctx context.Context) (bool, error)
}

// BaseEvent provides common functionality for all events.
Expand Down
15 changes: 15 additions & 0 deletions internal/events/mock/beacon_data_provider.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions internal/events/mock/event.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions internal/events/v1/attestation.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1

import (
"context"
"fmt"
"time"

Expand All @@ -9,6 +10,8 @@ import (
xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/mitchellh/hashstructure/v2"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -20,12 +23,14 @@ type AttestationEvent struct {
log logrus.FieldLogger
data *phase0.Attestation
beacon events.BeaconDataProvider
cache *ttlcache.Cache[string, time.Time]
recvTime time.Time
}

func NewAttestationEvent(
log logrus.FieldLogger,
beacon events.BeaconDataProvider,
cache *ttlcache.Cache[string, time.Time],
meta *xatu.Meta,
data *phase0.Attestation,
recvTime time.Time,
Expand All @@ -34,6 +39,7 @@ func NewAttestationEvent(
BaseEvent: events.NewBaseEvent(meta),
data: data,
beacon: beacon,
cache: cache,
recvTime: recvTime,
log: log.WithField("event", xatu.Event_BEACON_API_ETH_V1_EVENTS_ATTESTATION_V2.String()),
}
Expand Down Expand Up @@ -141,3 +147,27 @@ func (e *AttestationEvent) Decorated() *xatu.DecoratedEvent {

return decorated
}

func (e *AttestationEvent) Ignore(ctx context.Context) (bool, error) {
if err := e.beacon.Synced(ctx); err != nil {
return true, err
}

hash, err := hashstructure.Hash(e.data, hashstructure.FormatV2, nil)
if err != nil {
return true, err
}

item, retrieved := e.cache.GetOrSet(fmt.Sprint(hash), e.recvTime, ttlcache.WithTTL[string, time.Time](ttlcache.DefaultTTL))
if retrieved {
e.log.WithFields(logrus.Fields{
"hash": hash,
"time_since_first_item": time.Since(item.Value()),
"slot": e.data.Data.Slot,
}).Debug("Duplicate attestation event received")

return true, nil
}

return false, nil
}
Loading

0 comments on commit 51d8dce

Please sign in to comment.