Skip to content

Commit

Permalink
w
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 14, 2023
1 parent 411b672 commit de27598
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 160 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ require (
github.com/rogpeppe/go-internal v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down Expand Up @@ -49,13 +49,13 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
55 changes: 0 additions & 55 deletions streams/channel_definition_cache.go

This file was deleted.

9 changes: 0 additions & 9 deletions streams/channel_definition_cache_test.go

This file was deleted.

25 changes: 25 additions & 0 deletions streams/offchain_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package streams

import (
"encoding/json"
"fmt"
)

type OffchainConfig struct {
// TODO
// ExpirationWindow uint32 `json:"expirationWindow"` // Integer number of seconds
// BaseUSDFee decimal.Decimal `json:"baseUSDFee"` // Base USD fee
}

// TODO: Use protobuf?
func DecodeOffchainConfig(b []byte) (o OffchainConfig, err error) {
err = json.Unmarshal(b, &o)
if err != nil {
return o, fmt.Errorf("failed to decode offchain config: must be valid JSON (got: 0x%x); %w", b, err)
}
return
}

func (c OffchainConfig) Encode() ([]byte, error) {
return json.Marshal(c)
}
1 change: 1 addition & 0 deletions streams/onchain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type OnchainConfig struct {

var _ OnchainConfigCodec = &JSONOnchainConfigCodec{}

// TODO: Replace this with protobuf
type JSONOnchainConfigCodec struct{}

func (c *JSONOnchainConfigCodec) Encode(OnchainConfig) ([]byte, error) {
Expand Down
74 changes: 26 additions & 48 deletions streams/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ const MAX_OUTCOME_CHANNEL_DEFINITIONS_LENGTH = 500
// TODO: generalize from *big.Int to anything
// TODO: Consider renaming to StreamDataPoints?
// FIXME: error vs. valid
type StreamValues map[StreamID]ObsResult[*big.Int]
type StreamValues map[commontypes.StreamID]ObsResult[*big.Int]

type DataSource interface {
// For each known streamID, Observe should return a non-nil entry in
// StreamValues. Observe should ignore unknown streamIDs.
Observe(ctx context.Context, streamIDs map[StreamID]struct{}) (StreamValues, error)
Observe(ctx context.Context, streamIDs map[commontypes.StreamID]struct{}) (StreamValues, error)
}

// type LifeCycleStage string
Expand All @@ -65,7 +65,7 @@ const (
type RetirementReport struct {
// Carries validity time stamps between protocol instances to ensure there
// are no gaps
ValidAfterSeconds map[ChannelID]uint32
ValidAfterSeconds map[commontypes.ChannelID]uint32
}

type ShouldRetireCache interface { // reads asynchronously from onchain ConfigurationStore
Expand Down Expand Up @@ -96,28 +96,6 @@ const (
// Solana, CosmWasm, kalechain, etc... all go here
)

// QUESTION: Do we also want to include an (optional) designated verifier
// address, i.e. the only address allowed to verify reports from this channel
type ChannelDefinition struct {
ReportFormat commontypes.StreamsReportFormat
// Specifies the chain on which this channel can be verified. Currently uses
// CCIP chain selectors.
ChainSelector uint64
// We assume that StreamIDs is always non-empty and that the 0-th stream
// contains the verification price in LINK and the 1-st stream contains the
// verification price in the native coin.
StreamIDs []StreamID
}

type ChannelDefinitionWithID struct {
ChannelDefinition
ChannelID ChannelID
}

type ChannelDefinitions map[ChannelID]ChannelDefinition

type ChannelHash [32]byte

func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash {
h := sha256.New()
h.Write(cd.ChannelID[:])
Expand Down Expand Up @@ -189,7 +167,7 @@ func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash {
// An ReportingPlugin instance will only ever serve a single protocol instance.
var _ ocr3types.ReportingPluginFactory[commontypes.StreamsReportInfo] = &PluginFactory{}

func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[commontypes.StreamsReportFormat]ReportCodec) *PluginFactory {
func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc commontypes.ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[commontypes.StreamsReportFormat]ReportCodec) *PluginFactory {
return &PluginFactory{
prrc, src, cdc, ds, lggr, codecs,
}
Expand All @@ -198,7 +176,7 @@ func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCac
type PluginFactory struct {
PredecessorRetirementReportCache PredecessorRetirementReportCache
ShouldRetireCache ShouldRetireCache
ChannelDefinitionCache ChannelDefinitionCache
ChannelDefinitionCache commontypes.ChannelDefinitionCache
DataSource DataSource
Logger logger.Logger
Codecs map[commontypes.StreamsReportFormat]ReportCodec
Expand Down Expand Up @@ -256,7 +234,7 @@ type StreamsPlugin struct {
ConfigDigest types.ConfigDigest
PredecessorRetirementReportCache PredecessorRetirementReportCache
ShouldRetireCache ShouldRetireCache
ChannelDefinitionCache ChannelDefinitionCache
ChannelDefinitionCache commontypes.ChannelDefinitionCache
DataSource DataSource
Logger logger.Logger
F int
Expand Down Expand Up @@ -289,8 +267,8 @@ type Observation struct {
// Timestamp from when observation is made
UnixTimestampNanoseconds int64
// Votes to remove/add channels. Subject to MAX_OBSERVATION_*_LENGTH limits
RemoveChannelIDs map[ChannelID]struct{}
AddChannelDefinitions ChannelDefinitions
RemoveChannelIDs map[commontypes.ChannelID]struct{}
AddChannelDefinitions commontypes.ChannelDefinitions
// Observed (numeric) stream values. Subject to
// MAX_OBSERVATION_STREAM_VALUES_LENGTH limit
StreamValues StreamValues
Expand Down Expand Up @@ -339,10 +317,10 @@ func (p *StreamsPlugin) Observation(ctx context.Context, outctx ocr3types.Outcom

// vote to remove channel ids if they're in the previous outcome
// ChannelDefinitions or ValidAfterSeconds
removeChannelIDs := map[ChannelID]struct{}{}
removeChannelIDs := map[commontypes.ChannelID]struct{}{}
// vote to add channel definitions that aren't present in the previous
// outcome ChannelDefinitions
var addChannelDefinitions ChannelDefinitions
var addChannelDefinitions commontypes.ChannelDefinitions
{
expectedChannelDefs := p.ChannelDefinitionCache.Definitions()

Expand All @@ -365,7 +343,7 @@ func (p *StreamsPlugin) Observation(ctx context.Context, outctx ocr3types.Outcom

var streamValues StreamValues
{
streams := map[StreamID]struct{}{}
streams := map[commontypes.StreamID]struct{}{}
for _, channelDefinition := range previousOutcome.ChannelDefinitions {
for _, streamID := range channelDefinition.StreamIDs {
streams[streamID] = struct{}{}
Expand Down Expand Up @@ -454,14 +432,14 @@ type Outcome struct {
ObservationsTimestampNanoseconds int64
// ChannelDefinitions defines the set & structure of channels for which we
// generate reports
ChannelDefinitions ChannelDefinitions
ChannelDefinitions commontypes.ChannelDefinitions
// Latest ValidAfterSeconds value for each channel, reports for each channel
// span from ValidAfterSeconds to ObservationTimestampSeconds
ValidAfterSeconds map[ChannelID]uint32
ValidAfterSeconds map[commontypes.ChannelID]uint32
// StreamMedians is the median observed value for each stream
// QUESTION: Can we use arbitrary types here to allow for other types or
// consensus methods?
StreamMedians map[StreamID]*big.Int
StreamMedians map[commontypes.StreamID]*big.Int
}

// The Outcome's ObservationsTimestamp rounded down to seconds precision
Expand All @@ -475,7 +453,7 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) {

// Indicates whether a report can be generated for the given channel.
// TODO: Return error indicating why it isn't reportable
func (out *Outcome) IsReportable(channelID ChannelID) bool {
func (out *Outcome) IsReportable(channelID commontypes.ChannelID) bool {
if out.LifeCycleStage == LifeCycleStageRetired {
return false
}
Expand Down Expand Up @@ -516,8 +494,8 @@ func (out *Outcome) IsReportable(channelID ChannelID) bool {

// List of reportable channels (according to IsReportable), sorted according
// to a canonical ordering
func (out *Outcome) ReportableChannels() []ChannelID {
result := []ChannelID{}
func (out *Outcome) ReportableChannels() []commontypes.ChannelID {
result := []commontypes.ChannelID{}

for channelID := range out.ChannelDefinitions {
if !out.IsReportable(channelID) {
Expand Down Expand Up @@ -582,13 +560,13 @@ func (p *StreamsPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Que

timestampsNanoseconds := []int64{}

removeChannelVotesByID := map[ChannelID]int{}
removeChannelVotesByID := map[commontypes.ChannelID]int{}

// for each channelId count number of votes that mention it and count number of votes that include it.
addChannelVotesByHash := map[ChannelHash]int{}
addChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{}

streamObservations := map[StreamID][]*big.Int{}
streamObservations := map[commontypes.StreamID][]*big.Int{}

for _, ao := range aos {
observation := Observation{}
Expand Down Expand Up @@ -663,15 +641,15 @@ func (p *StreamsPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Que
/////////////////////////////////
outcome.ChannelDefinitions = previousOutcome.ChannelDefinitions
if outcome.ChannelDefinitions == nil {
outcome.ChannelDefinitions = ChannelDefinitions{}
outcome.ChannelDefinitions = commontypes.ChannelDefinitions{}
}

// if retired, stop updating channel definitions
if outcome.LifeCycleStage == LifeCycleStageRetired {
removeChannelVotesByID, addChannelDefinitionsByHash = nil, nil
}

var removedChannelIDs []ChannelID
var removedChannelIDs []commontypes.ChannelID
for channelID, voteCount := range removeChannelVotesByID {
if voteCount <= p.F {
continue
Expand Down Expand Up @@ -715,7 +693,7 @@ func (p *StreamsPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Que
return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err)
}

outcome.ValidAfterSeconds = map[ChannelID]uint32{}
outcome.ValidAfterSeconds = map[commontypes.ChannelID]uint32{}
for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds {
if previousOutcome.IsReportable(channelID) {
// was reported based on previous outcome
Expand Down Expand Up @@ -753,7 +731,7 @@ func (p *StreamsPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Que
/////////////////////////////////
// outcome.StreamMedians
/////////////////////////////////
outcome.StreamMedians = map[StreamID]*big.Int{}
outcome.StreamMedians = map[commontypes.StreamID]*big.Int{}
for streamID, observations := range streamObservations {
sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 })
if len(observations) <= p.F {
Expand All @@ -774,7 +752,7 @@ type Report struct {
// OCR sequence number of this report
SeqNr uint64
// Channel that is being reported on
ChannelID ChannelID
ChannelID commontypes.ChannelID
// Report is valid for ValidAfterSeconds < block.time <= ValidUntilSeconds
ValidAfterSeconds uint32
ValidUntilSeconds uint32
Expand Down Expand Up @@ -927,7 +905,7 @@ func (p *StreamsPlugin) Close() error {
return nil
}

func subtractChannelDefinitions(minuend ChannelDefinitions, subtrahend ChannelDefinitions, limit int) ChannelDefinitions {
func subtractChannelDefinitions(minuend commontypes.ChannelDefinitions, subtrahend commontypes.ChannelDefinitions, limit int) commontypes.ChannelDefinitions {
differenceList := []ChannelDefinitionWithID{}
for channelID, channelDefinition := range minuend {
if _, ok := subtrahend[channelID]; !ok {
Expand All @@ -944,7 +922,7 @@ func subtractChannelDefinitions(minuend ChannelDefinitions, subtrahend ChannelDe
differenceList = differenceList[:limit]
}

difference := ChannelDefinitions{}
difference := commontypes.ChannelDefinitions{}
for _, defWithID := range differenceList {
difference[defWithID.ChannelID] = defWithID.ChannelDefinition
}
Expand Down
Loading

0 comments on commit de27598

Please sign in to comment.