diff --git a/llo/channel_definitions.go b/llo/channel_definitions.go index 66b7136..ccdc2f4 100644 --- a/llo/channel_definitions.go +++ b/llo/channel_definitions.go @@ -2,6 +2,7 @@ package llo import ( "fmt" + "sort" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" ) @@ -27,3 +28,28 @@ func VerifyChannelDefinitions(channelDefs llotypes.ChannelDefinitions) error { } return nil } + +func subtractChannelDefinitions(minuend llotypes.ChannelDefinitions, subtrahend llotypes.ChannelDefinitions, limit int) llotypes.ChannelDefinitions { + differenceList := []ChannelDefinitionWithID{} + for channelID, channelDefinition := range minuend { + if _, ok := subtrahend[channelID]; !ok { + differenceList = append(differenceList, ChannelDefinitionWithID{channelDefinition, channelID}) + } + } + + // Sort so we return deterministic result + sort.Slice(differenceList, func(i, j int) bool { + return differenceList[i].ChannelID < differenceList[j].ChannelID + }) + + if len(differenceList) > limit { + differenceList = differenceList[:limit] + } + + difference := llotypes.ChannelDefinitions{} + for _, defWithID := range differenceList { + difference[defWithID.ChannelID] = defWithID.ChannelDefinition + } + + return difference +} diff --git a/llo/json_codec.go b/llo/json_report_codec.go similarity index 98% rename from llo/json_codec.go rename to llo/json_report_codec.go index 77c9518..7b59347 100644 --- a/llo/json_codec.go +++ b/llo/json_report_codec.go @@ -55,6 +55,9 @@ func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition) ([]byt } values := make([]JSONStreamValue, len(r.Values)) for i, sv := range r.Values { + if sv == nil { + return nil, ErrNilStreamValue + } b, err := sv.MarshalText() if err != nil { return nil, fmt.Errorf("failed to encode StreamValue: %w", err) diff --git a/llo/json_codec_test.go b/llo/json_report_codec_test.go similarity index 100% rename from llo/json_codec_test.go rename to llo/json_report_codec_test.go diff --git a/llo/onchain_config.go b/llo/onchain_config.go deleted file mode 100644 index a240b38..0000000 --- a/llo/onchain_config.go +++ /dev/null @@ -1,16 +0,0 @@ -package llo - -type OnchainConfig struct{} - -var _ OnchainConfigCodec = &JSONOnchainConfigCodec{} - -// TODO: Replace this with protobuf, if it is actually used for something -type JSONOnchainConfigCodec struct{} - -func (c *JSONOnchainConfigCodec) Encode(OnchainConfig) ([]byte, error) { - return nil, nil -} - -func (c *JSONOnchainConfigCodec) Decode([]byte) (OnchainConfig, error) { - return OnchainConfig{}, nil -} diff --git a/llo/plugin.go b/llo/plugin.go index c3e58c7..426103f 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -2,15 +2,7 @@ package llo import ( "context" - "crypto/sha256" - "encoding/binary" - "encoding/json" - "errors" "fmt" - "sort" - "time" - - "golang.org/x/exp/maps" "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" @@ -114,28 +106,6 @@ type ChannelDefinitionCache interface { Definitions() llotypes.ChannelDefinitions } -// MakeChannelHash is used for mapping ChannelDefinitionWithIDs -func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { - h := sha256.New() - merr := errors.Join( - binary.Write(h, binary.BigEndian, cd.ChannelID), - binary.Write(h, binary.BigEndian, cd.ReportFormat), - binary.Write(h, binary.BigEndian, uint32(len(cd.Streams))), - ) - for _, strm := range cd.Streams { - merr = errors.Join(merr, binary.Write(h, binary.BigEndian, strm.StreamID)) - merr = errors.Join(merr, binary.Write(h, binary.BigEndian, strm.Aggregator)) - } - if merr != nil { - // This should never happen - panic(merr) - } - h.Write(cd.Opts) - var result [32]byte - h.Sum(result[:0]) - return result -} - // A ReportingPlugin allows plugging custom logic into the OCR3 protocol. The OCR // protocol handles cryptography, networking, ensuring that a sufficient number // of nodes is in agreement about any report, transmitting the report to the @@ -248,6 +218,8 @@ var _ ocr3types.ReportingPlugin[llotypes.ReportInfo] = &Plugin{} type ReportCodec interface { // Encode may be lossy, so no Decode function is expected + // Encode should handle nil stream aggregate values without panicking (it + // may return error instead) Encode(Report, llotypes.ChannelDefinition) ([]byte, error) } @@ -283,25 +255,6 @@ func (p *Plugin) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (ty return nil, nil } -type Observation struct { - // Attested (i.e. signed by f+1 oracles) retirement report from predecessor - // protocol instance - AttestedPredecessorRetirement []byte - // Should this protocol instance be retired? - ShouldRetire bool - // Timestamp from when observation is made - // Note that this is the timestamp immediately before we initiate any - // observations - UnixTimestampNanoseconds int64 - // Votes to remove/add channels. Subject to MAX_OBSERVATION_*_LENGTH limits - RemoveChannelIDs map[llotypes.ChannelID]struct{} - // Votes to add or replace channel definitions - UpdateChannelDefinitions llotypes.ChannelDefinitions - // Observed (numeric) stream values. Subject to - // MaxObservationStreamValuesLength limit - StreamValues StreamValues -} - // Observation gets an observation from the underlying data source. Returns // a value or an error. // @@ -312,151 +265,7 @@ type Observation struct { // // Should return a serialized Observation struct. func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { - // NOTE: First sequence number is always 1 (0 is invalid) - if outctx.SeqNr < 1 { - return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr) - } else if outctx.SeqNr == 1 { - // First round always has empty PreviousOutcome - // Don't bother observing on the first ever round, because the result - // will never be used anyway. - // See case at the top of Outcome() - return types.Observation{}, nil - } - // Second round will have no channel definitions yet, but may vote to add - // them - - // QUESTION: is there a way to have this captured in EAs so we get something - // closer to the source? - nowNanoseconds := time.Now().UnixNano() - - previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) - if err != nil { - return nil, fmt.Errorf("error unmarshalling previous outcome: %w", err) - } - - var attestedRetirementReport []byte - // Only try to fetch this from the cache if this instance if configured - // with a predecessor and we're still in the staging stage. - if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { - var err2 error - attestedRetirementReport, err2 = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) - if err2 != nil { - return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err2) - } - } - - shouldRetire, err := p.ShouldRetireCache.ShouldRetire() - if err != nil { - return nil, fmt.Errorf("error fetching shouldRetire from cache: %w", err) - } - - // vote to remove channel ids if they're in the previous outcome - // ChannelDefinitions or ValidAfterSeconds - removeChannelIDs := map[llotypes.ChannelID]struct{}{} - // vote to add channel definitions that aren't present in the previous - // outcome ChannelDefinitions - // FIXME: Why care about ValidAfterSeconds here? - var updateChannelDefinitions llotypes.ChannelDefinitions - { - // NOTE: Be careful using maps, since key ordering is randomized! All - // addition/removal lists must be built deterministically so that nodes - // can agree on the same set of changes. - // - // ChannelIDs should always be sorted the same way (channel ID ascending). - expectedChannelDefs := p.ChannelDefinitionCache.Definitions() - if err := VerifyChannelDefinitions(expectedChannelDefs); err != nil { - return nil, fmt.Errorf("ChannelDefinitionCache.Definitions is invalid: %w", err) - } - - removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MaxObservationRemoveChannelIDsLength) - for channelID := range removeChannelDefinitions { - removeChannelIDs[channelID] = struct{}{} - } - - // TODO: needs testing - validAfterSecondsChannelIDs := maps.Keys(previousOutcome.ValidAfterSeconds) - // Sort so we cut off deterministically - sortChannelIDs(validAfterSecondsChannelIDs) - for _, channelID := range validAfterSecondsChannelIDs { - if len(removeChannelIDs) >= MaxObservationRemoveChannelIDsLength { - break - } - if _, ok := expectedChannelDefs[channelID]; !ok { - removeChannelIDs[channelID] = struct{}{} - } - } - - // NOTE: This is slow because it deeply compares every value in the map. - // To improve performance, consider changing channel voting to happen - // every N rounds instead of every round. Or, alternatively perhaps the - // first e.g. 100 rounds could check every round to allow for fast feed - // spinup, then after that every 10 or 100 rounds. - updateChannelDefinitions = make(llotypes.ChannelDefinitions) - expectedChannelIDs := maps.Keys(expectedChannelDefs) - // Sort so we cut off deterministically - sortChannelIDs(expectedChannelIDs) - for _, channelID := range expectedChannelIDs { - prev, exists := previousOutcome.ChannelDefinitions[channelID] - channelDefinition := expectedChannelDefs[channelID] - if exists && prev.Equals(channelDefinition) { - continue - } - // Add or replace channel - updateChannelDefinitions[channelID] = channelDefinition - if len(updateChannelDefinitions) >= MaxObservationUpdateChannelDefinitionsLength { - // Never add more than MaxObservationUpdateChannelDefinitionsLength - break - } - } - - if len(updateChannelDefinitions) > 0 { - p.Logger.Debugw("Voting to update channel definitions", - "updateChannelDefinitions", updateChannelDefinitions, - "seqNr", outctx.SeqNr, - "stage", "Observation") - } - if len(removeChannelIDs) > 0 { - p.Logger.Debugw("Voting to remove channel definitions", - "removeChannelIDs", removeChannelIDs, - "seqNr", outctx.SeqNr, - "stage", "Observation", - ) - } - } - - var streamValues StreamValues - if len(previousOutcome.ChannelDefinitions) == 0 { - p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "stage", "Observation", "seqNr", outctx.SeqNr) - } else { - streamValues = make(StreamValues) - for _, channelDefinition := range previousOutcome.ChannelDefinitions { - for _, strm := range channelDefinition.Streams { - streamValues[strm.StreamID] = nil - } - } - - if err := p.DataSource.Observe(ctx, streamValues, dsOpts{p.Config.VerboseLogging, outctx.SeqNr}); err != nil { - return nil, fmt.Errorf("DataSource.Observe error: %w", err) - } - } - - var rawObservation []byte - { - var err error - rawObservation, err = p.ObservationCodec.Encode(Observation{ - attestedRetirementReport, - shouldRetire, - nowNanoseconds, - removeChannelIDs, - updateChannelDefinitions, - streamValues, - }) - if err != nil { - return nil, fmt.Errorf("Observation encode error: %w", err) - } - } - - return rawObservation, nil + return p.observation(ctx, outctx, query) } // Should return an error if an observation isn't well-formed. @@ -507,121 +316,6 @@ func (p *Plugin) ValidateObservation(outctx ocr3types.OutcomeContext, query type return nil } -type Outcome struct { - // LifeCycleStage the protocol is in - LifeCycleStage llotypes.LifeCycleStage - // ObservationsTimestampNanoseconds is the median timestamp from the - // latest set of observations - ObservationsTimestampNanoseconds int64 - // ChannelDefinitions defines the set & structure of channels for which we - // generate reports - ChannelDefinitions llotypes.ChannelDefinitions - // Latest ValidAfterSeconds value for each channel, reports for each channel - // span from ValidAfterSeconds to ObservationTimestampSeconds - ValidAfterSeconds map[llotypes.ChannelID]uint32 - // StreamAggregates contains stream IDs mapped to various aggregations. - // Usually you will only have one aggregation type per stream but since - // channels can define different aggregation methods, sometimes we will - // need multiple. - StreamAggregates StreamAggregates -} - -// The Outcome's ObservationsTimestamp rounded down to seconds precision -func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { - result := time.Unix(0, out.ObservationsTimestampNanoseconds).Unix() - if int64(uint32(result)) != result { - return 0, fmt.Errorf("timestamp doesn't fit into uint32: %v", result) - } - return uint32(result), nil -} - -// Indicates whether a report can be generated for the given channel. -// Returns nil if channel is reportable -// TODO: Test this function -func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableChannel { - if out.LifeCycleStage == LifeCycleStageRetired { - return &ErrUnreportableChannel{nil, "IsReportable=false; retired channel", channelID} - } - - observationsTimestampSeconds, err := out.ObservationsTimestampSeconds() - if err != nil { - return &ErrUnreportableChannel{err, "IsReportable=false; invalid observations timestamp", channelID} - } - - channelDefinition, exists := out.ChannelDefinitions[channelID] - if !exists { - return &ErrUnreportableChannel{nil, "IsReportable=false; no channel definition with this ID", channelID} - } - - for _, strm := range channelDefinition.Streams { - if out.StreamAggregates[strm.StreamID] == nil { - // FIXME: Is this comment actually correct? - // This can happen in normal operation, because in Report() we use - // the ChannelDefinitions in the generated Outcome. But that was - // compiled with Observations made using the ChannelDefinitions - // from the PREVIOUS outcome. So if channel definitions have been - // added in this round, we would not expect there to be - // observations present for new streams in those channels. - return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; median was nil for stream %d", strm.StreamID), channelID} - } - } - - if _, ok := out.ValidAfterSeconds[channelID]; !ok { - // No validAfterSeconds entry yet, this must be a new channel. - // validAfterSeconds will be populated in Outcome() so the channel - // becomes reportable in later protocol rounds. - // TODO: Test this case, haven't seen it in prod logs even though it would be expected - return &ErrUnreportableChannel{nil, "IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", channelID} - } - - if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds { - return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds), channelID} - } - - return nil -} - -type ErrUnreportableChannel struct { - Inner error - Reason string - ChannelID llotypes.ChannelID -} - -func (e *ErrUnreportableChannel) Error() string { - s := fmt.Sprintf("ChannelID: %d; Reason: %s", e.ChannelID, e.Reason) - if e.Inner != nil { - s += fmt.Sprintf("; Err: %v", e.Inner) - } - return s -} - -func (e *ErrUnreportableChannel) String() string { - return e.Error() -} - -func (e *ErrUnreportableChannel) Unwrap() error { - return e.Inner -} - -// List of reportable channels (according to IsReportable), sorted according -// to a canonical ordering -// TODO: test this -func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unreportable []*ErrUnreportableChannel) { - for channelID := range out.ChannelDefinitions { - if err := out.IsReportable(channelID); err != nil { - unreportable = append(unreportable, err) - } else { - reportable = append(reportable, channelID) - } - } - - sort.Slice(reportable, func(i, j int) bool { - return reportable[i] < reportable[j] - }) - - return -} - // Generates an outcome for a seqNr, typically based on the previous // outcome, the current query, and the current set of attributed // observations. @@ -636,330 +330,7 @@ func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unrep // libocr guarantees that this will always be called with at least 2f+1 // AttributedObservations func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { - if len(aos) < 2*p.F+1 { - return nil, fmt.Errorf("invariant violation: expected at least 2f+1 attributed observations, got %d (f: %d)", len(aos), p.F) - } - - // Initial outcome is kind of a "keystone" with minimum extra information - if outctx.SeqNr <= 1 { - // Initial Outcome - var lifeCycleStage llotypes.LifeCycleStage - if p.PredecessorConfigDigest == nil { - // Start straight in production if we have no predecessor - lifeCycleStage = LifeCycleStageProduction - } else { - lifeCycleStage = LifeCycleStageStaging - } - outcome := Outcome{ - lifeCycleStage, - 0, - nil, - nil, - nil, - } - return p.OutcomeCodec.Encode(outcome) - } - - ///////////////////////////////// - // Decode previousOutcome - ///////////////////////////////// - previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) - if err != nil { - return nil, fmt.Errorf("error decoding previous outcome: %v", err) - } - - ///////////////////////////////// - // Decode observations - ///////////////////////////////// - - // a single valid retirement report is enough - var validPredecessorRetirementReport *RetirementReport - - shouldRetireVotes := 0 - - timestampsNanoseconds := []int64{} - - removeChannelVotesByID := map[llotypes.ChannelID]int{} - - // for each channelId count number of votes that mention it and count number of votes that include it. - updateChannelVotesByHash := map[ChannelHash]int{} - updateChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} - - streamObservations := make(map[llotypes.StreamID][]StreamValue) - - for _, ao := range aos { - // TODO: Put in a function - // MERC-3524 - observation, err2 := p.ObservationCodec.Decode(ao.Observation) - if err2 != nil { - p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err2) - continue - } - - if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { - pcd := *p.PredecessorConfigDigest - retirementReport, err3 := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) - if err3 != nil { - p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err3, "predecessorConfigDigest", pcd) - continue - } - validPredecessorRetirementReport = &retirementReport - } - - if observation.ShouldRetire { - shouldRetireVotes++ - } - - timestampsNanoseconds = append(timestampsNanoseconds, observation.UnixTimestampNanoseconds) - - for channelID := range observation.RemoveChannelIDs { - removeChannelVotesByID[channelID]++ - } - - for channelID, channelDefinition := range observation.UpdateChannelDefinitions { - defWithID := ChannelDefinitionWithID{channelDefinition, channelID} - channelHash := MakeChannelHash(defWithID) - updateChannelVotesByHash[channelHash]++ - updateChannelDefinitionsByHash[channelHash] = defWithID - } - - var missingObservations []llotypes.StreamID - for id, sv := range observation.StreamValues { - if sv != nil { // FIXME: nil checks don't work here. Test this and figure out what to do (also, are there other cases?) - streamObservations[id] = append(streamObservations[id], sv) - } else { - missingObservations = append(missingObservations, id) - } - } - if p.Config.VerboseLogging { - if len(missingObservations) > 0 { - sort.Slice(missingObservations, func(i, j int) bool { return missingObservations[i] < missingObservations[j] }) - p.Logger.Debugw("Peer was missing observations", "streamIDs", missingObservations, "oracleID", ao.Observer, "stage", "Outcome", "seqNr", outctx.SeqNr) - } - p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) - } - } - - if len(timestampsNanoseconds) == 0 { - return nil, errors.New("no valid observations") - } - - var outcome Outcome - - ///////////////////////////////// - // outcome.LifeCycleStage - ///////////////////////////////// - if previousOutcome.LifeCycleStage == LifeCycleStageStaging && validPredecessorRetirementReport != nil { - // Promote this protocol instance to the production stage! 🚀 - - // override ValidAfterSeconds with the value from the retirement report - // so that we have no gaps in the validity time range. - outcome.ValidAfterSeconds = validPredecessorRetirementReport.ValidAfterSeconds - outcome.LifeCycleStage = LifeCycleStageProduction - } else { - outcome.LifeCycleStage = previousOutcome.LifeCycleStage - } - - if outcome.LifeCycleStage == LifeCycleStageProduction && shouldRetireVotes > p.F { - outcome.LifeCycleStage = LifeCycleStageRetired - } - - ///////////////////////////////// - // outcome.ObservationsTimestampNanoseconds - // TODO: Refactor this into an aggregate function - // MERC-3524 - sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) - outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2] - - ///////////////////////////////// - // outcome.ChannelDefinitions - ///////////////////////////////// - outcome.ChannelDefinitions = previousOutcome.ChannelDefinitions - if outcome.ChannelDefinitions == nil { - outcome.ChannelDefinitions = llotypes.ChannelDefinitions{} - } - - // if retired, stop updating channel definitions - if outcome.LifeCycleStage == LifeCycleStageRetired { - removeChannelVotesByID, updateChannelDefinitionsByHash = nil, nil - } - - var removedChannelIDs []llotypes.ChannelID - for channelID, voteCount := range removeChannelVotesByID { - if voteCount <= p.F { - continue - } - removedChannelIDs = append(removedChannelIDs, channelID) - delete(outcome.ChannelDefinitions, channelID) - } - - type hashWithID struct { - ChannelHash - ChannelDefinitionWithID - } - orderedHashes := make([]hashWithID, 0, len(updateChannelDefinitionsByHash)) - for channelHash, dfnWithID := range updateChannelDefinitionsByHash { - orderedHashes = append(orderedHashes, hashWithID{channelHash, dfnWithID}) - } - // Use predictable order for adding channels (id asc) so that extras that - // exceed the max are consistent across all nodes - sort.Slice(orderedHashes, func(i, j int) bool { return orderedHashes[i].ChannelID < orderedHashes[j].ChannelID }) - for _, hwid := range orderedHashes { - voteCount := updateChannelVotesByHash[hwid.ChannelHash] - if voteCount <= p.F { - continue - } - defWithID := hwid.ChannelDefinitionWithID - if original, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists { - p.Logger.Debugw("Adding channel (replacement)", - "channelID", defWithID.ChannelID, - "originalChannelDefinition", original, - "replaceChannelDefinition", defWithID, - "seqNr", outctx.SeqNr, - "stage", "Outcome", - ) - outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition - } else if len(outcome.ChannelDefinitions) >= MaxOutcomeChannelDefinitionsLength { - p.Logger.Warnw("Adding channel FAILED. Cannot add channel, outcome already contains maximum number of channels", - "maxOutcomeChannelDefinitionsLength", MaxOutcomeChannelDefinitionsLength, - "addChannelDefinition", defWithID, - "seqNr", outctx.SeqNr, - "stage", "Outcome", - ) - // continue, don't break here because remaining channels might be a - // replacement rather than an addition, and this is still ok - continue - } - p.Logger.Debugw("Adding channel (new)", - "channelID", defWithID.ChannelID, - "addChannelDefinition", defWithID, - "seqNr", outctx.SeqNr, - "stage", "Outcome", - ) - outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition - } - - ///////////////////////////////// - // outcome.ValidAfterSeconds - ///////////////////////////////// - - // ValidAfterSeconds can be non-nil here if earlier code already - // populated ValidAfterSeconds during promotion to production. In this - // case, nothing to do. - if outcome.ValidAfterSeconds == nil { - previousObservationsTimestampSeconds, err2 := previousOutcome.ObservationsTimestampSeconds() - if err2 != nil { - return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err2) - } - - outcome.ValidAfterSeconds = map[llotypes.ChannelID]uint32{} - for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds { - if err3 := previousOutcome.IsReportable(channelID); err3 != nil { - if p.Config.VerboseLogging { - p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "stage", "Outcome", "seqNr", outctx.SeqNr) - } - // was reported based on previous outcome - outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds - } else { - // was skipped based on previous outcome - outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds - } - } - } - - observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() - if err != nil { - return nil, fmt.Errorf("error getting outcome's observations timestamp: %w", err) - } - - for channelID := range outcome.ChannelDefinitions { - if _, ok := outcome.ValidAfterSeconds[channelID]; !ok { - // new channel, set validAfterSeconds to observations timestamp - outcome.ValidAfterSeconds[channelID] = observationsTimestampSeconds - } - } - - // One might think that we should simply delete any channel from - // ValidAfterSeconds that is not mentioned in the ChannelDefinitions. This - // could, however, lead to gaps being created if this protocol instance is - // promoted from staging to production while we're still "ramping up" the - // full set of channels. We do the "safe" thing (i.e. minimizing occurrence - // of gaps) here and only remove channels if there has been an explicit vote - // to remove them. - for _, channelID := range removedChannelIDs { - delete(outcome.ValidAfterSeconds, channelID) - } - - ///////////////////////////////// - // outcome.StreamAggregates - ///////////////////////////////// - outcome.StreamAggregates = make(map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue, len(streamObservations)) - // Aggregation methods are defined on a per-channel basis, but we only want - // to do the minimum necessary number of aggregations (one per stream/aggregator - // pair) and re-use the same result, in case multiple channels share the - // same stream/aggregator pair. - for cid, cd := range outcome.ChannelDefinitions { - for _, strm := range cd.Streams { - sid, agg := strm.StreamID, strm.Aggregator - if _, exists := outcome.StreamAggregates[sid][agg]; exists { - // Should only happen in the case of duplicate streams, no - // need to aggregate twice - continue - } - aggF := GetAggregatorFunc(agg) - if aggF == nil { - return nil, fmt.Errorf("no aggregator function defined for aggregator of type %v", agg) - } - m, exists := outcome.StreamAggregates[sid] - if !exists { - m = make(map[llotypes.Aggregator]StreamValue) - outcome.StreamAggregates[sid] = m - } - result, err := aggF(streamObservations[sid], p.F) - if err != nil { - if p.Config.VerboseLogging { - p.Logger.Warnw("Aggregation failed", "aggregator", agg, "channelID", cid, "f", p.F, "streamID", sid, "observations", streamObservations[sid], "stage", "Outcome", "seqNr", outctx.SeqNr, "err", err) - } - // FIXME: Is this a complete failure? - // MERC-3524 - continue - } - m[agg] = result - } - } - - if p.Config.VerboseLogging { - p.Logger.Debugw("Generated outcome", "outcome", outcome, "stage", "Outcome", "seqNr", outctx.SeqNr) - } - return p.OutcomeCodec.Encode(outcome) -} - -type Report struct { - ConfigDigest types.ConfigDigest - // OCR sequence number of this report - SeqNr uint64 - // Channel that is being reported on - ChannelID llotypes.ChannelID - // Report is only valid at t > ValidAfterSeconds - ValidAfterSeconds uint32 - // ObservationTimestampSeconds is the median of all observation timestamps - // (note that this timestamp is taken immediately before we initiate any - // observations) - ObservationTimestampSeconds uint32 - // Values for every stream in the channel - Values []StreamValue - // The contract onchain will only validate non-specimen reports. A staging - // protocol instance will generate specimen reports so we can validate it - // works properly without any risk of misreports landing on chain. - Specimen bool -} - -func (p *Plugin) encodeReport(r Report, cd llotypes.ChannelDefinition) (types.Report, error) { - codec, exists := p.Codecs[cd.ReportFormat] - if !exists { - return nil, fmt.Errorf("codec missing for ReportFormat=%q", cd.ReportFormat) - } - return codec.Encode(r, cd) + return p.outcome(outctx, query, aos) } // Generates a (possibly empty) list of reports from an outcome. Each report @@ -976,84 +347,7 @@ func (p *Plugin) encodeReport(r Report, cd llotypes.ChannelDefinition) (types.Re // outctx.previousOutcome contains the consensus outcome with sequence // number (outctx.SeqNr-1). func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[llotypes.ReportInfo], error) { - if seqNr <= 1 { - // no reports for initial round - return nil, nil - } - - outcome, err := p.OutcomeCodec.Decode(rawOutcome) - if err != nil { - return nil, fmt.Errorf("error unmarshalling outcome: %w", err) - } - - observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() - if err != nil { - return nil, fmt.Errorf("error getting observations timestamp: %w", err) - } - - rwis := []ocr3types.ReportWithInfo[llotypes.ReportInfo]{} - - if outcome.LifeCycleStage == LifeCycleStageRetired { - // if we're retired, emit special retirement report to transfer - // ValidAfterSeconds part of state to the new protocol instance for a - // "gapless" handover - retirementReport := RetirementReport{ - outcome.ValidAfterSeconds, - } - - rwis = append(rwis, ocr3types.ReportWithInfo[llotypes.ReportInfo]{ - // TODO: Needs retirement report codec - Report: must(json.Marshal(retirementReport)), - Info: llotypes.ReportInfo{ - LifeCycleStage: outcome.LifeCycleStage, - ReportFormat: llotypes.ReportFormatJSON, - }, - }) - } - - reportableChannels, unreportableChannels := outcome.ReportableChannels() - if p.Config.VerboseLogging { - p.Logger.Debugw("Reportable channels", "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "stage", "Report", "seqNr", seqNr) - } - - for _, cid := range reportableChannels { - cd := outcome.ChannelDefinitions[cid] - values := make([]StreamValue, 0, len(cd.Streams)) - for _, strm := range cd.Streams { - // TODO: Can you ever get nil values (i.e. missing from the - // StreamAggregates) here? What happens if you do? - // MERC-3524 - values = append(values, outcome.StreamAggregates[strm.StreamID][strm.Aggregator]) - } - - report := Report{ - p.ConfigDigest, - seqNr, - cid, - outcome.ValidAfterSeconds[cid], - observationsTimestampSeconds, - values, - outcome.LifeCycleStage != LifeCycleStageProduction, - } - - encoded, err := p.encodeReport(report, cd) - if err != nil { - return nil, err - } - rwis = append(rwis, ocr3types.ReportWithInfo[llotypes.ReportInfo]{ - Report: encoded, - Info: llotypes.ReportInfo{ - LifeCycleStage: outcome.LifeCycleStage, - ReportFormat: cd.ReportFormat, - }, - }) - } - - if p.Config.VerboseLogging && len(rwis) == 0 { - p.Logger.Debugw("No reports, will not transmit anything", "reportableChannels", reportableChannels, "stage", "Report", "seqNr", seqNr) - } - - return rwis, nil + return p.reports(seqNr, rawOutcome) } func (p *Plugin) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[llotypes.ReportInfo]) (bool, error) { @@ -1081,35 +375,3 @@ func (p *Plugin) ObservationQuorum(outctx ocr3types.OutcomeContext, query types. func (p *Plugin) Close() error { return nil } - -func subtractChannelDefinitions(minuend llotypes.ChannelDefinitions, subtrahend llotypes.ChannelDefinitions, limit int) llotypes.ChannelDefinitions { - differenceList := []ChannelDefinitionWithID{} - for channelID, channelDefinition := range minuend { - if _, ok := subtrahend[channelID]; !ok { - differenceList = append(differenceList, ChannelDefinitionWithID{channelDefinition, channelID}) - } - } - - // Sort so we return deterministic result - sort.Slice(differenceList, func(i, j int) bool { - return differenceList[i].ChannelID < differenceList[j].ChannelID - }) - - if len(differenceList) > limit { - differenceList = differenceList[:limit] - } - - difference := llotypes.ChannelDefinitions{} - for _, defWithID := range differenceList { - difference[defWithID.ChannelID] = defWithID.ChannelDefinition - } - - return difference -} - -// deterministic sort of channel IDs -func sortChannelIDs(cids []llotypes.ChannelID) { - sort.Slice(cids, func(i, j int) bool { - return cids[i] < cids[j] - }) -} diff --git a/llo/plugin_observation.go b/llo/plugin_observation.go new file mode 100644 index 0000000..6c1f246 --- /dev/null +++ b/llo/plugin_observation.go @@ -0,0 +1,188 @@ +package llo + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "golang.org/x/exp/maps" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +func (p *Plugin) observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { + // NOTE: First sequence number is always 1 (0 is invalid) + if outctx.SeqNr < 1 { + return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr) + } else if outctx.SeqNr == 1 { + // First round always has empty PreviousOutcome + // Don't bother observing on the first ever round, because the result + // will never be used anyway. + // See case at the top of Outcome() + return types.Observation{}, nil + } + // Second round will have no channel definitions yet, but may vote to add + // them + + // QUESTION: is there a way to have this captured in EAs so we get something + // closer to the source? + nowNanoseconds := time.Now().UnixNano() + + previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("error unmarshalling previous outcome: %w", err) + } + + var attestedRetirementReport []byte + // Only try to fetch this from the cache if this instance if configured + // with a predecessor and we're still in the staging stage. + if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { + var err2 error + attestedRetirementReport, err2 = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) + if err2 != nil { + return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err2) + } + } + + shouldRetire, err := p.ShouldRetireCache.ShouldRetire() + if err != nil { + return nil, fmt.Errorf("error fetching shouldRetire from cache: %w", err) + } + + // vote to remove channel ids if they're in the previous outcome + // ChannelDefinitions or ValidAfterSeconds + removeChannelIDs := map[llotypes.ChannelID]struct{}{} + // vote to add channel definitions that aren't present in the previous + // outcome ChannelDefinitions + // FIXME: Why care about ValidAfterSeconds here? + var updateChannelDefinitions llotypes.ChannelDefinitions + { + // NOTE: Be careful using maps, since key ordering is randomized! All + // addition/removal lists must be built deterministically so that nodes + // can agree on the same set of changes. + // + // ChannelIDs should always be sorted the same way (channel ID ascending). + expectedChannelDefs := p.ChannelDefinitionCache.Definitions() + if err := VerifyChannelDefinitions(expectedChannelDefs); err != nil { + return nil, fmt.Errorf("ChannelDefinitionCache.Definitions is invalid: %w", err) + } + + removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MaxObservationRemoveChannelIDsLength) + for channelID := range removeChannelDefinitions { + removeChannelIDs[channelID] = struct{}{} + } + + // TODO: needs testing + validAfterSecondsChannelIDs := maps.Keys(previousOutcome.ValidAfterSeconds) + // Sort so we cut off deterministically + sortChannelIDs(validAfterSecondsChannelIDs) + for _, channelID := range validAfterSecondsChannelIDs { + if len(removeChannelIDs) >= MaxObservationRemoveChannelIDsLength { + break + } + if _, ok := expectedChannelDefs[channelID]; !ok { + removeChannelIDs[channelID] = struct{}{} + } + } + + // NOTE: This is slow because it deeply compares every value in the map. + // To improve performance, consider changing channel voting to happen + // every N rounds instead of every round. Or, alternatively perhaps the + // first e.g. 100 rounds could check every round to allow for fast feed + // spinup, then after that every 10 or 100 rounds. + updateChannelDefinitions = make(llotypes.ChannelDefinitions) + expectedChannelIDs := maps.Keys(expectedChannelDefs) + // Sort so we cut off deterministically + sortChannelIDs(expectedChannelIDs) + for _, channelID := range expectedChannelIDs { + prev, exists := previousOutcome.ChannelDefinitions[channelID] + channelDefinition := expectedChannelDefs[channelID] + if exists && prev.Equals(channelDefinition) { + continue + } + // Add or replace channel + updateChannelDefinitions[channelID] = channelDefinition + if len(updateChannelDefinitions) >= MaxObservationUpdateChannelDefinitionsLength { + // Never add more than MaxObservationUpdateChannelDefinitionsLength + break + } + } + + if len(updateChannelDefinitions) > 0 { + p.Logger.Debugw("Voting to update channel definitions", + "updateChannelDefinitions", updateChannelDefinitions, + "seqNr", outctx.SeqNr, + "stage", "Observation") + } + if len(removeChannelIDs) > 0 { + p.Logger.Debugw("Voting to remove channel definitions", + "removeChannelIDs", removeChannelIDs, + "seqNr", outctx.SeqNr, + "stage", "Observation", + ) + } + } + + var streamValues StreamValues + if len(previousOutcome.ChannelDefinitions) == 0 { + p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "stage", "Observation", "seqNr", outctx.SeqNr) + } else { + streamValues = make(StreamValues) + for _, channelDefinition := range previousOutcome.ChannelDefinitions { + for _, strm := range channelDefinition.Streams { + streamValues[strm.StreamID] = nil + } + } + + if err := p.DataSource.Observe(ctx, streamValues, dsOpts{p.Config.VerboseLogging, outctx.SeqNr}); err != nil { + return nil, fmt.Errorf("DataSource.Observe error: %w", err) + } + } + + var rawObservation []byte + { + var err error + rawObservation, err = p.ObservationCodec.Encode(Observation{ + attestedRetirementReport, + shouldRetire, + nowNanoseconds, + removeChannelIDs, + updateChannelDefinitions, + streamValues, + }) + if err != nil { + return nil, fmt.Errorf("Observation encode error: %w", err) + } + } + + return rawObservation, nil +} + +type Observation struct { + // Attested (i.e. signed by f+1 oracles) retirement report from predecessor + // protocol instance + AttestedPredecessorRetirement []byte + // Should this protocol instance be retired? + ShouldRetire bool + // Timestamp from when observation is made + // Note that this is the timestamp immediately before we initiate any + // observations + UnixTimestampNanoseconds int64 + // Votes to remove/add channels. Subject to MAX_OBSERVATION_*_LENGTH limits + RemoveChannelIDs map[llotypes.ChannelID]struct{} + // Votes to add or replace channel definitions + UpdateChannelDefinitions llotypes.ChannelDefinitions + // Observed (numeric) stream values. Subject to + // MaxObservationStreamValuesLength limit + StreamValues StreamValues +} + +// deterministic sort of channel IDs +func sortChannelIDs(cids []llotypes.ChannelID) { + sort.Slice(cids, func(i, j int) bool { + return cids[i] < cids[j] + }) +} diff --git a/llo/plugin_observation_test.go b/llo/plugin_observation_test.go new file mode 100644 index 0000000..cc29184 --- /dev/null +++ b/llo/plugin_observation_test.go @@ -0,0 +1,352 @@ +package llo + +import ( + "context" + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +func Test_Observation(t *testing.T) { + smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + }, + 2: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + } + cdc := &mockChannelDefinitionCache{definitions: smallDefinitions} + + ds := &mockDataSource{ + s: map[llotypes.StreamID]StreamValue{ + 1: ToDecimal(decimal.NewFromInt(1000)), + 3: ToDecimal(decimal.NewFromInt(3000)), + 4: ToDecimal(decimal.NewFromInt(4000)), + }, + err: nil, + } + + p := &Plugin{ + Config: Config{true}, + OutcomeCodec: protoOutcomeCodec{}, + ShouldRetireCache: &mockShouldRetireCache{}, + ChannelDefinitionCache: cdc, + Logger: logger.Test(t), + ObservationCodec: protoObservationCodec{}, + DataSource: ds, + } + var query types.Query // query is always empty for LLO + + t.Run("seqNr=0 always errors", func(t *testing.T) { + outctx := ocr3types.OutcomeContext{} + _, err := p.Observation(context.Background(), outctx, query) + assert.EqualError(t, err, "got invalid seqnr=0, must be >=1") + }) + + t.Run("seqNr=1 always returns empty observation", func(t *testing.T) { + outctx := ocr3types.OutcomeContext{SeqNr: 1} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + require.Len(t, obs, 0) + }) + + t.Run("observes timestamp and channel definitions on seqNr=2", func(t *testing.T) { + testStartTS := time.Now() + + outctx := ocr3types.OutcomeContext{SeqNr: 2} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.Len(t, decoded.StreamValues, 0) + assert.Equal(t, cdc.definitions, decoded.UpdateChannelDefinitions) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + }) + + t.Run("observes streams on seqNr=2", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + assert.Len(t, decoded.UpdateChannelDefinitions, 0) + assert.Len(t, decoded.RemoveChannelIDs, 0) + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + mediumDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + }, + 3: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + 4: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + 5: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + 6: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + } + + cdc.definitions = mediumDefinitions + + t.Run("votes to increase channel amount by a small amount, and remove one", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: smallDefinitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + assert.Len(t, decoded.UpdateChannelDefinitions, 4) + assert.ElementsMatch(t, []uint32{3, 4, 5, 6}, maps.Keys(decoded.UpdateChannelDefinitions)) + expected := make(llotypes.ChannelDefinitions) + for k, v := range mediumDefinitions { + if k > 2 { // 2 was removed and 1 already present + expected[k] = v + } + } + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + assert.Len(t, decoded.RemoveChannelIDs, 1) + assert.Equal(t, map[uint32]struct{}{2: {}}, decoded.RemoveChannelIDs) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + largeSize := 100 + require.Greater(t, largeSize, MaxObservationUpdateChannelDefinitionsLength) + largeDefinitions := make(map[llotypes.ChannelID]llotypes.ChannelDefinition, largeSize) + for i := 0; i < largeSize; i++ { + largeDefinitions[llotypes.ChannelID(i)] = llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}}, + } + } + cdc.definitions = largeDefinitions + + t.Run("votes to add channels when channel definitions increases by a large amount, and replace some existing channels with different definitions", func(t *testing.T) { + t.Run("first round of additions", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: smallDefinitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + // Even though we have a large amount of channel definitions, we should + // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time + assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) + expected := make(llotypes.ChannelDefinitions) + for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { + expected[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] + } + + // 1 and 2 are actually replaced since definition is different from the one in smallDefinitions + assert.ElementsMatch(t, []uint32{0, 1, 2, 3, 4}, maps.Keys(decoded.UpdateChannelDefinitions)) + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + // Nothing removed + assert.Len(t, decoded.RemoveChannelIDs, 0) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + + t.Run("second round of additions", func(t *testing.T) { + testStartTS := time.Now() + offset := MaxObservationUpdateChannelDefinitionsLength * 2 + + subsetDfns := make(llotypes.ChannelDefinitions) + for i := 0; i < offset; i++ { + subsetDfns[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] + } + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: subsetDfns, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + + // Even though we have a large amount of channel definitions, we should + // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time + assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) + expected := make(llotypes.ChannelDefinitions) + expectedChannelIDs := []uint32{} + for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { + expectedChannelIDs = append(expectedChannelIDs, uint32(i+offset)) + expected[llotypes.ChannelID(i+offset)] = largeDefinitions[llotypes.ChannelID(i+offset)] + } + assert.Equal(t, expected, decoded.UpdateChannelDefinitions) + + assert.ElementsMatch(t, expectedChannelIDs, maps.Keys(decoded.UpdateChannelDefinitions)) + + // Nothing removed + assert.Len(t, decoded.RemoveChannelIDs, 0) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + }) + + cdc.definitions = smallDefinitions + + // TODO: huge (greater than max allowed) + + t.Run("votes to remove channel IDs", func(t *testing.T) { + t.Run("first round of removals", func(t *testing.T) { + testStartTS := time.Now() + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: largeDefinitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + // will have two items here to account for the change of 1 and 2 in smallDefinitions + assert.Len(t, decoded.UpdateChannelDefinitions, 2) + + // Even though we have a large amount of channel definitions, we should + // only remove MaxObservationRemoveChannelIDsLength at a time + assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) + assert.ElementsMatch(t, []uint32{0, 3, 4, 5, 6}, maps.Keys(decoded.RemoveChannelIDs)) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + t.Run("second round of removals", func(t *testing.T) { + testStartTS := time.Now() + offset := MaxObservationUpdateChannelDefinitionsLength * 2 + + subsetDfns := maps.Clone(largeDefinitions) + for i := 0; i < offset; i++ { + delete(subsetDfns, llotypes.ChannelID(i)) + } + + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: subsetDfns, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + + outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} + obs, err := p.Observation(context.Background(), outctx, query) + require.NoError(t, err) + decoded, err := p.ObservationCodec.Decode(obs) + require.NoError(t, err) + + assert.Len(t, decoded.AttestedPredecessorRetirement, 0) + assert.False(t, decoded.ShouldRetire) + // will have two items here to account for the change of 1 and 2 in smallDefinitions + assert.Len(t, decoded.UpdateChannelDefinitions, 2) + + // Even though we have a large amount of channel definitions, we should + // only remove MaxObservationRemoveChannelIDsLength at a time + assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) + assert.ElementsMatch(t, []uint32{10, 11, 12, 13, 14}, maps.Keys(decoded.RemoveChannelIDs)) + + assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) + assert.Equal(t, ds.s, decoded.StreamValues) + }) + }) +} diff --git a/llo/plugin_outcome.go b/llo/plugin_outcome.go new file mode 100644 index 0000000..280099b --- /dev/null +++ b/llo/plugin_outcome.go @@ -0,0 +1,441 @@ +package llo + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "sort" + "time" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { + if len(aos) < 2*p.F+1 { + return nil, fmt.Errorf("invariant violation: expected at least 2f+1 attributed observations, got %d (f: %d)", len(aos), p.F) + } + + // Initial outcome is kind of a "keystone" with minimum extra information + if outctx.SeqNr <= 1 { + // Initial Outcome + var lifeCycleStage llotypes.LifeCycleStage + if p.PredecessorConfigDigest == nil { + // Start straight in production if we have no predecessor + lifeCycleStage = LifeCycleStageProduction + } else { + lifeCycleStage = LifeCycleStageStaging + } + outcome := Outcome{ + lifeCycleStage, + 0, + nil, + nil, + nil, + } + return p.OutcomeCodec.Encode(outcome) + } + + ///////////////////////////////// + // Decode previousOutcome + ///////////////////////////////// + previousOutcome, err := p.OutcomeCodec.Decode(outctx.PreviousOutcome) + if err != nil { + return nil, fmt.Errorf("error decoding previous outcome: %v", err) + } + + ///////////////////////////////// + // Decode observations + ///////////////////////////////// + + // a single valid retirement report is enough + var validPredecessorRetirementReport *RetirementReport + + shouldRetireVotes := 0 + + timestampsNanoseconds := []int64{} + + removeChannelVotesByID := map[llotypes.ChannelID]int{} + + // for each channelId count number of votes that mention it and count number of votes that include it. + updateChannelVotesByHash := map[ChannelHash]int{} + updateChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} + + streamObservations := make(map[llotypes.StreamID][]StreamValue) + + for _, ao := range aos { + // TODO: Put in a function + // MERC-3524 + observation, err2 := p.ObservationCodec.Decode(ao.Observation) + if err2 != nil { + p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err2) + continue + } + + if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { + pcd := *p.PredecessorConfigDigest + retirementReport, err3 := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) + if err3 != nil { + p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err3, "predecessorConfigDigest", pcd) + continue + } + validPredecessorRetirementReport = &retirementReport + } + + if observation.ShouldRetire { + shouldRetireVotes++ + } + + timestampsNanoseconds = append(timestampsNanoseconds, observation.UnixTimestampNanoseconds) + + for channelID := range observation.RemoveChannelIDs { + removeChannelVotesByID[channelID]++ + } + + for channelID, channelDefinition := range observation.UpdateChannelDefinitions { + defWithID := ChannelDefinitionWithID{channelDefinition, channelID} + channelHash := MakeChannelHash(defWithID) + updateChannelVotesByHash[channelHash]++ + updateChannelDefinitionsByHash[channelHash] = defWithID + } + + var missingObservations []llotypes.StreamID + for id, sv := range observation.StreamValues { + if sv != nil { // FIXME: nil checks don't work here. Test this and figure out what to do (also, are there other cases?) + streamObservations[id] = append(streamObservations[id], sv) + } else { + missingObservations = append(missingObservations, id) + } + } + if p.Config.VerboseLogging { + if len(missingObservations) > 0 { + sort.Slice(missingObservations, func(i, j int) bool { return missingObservations[i] < missingObservations[j] }) + p.Logger.Debugw("Peer was missing observations", "streamIDs", missingObservations, "oracleID", ao.Observer, "stage", "Outcome", "seqNr", outctx.SeqNr) + } + p.Logger.Debugw("Got observations from peer", "stage", "Outcome", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr) + } + } + + if len(timestampsNanoseconds) == 0 { + return nil, errors.New("no valid observations") + } + + var outcome Outcome + + ///////////////////////////////// + // outcome.LifeCycleStage + ///////////////////////////////// + if previousOutcome.LifeCycleStage == LifeCycleStageStaging && validPredecessorRetirementReport != nil { + // Promote this protocol instance to the production stage! 🚀 + + // override ValidAfterSeconds with the value from the retirement report + // so that we have no gaps in the validity time range. + outcome.ValidAfterSeconds = validPredecessorRetirementReport.ValidAfterSeconds + outcome.LifeCycleStage = LifeCycleStageProduction + } else { + outcome.LifeCycleStage = previousOutcome.LifeCycleStage + } + + if outcome.LifeCycleStage == LifeCycleStageProduction && shouldRetireVotes > p.F { + outcome.LifeCycleStage = LifeCycleStageRetired + } + + ///////////////////////////////// + // outcome.ObservationsTimestampNanoseconds + // TODO: Refactor this into an aggregate function + // MERC-3524 + sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) + outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2] + + ///////////////////////////////// + // outcome.ChannelDefinitions + ///////////////////////////////// + outcome.ChannelDefinitions = previousOutcome.ChannelDefinitions + if outcome.ChannelDefinitions == nil { + outcome.ChannelDefinitions = llotypes.ChannelDefinitions{} + } + + // if retired, stop updating channel definitions + if outcome.LifeCycleStage == LifeCycleStageRetired { + removeChannelVotesByID, updateChannelDefinitionsByHash = nil, nil + } + + var removedChannelIDs []llotypes.ChannelID + for channelID, voteCount := range removeChannelVotesByID { + if voteCount <= p.F { + continue + } + removedChannelIDs = append(removedChannelIDs, channelID) + delete(outcome.ChannelDefinitions, channelID) + } + + type hashWithID struct { + ChannelHash + ChannelDefinitionWithID + } + orderedHashes := make([]hashWithID, 0, len(updateChannelDefinitionsByHash)) + for channelHash, dfnWithID := range updateChannelDefinitionsByHash { + orderedHashes = append(orderedHashes, hashWithID{channelHash, dfnWithID}) + } + // Use predictable order for adding channels (id asc) so that extras that + // exceed the max are consistent across all nodes + sort.Slice(orderedHashes, func(i, j int) bool { return orderedHashes[i].ChannelID < orderedHashes[j].ChannelID }) + for _, hwid := range orderedHashes { + voteCount := updateChannelVotesByHash[hwid.ChannelHash] + if voteCount <= p.F { + continue + } + defWithID := hwid.ChannelDefinitionWithID + if original, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists { + p.Logger.Debugw("Adding channel (replacement)", + "channelID", defWithID.ChannelID, + "originalChannelDefinition", original, + "replaceChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, + "stage", "Outcome", + ) + outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition + } else if len(outcome.ChannelDefinitions) >= MaxOutcomeChannelDefinitionsLength { + p.Logger.Warnw("Adding channel FAILED. Cannot add channel, outcome already contains maximum number of channels", + "maxOutcomeChannelDefinitionsLength", MaxOutcomeChannelDefinitionsLength, + "addChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, + "stage", "Outcome", + ) + // continue, don't break here because remaining channels might be a + // replacement rather than an addition, and this is still ok + continue + } + p.Logger.Debugw("Adding channel (new)", + "channelID", defWithID.ChannelID, + "addChannelDefinition", defWithID, + "seqNr", outctx.SeqNr, + "stage", "Outcome", + ) + outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition + } + + ///////////////////////////////// + // outcome.ValidAfterSeconds + ///////////////////////////////// + + // ValidAfterSeconds can be non-nil here if earlier code already + // populated ValidAfterSeconds during promotion to production. In this + // case, nothing to do. + if outcome.ValidAfterSeconds == nil { + previousObservationsTimestampSeconds, err2 := previousOutcome.ObservationsTimestampSeconds() + if err2 != nil { + return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err2) + } + + outcome.ValidAfterSeconds = map[llotypes.ChannelID]uint32{} + for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds { + if err3 := previousOutcome.IsReportable(channelID); err3 != nil { + if p.Config.VerboseLogging { + p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "stage", "Outcome", "seqNr", outctx.SeqNr) + } + // was reported based on previous outcome + outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds + } else { + // was skipped based on previous outcome + outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds + } + } + } + + observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() + if err != nil { + return nil, fmt.Errorf("error getting outcome's observations timestamp: %w", err) + } + + for channelID := range outcome.ChannelDefinitions { + if _, ok := outcome.ValidAfterSeconds[channelID]; !ok { + // new channel, set validAfterSeconds to observations timestamp + outcome.ValidAfterSeconds[channelID] = observationsTimestampSeconds + } + } + + // One might think that we should simply delete any channel from + // ValidAfterSeconds that is not mentioned in the ChannelDefinitions. This + // could, however, lead to gaps being created if this protocol instance is + // promoted from staging to production while we're still "ramping up" the + // full set of channels. We do the "safe" thing (i.e. minimizing occurrence + // of gaps) here and only remove channels if there has been an explicit vote + // to remove them. + for _, channelID := range removedChannelIDs { + delete(outcome.ValidAfterSeconds, channelID) + } + + ///////////////////////////////// + // outcome.StreamAggregates + ///////////////////////////////// + outcome.StreamAggregates = make(map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue, len(streamObservations)) + // Aggregation methods are defined on a per-channel basis, but we only want + // to do the minimum necessary number of aggregations (one per stream/aggregator + // pair) and re-use the same result, in case multiple channels share the + // same stream/aggregator pair. + for cid, cd := range outcome.ChannelDefinitions { + for _, strm := range cd.Streams { + sid, agg := strm.StreamID, strm.Aggregator + if _, exists := outcome.StreamAggregates[sid][agg]; exists { + // Should only happen in the unexpected case of duplicate + // streams, no need to aggregate twice + continue + } + aggF := GetAggregatorFunc(agg) + if aggF == nil { + return nil, fmt.Errorf("no aggregator function defined for aggregator of type %v", agg) + } + m, exists := outcome.StreamAggregates[sid] + if !exists { + m = make(map[llotypes.Aggregator]StreamValue) + outcome.StreamAggregates[sid] = m + } + result, err := aggF(streamObservations[sid], p.F) + if err != nil { + if p.Config.VerboseLogging { + p.Logger.Warnw("Aggregation failed", "aggregator", agg, "channelID", cid, "f", p.F, "streamID", sid, "observations", streamObservations[sid], "stage", "Outcome", "seqNr", outctx.SeqNr, "err", err) + } + // FIXME: Is this a complete failure? + // MERC-3524 + continue + } + m[agg] = result + } + } + + if p.Config.VerboseLogging { + p.Logger.Debugw("Generated outcome", "outcome", outcome, "stage", "Outcome", "seqNr", outctx.SeqNr) + } + return p.OutcomeCodec.Encode(outcome) +} + +type Outcome struct { + // LifeCycleStage the protocol is in + LifeCycleStage llotypes.LifeCycleStage + // ObservationsTimestampNanoseconds is the median timestamp from the + // latest set of observations + ObservationsTimestampNanoseconds int64 + // ChannelDefinitions defines the set & structure of channels for which we + // generate reports + ChannelDefinitions llotypes.ChannelDefinitions + // Latest ValidAfterSeconds value for each channel, reports for each channel + // span from ValidAfterSeconds to ObservationTimestampSeconds + ValidAfterSeconds map[llotypes.ChannelID]uint32 + // StreamAggregates contains stream IDs mapped to various aggregations. + // Usually you will only have one aggregation type per stream but since + // channels can define different aggregation methods, sometimes we will + // need multiple. + StreamAggregates StreamAggregates +} + +// The Outcome's ObservationsTimestamp rounded down to seconds precision +func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { + result := time.Unix(0, out.ObservationsTimestampNanoseconds).Unix() + if int64(uint32(result)) != result { + return 0, fmt.Errorf("timestamp doesn't fit into uint32: %v", result) + } + return uint32(result), nil +} + +// Indicates whether a report can be generated for the given channel. +// Returns nil if channel is reportable +// NOTE: A channel is still reportable even if missing some or all stream +// values. The report codec is expected to handle nils and act accordingly +// (e.g. some values may be optional). +// TODO: Test this function +func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableChannel { + if out.LifeCycleStage == LifeCycleStageRetired { + return &ErrUnreportableChannel{nil, "IsReportable=false; retired channel", channelID} + } + + observationsTimestampSeconds, err := out.ObservationsTimestampSeconds() + if err != nil { + return &ErrUnreportableChannel{err, "IsReportable=false; invalid observations timestamp", channelID} + } + + _, exists := out.ChannelDefinitions[channelID] + if !exists { + return &ErrUnreportableChannel{nil, "IsReportable=false; no channel definition with this ID", channelID} + } + + if _, ok := out.ValidAfterSeconds[channelID]; !ok { + // No validAfterSeconds entry yet, this must be a new channel. + // validAfterSeconds will be populated in Outcome() so the channel + // becomes reportable in later protocol rounds. + // TODO: Test this case, haven't seen it in prod logs even though it would be expected + return &ErrUnreportableChannel{nil, "IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", channelID} + } + + if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds { + return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds), channelID} + } + + return nil +} + +// List of reportable channels (according to IsReportable), sorted according +// to a canonical ordering +// TODO: test this +func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unreportable []*ErrUnreportableChannel) { + for channelID := range out.ChannelDefinitions { + if err := out.IsReportable(channelID); err != nil { + unreportable = append(unreportable, err) + } else { + reportable = append(reportable, channelID) + } + } + + sort.Slice(reportable, func(i, j int) bool { + return reportable[i] < reportable[j] + }) + + return +} + +type ErrUnreportableChannel struct { + Inner error `json:",omitempty"` + Reason string + ChannelID llotypes.ChannelID +} + +func (e *ErrUnreportableChannel) Error() string { + s := fmt.Sprintf("ChannelID: %d; Reason: %s", e.ChannelID, e.Reason) + if e.Inner != nil { + s += fmt.Sprintf("; Err: %v", e.Inner) + } + return s +} + +func (e *ErrUnreportableChannel) String() string { + return e.Error() +} + +func (e *ErrUnreportableChannel) Unwrap() error { + return e.Inner +} + +// MakeChannelHash is used for mapping ChannelDefinitionWithIDs +func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { + h := sha256.New() + merr := errors.Join( + binary.Write(h, binary.BigEndian, cd.ChannelID), + binary.Write(h, binary.BigEndian, cd.ReportFormat), + binary.Write(h, binary.BigEndian, uint32(len(cd.Streams))), + ) + for _, strm := range cd.Streams { + merr = errors.Join(merr, binary.Write(h, binary.BigEndian, strm.StreamID)) + merr = errors.Join(merr, binary.Write(h, binary.BigEndian, strm.Aggregator)) + } + if merr != nil { + // This should never happen + panic(merr) + } + h.Write(cd.Opts) + var result [32]byte + h.Sum(result[:0]) + return result +} diff --git a/llo/plugin_outcome_test.go b/llo/plugin_outcome_test.go new file mode 100644 index 0000000..609fc05 --- /dev/null +++ b/llo/plugin_outcome_test.go @@ -0,0 +1,324 @@ +package llo + +import ( + "fmt" + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +func Test_Outcome(t *testing.T) { + p := &Plugin{ + Config: Config{true}, + OutcomeCodec: protoOutcomeCodec{}, + Logger: logger.Test(t), + ObservationCodec: protoObservationCodec{}, + } + + t.Run("if number of observers < 2f+1, errors", func(t *testing.T) { + _, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{}) + assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)") + p.F = 1 + _, err = p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{{}, {}}) + assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 2 (f: 1)") + }) + + t.Run("if seqnr == 1, and has enough observers, emits initial outcome with 'production' LifeCycleStage", func(t *testing.T) { + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{ + { + Observation: []byte{}, + Observer: commontypes.OracleID(0), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(1), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(2), + }, + { + Observation: []byte{}, + Observer: commontypes.OracleID(3), + }, + }) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, Outcome{ + LifeCycleStage: "production", + }, decoded) + }) + + t.Run("channel definitions", func(t *testing.T) { + t.Run("adds a new channel definition if there are enough votes", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + } + obs, err := p.ObservationCodec.Encode(Observation{ + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: newCd, + }, + }) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: obs, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) + }) + + t.Run("replaces an existing channel definition if there are enough votes", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorQuote}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + } + obs, err := p.ObservationCodec.Encode(Observation{ + UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: newCd, + }, + }) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: obs, + Observer: commontypes.OracleID(i), + }) + } + + previousOutcome, err := p.OutcomeCodec.Encode(Outcome{ + ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 42: { + ReportFormat: llotypes.ReportFormat(1), + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, + }, + }, + }) + require.NoError(t, err) + + outcome, err := p.Outcome(ocr3types.OutcomeContext{PreviousOutcome: previousOutcome, SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) + }) + + t.Run("does not add channels beyond MaxOutcomeChannelDefinitionsLength", func(t *testing.T) { + newCd := llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(2), + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + } + obs := Observation{UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{}} + for i := 0; i < MaxOutcomeChannelDefinitionsLength+10; i++ { + obs.UpdateChannelDefinitions[llotypes.ChannelID(i)] = newCd + } + encoded, err := p.ObservationCodec.Encode(obs) + require.NoError(t, err) + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + assert.Len(t, decoded.ChannelDefinitions, MaxOutcomeChannelDefinitionsLength) + + // should contain channels 0 thru 999 + assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(0)) + assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength-1)) + assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength)) + assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength+1)) + }) + }) + + t.Run("stream observations", func(t *testing.T) { + testStartTS := time.Now() + smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, + }, + 2: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, + }, + } + cdc := &mockChannelDefinitionCache{definitions: smallDefinitions} + + t.Run("aggregates values when all stream values are present from all observers", func(t *testing.T) { + previousOutcome := Outcome{ + LifeCycleStage: llotypes.LifeCycleStage("test"), + ObservationsTimestampNanoseconds: testStartTS.UnixNano(), + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: nil, + StreamAggregates: nil, + } + encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) + require.NoError(t, err) + outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} + aos := []types.AttributedObservation{} + for i := 0; i < 4; i++ { + obs := Observation{ + UnixTimestampNanoseconds: testStartTS.UnixNano() + int64(time.Second) + int64(i*100)*int64(time.Millisecond), + StreamValues: map[llotypes.StreamID]StreamValue{ + 1: ToDecimal(decimal.NewFromInt(int64(100 + i*10))), + 2: ToDecimal(decimal.NewFromInt(int64(200 + i*10))), + 3: &Quote{Bid: decimal.NewFromInt(int64(300 + i*10)), Benchmark: decimal.NewFromInt(int64(310 + i*10)), Ask: decimal.NewFromInt(int64(320 + i*10))}, + }} + encoded, err2 := p.ObservationCodec.Encode(obs) + require.NoError(t, err2) + aos = append(aos, + types.AttributedObservation{ + Observation: encoded, + Observer: commontypes.OracleID(i), + }) + } + outcome, err := p.Outcome(outctx, types.Query{}, aos) + require.NoError(t, err) + + decoded, err := p.OutcomeCodec.Decode(outcome) + require.NoError(t, err) + + observationsTs := decoded.ObservationsTimestampNanoseconds + assert.GreaterOrEqual(t, observationsTs, testStartTS.UnixNano()+1_200_000_000) + + assert.Equal(t, Outcome{ + LifeCycleStage: "test", + ObservationsTimestampNanoseconds: observationsTs, + ChannelDefinitions: cdc.definitions, + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: uint32(observationsTs / int64(time.Second)), // set to median observation timestamp + 2: uint32(observationsTs / int64(time.Second)), + }, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)), + }, + 2: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)), + }, + 3: map[llotypes.Aggregator]StreamValue{ + llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)}, + }, + }, + }, decoded) + }) + }) +} + +func Test_MakeChannelHash(t *testing.T) { + t.Run("hashes channel definitions", func(t *testing.T) { + defs := ChannelDefinitionWithID{ + ChannelID: 1, + ChannelDefinition: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormat(1), + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, + Opts: []byte(`{}`), + }, + } + hash := MakeChannelHash(defs) + // NOTE: Breaking this test by changing the hash below may break existing running instances + assert.Equal(t, "c0b72f4acb79bb8f5075f979f86016a30159266a96870b1c617b44426337162a", fmt.Sprintf("%x", hash)) + }) + + t.Run("different channelID makes different hash", func(t *testing.T) { + def1 := ChannelDefinitionWithID{ChannelID: 1} + def2 := ChannelDefinitionWithID{ChannelID: 2} + + assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) + }) + + t.Run("different report format makes different hash", func(t *testing.T) { + def1 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatJSON, + }, + } + def2 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + }, + } + + assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) + }) + + t.Run("different streamIDs makes different hash", func(t *testing.T) { + def1 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + }, + } + def2 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}}, + }, + } + + assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) + }) + + t.Run("different aggregators makes different hash", func(t *testing.T) { + def1 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, + }, + } + def2 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorQuote}}, + }, + } + + assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) + }) + + t.Run("different opts makes different hash", func(t *testing.T) { + def1 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Opts: []byte(`{"foo":"bar"}`), + }, + } + def2 := ChannelDefinitionWithID{ + ChannelDefinition: llotypes.ChannelDefinition{ + Opts: []byte(`{"foo":"baz"}`), + }, + } + + assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) + }) +} diff --git a/llo/plugin_reports.go b/llo/plugin_reports.go new file mode 100644 index 0000000..5b99b7a --- /dev/null +++ b/llo/plugin_reports.go @@ -0,0 +1,122 @@ +package llo + +import ( + "encoding/json" + "fmt" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +func (p *Plugin) reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[llotypes.ReportInfo], error) { + if seqNr <= 1 { + // no reports for initial round + return nil, nil + } + + outcome, err := p.OutcomeCodec.Decode(rawOutcome) + if err != nil { + fmt.Println("TRASH 1", err) + return nil, fmt.Errorf("error unmarshalling outcome: %w", err) + } + fmt.Println("TRASH 2", outcome) + + observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() + if err != nil { + return nil, fmt.Errorf("error getting observations timestamp: %w", err) + } + + rwis := []ocr3types.ReportWithInfo[llotypes.ReportInfo]{} + + if outcome.LifeCycleStage == LifeCycleStageRetired { + // if we're retired, emit special retirement report to transfer + // ValidAfterSeconds part of state to the new protocol instance for a + // "gapless" handover + retirementReport := RetirementReport{ + outcome.ValidAfterSeconds, + } + + rwis = append(rwis, ocr3types.ReportWithInfo[llotypes.ReportInfo]{ + // TODO: Needs retirement report codec + Report: must(json.Marshal(retirementReport)), + Info: llotypes.ReportInfo{ + LifeCycleStage: outcome.LifeCycleStage, + ReportFormat: llotypes.ReportFormatJSON, + }, + }) + } + + reportableChannels, unreportableChannels := outcome.ReportableChannels() + if p.Config.VerboseLogging { + p.Logger.Debugw("Reportable channels", "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "stage", "Report", "seqNr", seqNr) + } + + for _, cid := range reportableChannels { + cd := outcome.ChannelDefinitions[cid] + values := make([]StreamValue, 0, len(cd.Streams)) + for _, strm := range cd.Streams { + values = append(values, outcome.StreamAggregates[strm.StreamID][strm.Aggregator]) + } + + report := Report{ + p.ConfigDigest, + seqNr, + cid, + outcome.ValidAfterSeconds[cid], + observationsTimestampSeconds, + values, + outcome.LifeCycleStage != LifeCycleStageProduction, + } + + encoded, err := p.encodeReport(report, cd) + if err != nil { + p.Logger.Warnw("Error encoding report", "reportFormat", cd.ReportFormat, "err", err, "channelID", cid, "stage", "Report", "seqNr", seqNr) + continue + } + rwis = append(rwis, ocr3types.ReportWithInfo[llotypes.ReportInfo]{ + Report: encoded, + Info: llotypes.ReportInfo{ + LifeCycleStage: outcome.LifeCycleStage, + ReportFormat: cd.ReportFormat, + }, + }) + } + + if p.Config.VerboseLogging && len(rwis) == 0 { + p.Logger.Debugw("No reports, will not transmit anything", "reportableChannels", reportableChannels, "stage", "Report", "seqNr", seqNr) + } + + return rwis, nil +} + +func (p *Plugin) encodeReport(r Report, cd llotypes.ChannelDefinition) (types.Report, error) { + codec, exists := p.Codecs[cd.ReportFormat] + if !exists { + return nil, fmt.Errorf("codec missing for ReportFormat=%q", cd.ReportFormat) + } + fmt.Printf("TRASH report: %v\n", r) + fmt.Printf("TRASH report: %#v\n", r) + return codec.Encode(r, cd) +} + +type Report struct { + ConfigDigest types.ConfigDigest + // OCR sequence number of this report + SeqNr uint64 + // Channel that is being reported on + ChannelID llotypes.ChannelID + // Report is only valid at t > ValidAfterSeconds + ValidAfterSeconds uint32 + // ObservationTimestampSeconds is the median of all observation timestamps + // (note that this timestamp is taken immediately before we initiate any + // observations) + ObservationTimestampSeconds uint32 + // Values for every stream in the channel + Values []StreamValue + // The contract onchain will only validate non-specimen reports. A staging + // protocol instance will generate specimen reports so we can validate it + // works properly without any risk of misreports landing on chain. + Specimen bool +} diff --git a/llo/plugin_reports_test.go b/llo/plugin_reports_test.go new file mode 100644 index 0000000..acb7f24 --- /dev/null +++ b/llo/plugin_reports_test.go @@ -0,0 +1,227 @@ +package llo + +import ( + "testing" + "time" + + "github.com/shopspring/decimal" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_Reports(t *testing.T) { + p := &Plugin{ + Config: Config{true}, + OutcomeCodec: protoOutcomeCodec{}, + Logger: logger.Test(t), + Codecs: map[llotypes.ReportFormat]ReportCodec{ + llotypes.ReportFormatJSON: JSONReportCodec{}, + }, + } + + t.Run("ignores seqnr=0", func(t *testing.T) { + rwi, err := p.Reports(0, ocr3types.Outcome{}) + assert.NoError(t, err) + assert.Nil(t, rwi) + }) + + t.Run("does not return reports for initial round", func(t *testing.T) { + rwi, err := p.Reports(1, ocr3types.Outcome{}) + assert.NoError(t, err) + assert.Nil(t, rwi) + }) + + t.Run("returns error if unmarshalling outcome fails", func(t *testing.T) { + rwi, err := p.Reports(2, []byte("invalid")) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to decode outcome: expected protobuf") + assert.Nil(t, rwi) + }) + + t.Run("emits 'retirement report' if lifecycle state is retired", func(t *testing.T) { + t.Run("with null ValidAfterSeconds", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: LifeCycleStageRetired, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 1) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: LifeCycleStageRetired, ReportFormat: llotypes.ReportFormatJSON}, rwis[0].Info) + assert.Equal(t, "{\"ValidAfterSeconds\":null}", string(rwis[0].Report)) + }) + }) + + smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, + }, + 2: { + ReportFormat: llotypes.ReportFormatJSON, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorQuote}}, + }, + } + + t.Run("does not report if observations are not valid yet", func(t *testing.T) { + outcome := Outcome{ + ObservationsTimestampNanoseconds: 0, + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: 0, + }, + ChannelDefinitions: smallDefinitions, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 0) + }) + + t.Run("does not produce report if an aggregate is missing", func(t *testing.T) { + outcome := Outcome{ + ObservationsTimestampNanoseconds: int64(200 * time.Second), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 2: 100, + }, + ChannelDefinitions: smallDefinitions, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 0) + }) + + t.Run("skips reports if codec is missing", func(t *testing.T) { + dfns := map[llotypes.ChannelID]llotypes.ChannelDefinition{ + 1: { + ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, + Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}}, + }, + } + + outcome := Outcome{ + ObservationsTimestampNanoseconds: int64(200 * time.Second), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 2: 100, + }, + ChannelDefinitions: dfns, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 0) + }) + + t.Run("generates specimen report for non-production LifeCycleStage", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: LifeCycleStageStaging, + ObservationsTimestampNanoseconds: int64(200 * time.Second), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: 100, + 2: 100, + }, + ChannelDefinitions: smallDefinitions, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + 4: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(6.6), Benchmark: decimal.NewFromFloat(7.7), Bid: decimal.NewFromFloat(8.8)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 2) + assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":1,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 5.5, Benchmark: 4.4, Ask: 3.3}"}],"Specimen":true}`, string(rwis[0].Report)) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: "staging", ReportFormat: llotypes.ReportFormatJSON}, rwis[0].Info) + assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":2,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 8.8, Benchmark: 7.7, Ask: 6.6}"}],"Specimen":true}`, string(rwis[1].Report)) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: "staging", ReportFormat: llotypes.ReportFormatJSON}, rwis[1].Info) + }) + + t.Run("generates non-specimen reports for production", func(t *testing.T) { + outcome := Outcome{ + LifeCycleStage: LifeCycleStageProduction, + ObservationsTimestampNanoseconds: int64(200 * time.Second), + ValidAfterSeconds: map[llotypes.ChannelID]uint32{ + 1: 100, + 2: 100, + }, + ChannelDefinitions: smallDefinitions, + StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{ + 1: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)), + }, + 2: { + llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)), + }, + 3: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)}, + }, + 4: { + llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(6.6), Benchmark: decimal.NewFromFloat(7.7), Bid: decimal.NewFromFloat(8.8)}, + }, + }, + } + encoded, err := p.OutcomeCodec.Encode(outcome) + require.NoError(t, err) + rwis, err := p.Reports(2, encoded) + require.NoError(t, err) + require.Len(t, rwis, 2) + assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":1,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 5.5, Benchmark: 4.4, Ask: 3.3}"}],"Specimen":false}`, string(rwis[0].Report)) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: "production", ReportFormat: llotypes.ReportFormatJSON}, rwis[0].Info) + assert.Equal(t, `{"ConfigDigest":"0000000000000000000000000000000000000000000000000000000000000000","SeqNr":2,"ChannelID":2,"ValidAfterSeconds":100,"ObservationTimestampSeconds":200,"Values":[{"Type":0,"Value":"1.1"},{"Type":0,"Value":"2.2"},{"Type":1,"Value":"Q{Bid: 8.8, Benchmark: 7.7, Ask: 6.6}"}],"Specimen":false}`, string(rwis[1].Report)) + assert.Equal(t, llo.ReportInfo{LifeCycleStage: "production", ReportFormat: llotypes.ReportFormatJSON}, rwis[1].Info) + }) +} diff --git a/llo/plugin_test.go b/llo/plugin_test.go index 6de7344..ed5415a 100644 --- a/llo/plugin_test.go +++ b/llo/plugin_test.go @@ -2,21 +2,14 @@ package llo import ( "context" - "fmt" "testing" - "time" - "github.com/shopspring/decimal" - "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/offchainreporting2/types" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "golang.org/x/exp/maps" - "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type mockShouldRetireCache struct { @@ -48,340 +41,6 @@ func (m *mockDataSource) Observe(ctx context.Context, streamValues StreamValues, return m.err } -func Test_Observation(t *testing.T) { - smallDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 1: { - ReportFormat: llotypes.ReportFormatJSON, - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - }, - 2: { - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - } - cdc := &mockChannelDefinitionCache{definitions: smallDefinitions} - - ds := &mockDataSource{ - s: map[llotypes.StreamID]StreamValue{ - 1: ToDecimal(decimal.NewFromInt(1000)), - 3: ToDecimal(decimal.NewFromInt(3000)), - 4: ToDecimal(decimal.NewFromInt(4000)), - }, - err: nil, - } - - p := &Plugin{ - Config: Config{true}, - OutcomeCodec: protoOutcomeCodec{}, - ShouldRetireCache: &mockShouldRetireCache{}, - ChannelDefinitionCache: cdc, - Logger: logger.Test(t), - ObservationCodec: protoObservationCodec{}, - DataSource: ds, - } - var query types.Query // query is always empty for LLO - - t.Run("seqNr=0 always errors", func(t *testing.T) { - outctx := ocr3types.OutcomeContext{} - _, err := p.Observation(context.Background(), outctx, query) - assert.EqualError(t, err, "got invalid seqnr=0, must be >=1") - }) - - t.Run("seqNr=1 always returns empty observation", func(t *testing.T) { - outctx := ocr3types.OutcomeContext{SeqNr: 1} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - require.Len(t, obs, 0) - }) - - t.Run("observes timestamp and channel definitions on seqNr=2", func(t *testing.T) { - testStartTS := time.Now() - - outctx := ocr3types.OutcomeContext{SeqNr: 2} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - assert.Len(t, decoded.RemoveChannelIDs, 0) - assert.Len(t, decoded.StreamValues, 0) - assert.Equal(t, cdc.definitions, decoded.UpdateChannelDefinitions) - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - }) - - t.Run("observes streams on seqNr=2", func(t *testing.T) { - testStartTS := time.Now() - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: cdc.definitions, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - assert.Len(t, decoded.UpdateChannelDefinitions, 0) - assert.Len(t, decoded.RemoveChannelIDs, 0) - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - - mediumDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 1: { - ReportFormat: llotypes.ReportFormatJSON, - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - }, - 3: { - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - 4: { - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - 5: { - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - 6: { - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - } - - cdc.definitions = mediumDefinitions - - t.Run("votes to increase channel amount by a small amount, and remove one", func(t *testing.T) { - testStartTS := time.Now() - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: smallDefinitions, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - - assert.Len(t, decoded.UpdateChannelDefinitions, 4) - assert.ElementsMatch(t, []uint32{3, 4, 5, 6}, maps.Keys(decoded.UpdateChannelDefinitions)) - expected := make(llotypes.ChannelDefinitions) - for k, v := range mediumDefinitions { - if k > 2 { // 2 was removed and 1 already present - expected[k] = v - } - } - assert.Equal(t, expected, decoded.UpdateChannelDefinitions) - - assert.Len(t, decoded.RemoveChannelIDs, 1) - assert.Equal(t, map[uint32]struct{}{2: {}}, decoded.RemoveChannelIDs) - - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - - largeSize := 100 - require.Greater(t, largeSize, MaxObservationUpdateChannelDefinitionsLength) - largeDefinitions := make(map[llotypes.ChannelID]llotypes.ChannelDefinition, largeSize) - for i := 0; i < largeSize; i++ { - largeDefinitions[llotypes.ChannelID(i)] = llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - Streams: []llotypes.Stream{{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}}, - } - } - cdc.definitions = largeDefinitions - - t.Run("votes to add channels when channel definitions increases by a large amount, and replace some existing channels with different definitions", func(t *testing.T) { - t.Run("first round of additions", func(t *testing.T) { - testStartTS := time.Now() - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: smallDefinitions, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - - // Even though we have a large amount of channel definitions, we should - // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time - assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) - expected := make(llotypes.ChannelDefinitions) - for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { - expected[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] - } - - // 1 and 2 are actually replaced since definition is different from the one in smallDefinitions - assert.ElementsMatch(t, []uint32{0, 1, 2, 3, 4}, maps.Keys(decoded.UpdateChannelDefinitions)) - assert.Equal(t, expected, decoded.UpdateChannelDefinitions) - - // Nothing removed - assert.Len(t, decoded.RemoveChannelIDs, 0) - - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - - t.Run("second round of additions", func(t *testing.T) { - testStartTS := time.Now() - offset := MaxObservationUpdateChannelDefinitionsLength * 2 - - subsetDfns := make(llotypes.ChannelDefinitions) - for i := 0; i < offset; i++ { - subsetDfns[llotypes.ChannelID(i)] = largeDefinitions[llotypes.ChannelID(i)] - } - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: subsetDfns, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - - // Even though we have a large amount of channel definitions, we should - // only add/replace MaxObservationUpdateChannelDefinitionsLength at a time - assert.Len(t, decoded.UpdateChannelDefinitions, MaxObservationUpdateChannelDefinitionsLength) - expected := make(llotypes.ChannelDefinitions) - expectedChannelIDs := []uint32{} - for i := 0; i < MaxObservationUpdateChannelDefinitionsLength; i++ { - expectedChannelIDs = append(expectedChannelIDs, uint32(i+offset)) - expected[llotypes.ChannelID(i+offset)] = largeDefinitions[llotypes.ChannelID(i+offset)] - } - assert.Equal(t, expected, decoded.UpdateChannelDefinitions) - - assert.ElementsMatch(t, expectedChannelIDs, maps.Keys(decoded.UpdateChannelDefinitions)) - - // Nothing removed - assert.Len(t, decoded.RemoveChannelIDs, 0) - - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - }) - - cdc.definitions = smallDefinitions - - // TODO: huge (greater than max allowed) - - t.Run("votes to remove channel IDs", func(t *testing.T) { - t.Run("first round of removals", func(t *testing.T) { - testStartTS := time.Now() - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: largeDefinitions, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - // will have two items here to account for the change of 1 and 2 in smallDefinitions - assert.Len(t, decoded.UpdateChannelDefinitions, 2) - - // Even though we have a large amount of channel definitions, we should - // only remove MaxObservationRemoveChannelIDsLength at a time - assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) - assert.ElementsMatch(t, []uint32{0, 3, 4, 5, 6}, maps.Keys(decoded.RemoveChannelIDs)) - - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - t.Run("second round of removals", func(t *testing.T) { - testStartTS := time.Now() - offset := MaxObservationUpdateChannelDefinitionsLength * 2 - - subsetDfns := maps.Clone(largeDefinitions) - for i := 0; i < offset; i++ { - delete(subsetDfns, llotypes.ChannelID(i)) - } - - previousOutcome := Outcome{ - LifeCycleStage: llotypes.LifeCycleStage("test"), - ObservationsTimestampNanoseconds: testStartTS.UnixNano(), - ChannelDefinitions: subsetDfns, - ValidAfterSeconds: nil, - StreamAggregates: nil, - } - encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome) - require.NoError(t, err) - - outctx := ocr3types.OutcomeContext{SeqNr: 3, PreviousOutcome: encodedPreviousOutcome} - obs, err := p.Observation(context.Background(), outctx, query) - require.NoError(t, err) - decoded, err := p.ObservationCodec.Decode(obs) - require.NoError(t, err) - - assert.Len(t, decoded.AttestedPredecessorRetirement, 0) - assert.False(t, decoded.ShouldRetire) - // will have two items here to account for the change of 1 and 2 in smallDefinitions - assert.Len(t, decoded.UpdateChannelDefinitions, 2) - - // Even though we have a large amount of channel definitions, we should - // only remove MaxObservationRemoveChannelIDsLength at a time - assert.Len(t, decoded.RemoveChannelIDs, MaxObservationRemoveChannelIDsLength) - assert.ElementsMatch(t, []uint32{10, 11, 12, 13, 14}, maps.Keys(decoded.RemoveChannelIDs)) - - assert.GreaterOrEqual(t, decoded.UnixTimestampNanoseconds, testStartTS.UnixNano()) - assert.Equal(t, ds.s, decoded.StreamValues) - }) - }) -} - func Test_ValidateObservation(t *testing.T) { p := &Plugin{ Config: Config{true}, @@ -396,235 +55,3 @@ func Test_ValidateObservation(t *testing.T) { assert.EqualError(t, err, "Expected empty observation for first round, got: 0x01") }) } - -func Test_Outcome(t *testing.T) { - // cdc := &mockChannelDefinitionCache{} - p := &Plugin{ - Config: Config{true}, - OutcomeCodec: protoOutcomeCodec{}, - // ShouldRetireCache: &mockShouldRetireCache{}, - Logger: logger.Test(t), - ObservationCodec: protoObservationCodec{}, - } - - t.Run("if number of observers < 2f+1, errors", func(t *testing.T) { - _, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{}) - assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)") - p.F = 1 - _, err = p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{{}, {}}) - assert.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 2 (f: 1)") - }) - - t.Run("if seqnr == 1, and has enough observers, emits initial outcome with 'production' LifeCycleStage", func(t *testing.T) { - outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{ - { - Observation: []byte{}, - Observer: commontypes.OracleID(0), - }, - { - Observation: []byte{}, - Observer: commontypes.OracleID(1), - }, - { - Observation: []byte{}, - Observer: commontypes.OracleID(2), - }, - { - Observation: []byte{}, - Observer: commontypes.OracleID(3), - }, - }) - require.NoError(t, err) - - decoded, err := p.OutcomeCodec.Decode(outcome) - require.NoError(t, err) - - assert.Equal(t, Outcome{ - LifeCycleStage: "production", - }, decoded) - }) - - t.Run("adds a new channel definition if there are enough votes", func(t *testing.T) { - newCd := llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(2), - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - } - obs, err := p.ObservationCodec.Encode(Observation{ - UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 42: newCd, - }, - }) - require.NoError(t, err) - aos := []types.AttributedObservation{} - for i := 0; i < 4; i++ { - aos = append(aos, - types.AttributedObservation{ - Observation: obs, - Observer: commontypes.OracleID(i), - }) - } - outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) - require.NoError(t, err) - - decoded, err := p.OutcomeCodec.Decode(outcome) - require.NoError(t, err) - - assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) - }) - - t.Run("replaces an existing channel definition if there are enough votes", func(t *testing.T) { - newCd := llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(2), - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorQuote}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - } - obs, err := p.ObservationCodec.Encode(Observation{ - UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 42: newCd, - }, - }) - require.NoError(t, err) - aos := []types.AttributedObservation{} - for i := 0; i < 4; i++ { - aos = append(aos, - types.AttributedObservation{ - Observation: obs, - Observer: commontypes.OracleID(i), - }) - } - - previousOutcome, err := p.OutcomeCodec.Encode(Outcome{ - ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{ - 42: { - ReportFormat: llotypes.ReportFormat(1), - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}, {StreamID: 4, Aggregator: llotypes.AggregatorMedian}}, - }, - }, - }) - require.NoError(t, err) - - outcome, err := p.Outcome(ocr3types.OutcomeContext{PreviousOutcome: previousOutcome, SeqNr: 2}, types.Query{}, aos) - require.NoError(t, err) - - decoded, err := p.OutcomeCodec.Decode(outcome) - require.NoError(t, err) - - assert.Equal(t, newCd, decoded.ChannelDefinitions[42]) - }) - - t.Run("does not add channels beyond MaxOutcomeChannelDefinitionsLength", func(t *testing.T) { - newCd := llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(2), - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - } - obs := Observation{UpdateChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{}} - for i := 0; i < MaxOutcomeChannelDefinitionsLength+10; i++ { - obs.UpdateChannelDefinitions[llotypes.ChannelID(i)] = newCd - } - encoded, err := p.ObservationCodec.Encode(obs) - require.NoError(t, err) - aos := []types.AttributedObservation{} - for i := 0; i < 4; i++ { - aos = append(aos, - types.AttributedObservation{ - Observation: encoded, - Observer: commontypes.OracleID(i), - }) - } - outcome, err := p.Outcome(ocr3types.OutcomeContext{SeqNr: 2}, types.Query{}, aos) - require.NoError(t, err) - - decoded, err := p.OutcomeCodec.Decode(outcome) - require.NoError(t, err) - - assert.Len(t, decoded.ChannelDefinitions, MaxOutcomeChannelDefinitionsLength) - - // should contain channels 0 thru 999 - assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(0)) - assert.Contains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength-1)) - assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength)) - assert.NotContains(t, decoded.ChannelDefinitions, llotypes.ChannelID(MaxOutcomeChannelDefinitionsLength+1)) - }) -} - -func Test_MakeChannelHash(t *testing.T) { - t.Run("hashes channel definitions", func(t *testing.T) { - defs := ChannelDefinitionWithID{ - ChannelID: 1, - ChannelDefinition: llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormat(1), - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorMedian}}, - Opts: []byte(`{}`), - }, - } - hash := MakeChannelHash(defs) - // NOTE: Breaking this test by changing the hash below may break existing running instances - assert.Equal(t, "c0b72f4acb79bb8f5075f979f86016a30159266a96870b1c617b44426337162a", fmt.Sprintf("%x", hash)) - }) - - t.Run("different channelID makes different hash", func(t *testing.T) { - def1 := ChannelDefinitionWithID{ChannelID: 1} - def2 := ChannelDefinitionWithID{ChannelID: 2} - - assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) - }) - - t.Run("different report format makes different hash", func(t *testing.T) { - def1 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormatJSON, - }, - } - def2 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - ReportFormat: llotypes.ReportFormatEVMPremiumLegacy, - }, - } - - assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) - }) - - t.Run("different streamIDs makes different hash", func(t *testing.T) { - def1 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, - }, - } - def2 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}}, - }, - } - - assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) - }) - - t.Run("different aggregators makes different hash", func(t *testing.T) { - def1 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}}, - }, - } - def2 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorQuote}}, - }, - } - - assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) - }) - - t.Run("different opts makes different hash", func(t *testing.T) { - def1 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Opts: []byte(`{"foo":"bar"}`), - }, - } - def2 := ChannelDefinitionWithID{ - ChannelDefinition: llotypes.ChannelDefinition{ - Opts: []byte(`{"foo":"baz"}`), - }, - } - - assert.NotEqual(t, MakeChannelHash(def1), MakeChannelHash(def2)) - }) -} diff --git a/llo/types.go b/llo/types.go index a1eb271..54b4e8a 100644 --- a/llo/types.go +++ b/llo/types.go @@ -13,11 +13,6 @@ type ChannelDefinitionWithID struct { type ChannelHash [32]byte -type OnchainConfigCodec interface { - Encode(OnchainConfig) ([]byte, error) - Decode([]byte) (OnchainConfig, error) -} - type Transmitter interface { // NOTE: Mercury doesn't actually transmit on-chain, so there is no // "contract" involved with the transmitter.