diff --git a/go.mod b/go.mod index facb37b..738b39c 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,7 @@ require ( github.com/smartcontractkit/chainlink-common v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 github.com/stretchr/testify v1.8.4 - github.com/test-go/testify v1.1.4 google.golang.org/protobuf v1.32.0 - gotest.tools v2.2.0+incompatible ) require ( @@ -19,7 +17,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mr-tron/base58 v1.2.0 // indirect diff --git a/go.sum b/go.sum index 1ca1af4..9777685 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,6 @@ github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 h1:21V61XO github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= -github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -74,5 +72,3 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= -gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/llo/json_codec.go b/llo/json_codec.go new file mode 100644 index 0000000..7105ee7 --- /dev/null +++ b/llo/json_codec.go @@ -0,0 +1,55 @@ +package llo + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2/types" +) + +var _ ReportCodec = JSONReportCodec{} + +// JSONReportCodec is a chain-agnostic reference implementation + +type JSONReportCodec struct{} + +func (cdc JSONReportCodec) Encode(r Report) ([]byte, error) { + return json.Marshal(r) +} + +func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) { + type decode struct { + ConfigDigest string + ChainSelector uint64 + SeqNr uint64 + ChannelID commontypes.ChannelID + ValidAfterSeconds uint32 + ValidUntilSeconds uint32 + Values []*big.Int + Specimen bool + } + d := decode{} + err = json.Unmarshal(b, &d) + cdBytes, err := hex.DecodeString(d.ConfigDigest) + if err != nil { + return r, fmt.Errorf("invalid ConfigDigest; %w", err) + } + cd, err := types.BytesToConfigDigest(cdBytes) + if err != nil { + return r, fmt.Errorf("invalid ConfigDigest; %w", err) + } + + return Report{ + ConfigDigest: cd, + ChainSelector: d.ChainSelector, + SeqNr: d.SeqNr, + ChannelID: d.ChannelID, + ValidAfterSeconds: d.ValidAfterSeconds, + ValidUntilSeconds: d.ValidUntilSeconds, + Values: d.Values, + Specimen: d.Specimen, + }, err +} diff --git a/llo/json_codec_test.go b/llo/json_codec_test.go new file mode 100644 index 0000000..72e5e29 --- /dev/null +++ b/llo/json_codec_test.go @@ -0,0 +1,39 @@ +package llo + +import ( + "math/big" + "testing" + + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_JSONCodec(t *testing.T) { + t.Run("Encode=>Decode", func(t *testing.T) { + r := Report{ + ConfigDigest: types.ConfigDigest([32]byte{1, 2, 3}), + ChainSelector: 42, + SeqNr: 43, + ChannelID: commontypes.ChannelID([32]byte{'f', 'o', 'o'}), + ValidAfterSeconds: 44, + ValidUntilSeconds: 45, + Values: []*big.Int{big.NewInt(1), big.NewInt(2)}, + Specimen: true, + } + + cdc := JSONReportCodec{} + + encoded, err := cdc.Encode(r) + require.NoError(t, err) + + assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","ChainSelector":42,"SeqNr":43,"ChannelID":"666f6f0000000000000000000000000000000000000000000000000000000000","ValidAfterSeconds":44,"ValidUntilSeconds":45,"Values":[1,2],"Specimen":true}`, string(encoded)) + + decoded, err := cdc.Decode(encoded) + require.NoError(t, err) + + assert.Equal(t, r, decoded) + }) +} diff --git a/llo/plugin.go b/llo/plugin.go index 32e7969..1ce5598 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/binary" "encoding/json" + "errors" "fmt" "math/big" "sort" @@ -213,6 +214,7 @@ var _ ocr3types.ReportingPlugin[commontypes.LLOReportInfo] = &LLOPlugin{} type ReportCodec interface { Encode(Report) ([]byte, error) + Decode([]byte) (Report, error) } type LLOPlugin struct { @@ -267,15 +269,15 @@ type Observation struct { // *not* strictly) across the lifetime of a protocol instance and that // outctx.previousOutcome contains the consensus outcome with sequence // number (outctx.SeqNr-1). +// +// Should return a serialized Observation struct. func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { - fmt.Println("TRASH Observation") // send empty observation in initial round // NOTE: First sequence number is always 1 if outctx.SeqNr < 1 { // send empty observation in initial round return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr) } else if outctx.SeqNr == 1 { - fmt.Println("TRASH initial round") return types.Observation{}, nil // FIXME: but it needs to be properly serialized } @@ -312,6 +314,7 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon var addChannelDefinitions commontypes.ChannelDefinitions { expectedChannelDefs := p.ChannelDefinitionCache.Definitions() + fmt.Println("TRASH Observation definitions", expectedChannelDefs) removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH) for channelID := range removeChannelDefinitions { @@ -328,11 +331,17 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon } addChannelDefinitions = subtractChannelDefinitions(expectedChannelDefs, previousOutcome.ChannelDefinitions, MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH) + if len(addChannelDefinitions) > 0 { + fmt.Println("TRASH Observation addChannelDefinitions", len(addChannelDefinitions)) + } } + fmt.Println("TRASH Observation previousOutcome.ChannelDefinitions", len(previousOutcome.ChannelDefinitions)) + var streamValues StreamValues { streams := map[commontypes.StreamID]struct{}{} + fmt.Println("TRASH len(previousOutcome.ChannelDefinitions)", len(previousOutcome.ChannelDefinitions)) for _, channelDefinition := range previousOutcome.ChannelDefinitions { for _, streamID := range channelDefinition.StreamIDs { streams[streamID] = struct{}{} @@ -347,6 +356,8 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon } } + fmt.Println("TRASH Observation streamValues", streamValues) + var rawObservation []byte { var err error @@ -363,8 +374,6 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon } } - fmt.Printf("TRASH rawObservation %#v\n", rawObservation) - return rawObservation, nil } @@ -446,29 +455,29 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { } // Indicates whether a report can be generated for the given channel. -// TODO: Return error indicating why it isn't reportable -func (out *Outcome) IsReportable(channelID commontypes.ChannelID) bool { +// Returns nil if channel is reportable +func (out *Outcome) IsReportable(channelID commontypes.ChannelID) error { if out.LifeCycleStage == LifeCycleStageRetired { - return false + return fmt.Errorf("IsReportable=false; retired channel with ID: %s", channelID) } observationsTimestampSeconds, err := out.ObservationsTimestampSeconds() if err != nil { - return false + return fmt.Errorf("IsReportable=false; invalid observations timestamp; %w", err) } - channelDefinition, ok := out.ChannelDefinitions[channelID] - if !ok { - return false + channelDefinition, exists := out.ChannelDefinitions[channelID] + if !exists { + return fmt.Errorf("IsReportable=false; no channel definition with ID: %s", channelID) } if _, err := chainselectors.ChainIdFromSelector(channelDefinition.ChainSelector); err != nil { - return false + return fmt.Errorf("IsReportable=false; invalid chain selector; %w", err) } for _, streamID := range channelDefinition.StreamIDs { if out.StreamMedians[streamID] == nil { - return false + return errors.New("IsReportable=false; median was nil") } } @@ -476,14 +485,14 @@ func (out *Outcome) IsReportable(channelID commontypes.ChannelID) bool { // No validAfterSeconds entry yet, this must be a new channel. // validAfterSeconds will be populated in Outcome() so the channel // becomes reportable in later protocol rounds. - return false + return errors.New("IsReportable=false; no validAfterSeconds entry yet, this must be a new channel") } - if out.ValidAfterSeconds[channelID] >= observationsTimestampSeconds { - return false + if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds { + return fmt.Errorf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds) } - return true + return nil } // List of reportable channels (according to IsReportable), sorted according @@ -492,7 +501,8 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID { result := []commontypes.ChannelID{} for channelID := range out.ChannelDefinitions { - if !out.IsReportable(channelID) { + if err := out.IsReportable(channelID); err != nil { + fmt.Println("TRASH not reportable", err) continue } result = append(result, channelID) @@ -516,6 +526,10 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID { // outctx.previousOutcome contains the consensus outcome with sequence // number (outctx.SeqNr-1). func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { + if len(aos) == 0 { + return nil, errors.New("no attributed observations") + } + if outctx.SeqNr <= 1 { // Initial Outcome var lifeCycleStage commontypes.LLOLifeCycleStage @@ -597,13 +611,21 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, addChannelDefinitionsByHash[channelHash] = defWithID } + fmt.Printf("TRASH outcome observation.StreamValues %#v\n", observation.StreamValues) for id, obsResult := range observation.StreamValues { - if obsResult.Err == nil { + if obsResult.Valid { streamObservations[id] = append(streamObservations[id], obsResult.Val) + } else { + // TODO: log something? + fmt.Println("TRASH invalid observation") } } } + if len(timestampsNanoseconds) == 0 { + return nil, errors.New("no valid observations") + } + var outcome Outcome ///////////////////////////////// @@ -626,7 +648,6 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, ///////////////////////////////// // outcome.ObservationsTimestampNanoseconds - ///////////////////////////////// sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2] @@ -653,7 +674,9 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, } for channelHash, defWithID := range addChannelDefinitionsByHash { + fmt.Println("TRASH addChannelDefinitionsByHash") voteCount := addChannelVotesByHash[channelHash] + fmt.Println("TRASH voteCount", voteCount) if voteCount <= p.F { continue } @@ -673,6 +696,7 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, } outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition } + fmt.Printf("TRASH outcome.ChannelDefinitions #%v\n", outcome.ChannelDefinitions) ///////////////////////////////// // outcome.ValidAfterSeconds @@ -689,10 +713,12 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, outcome.ValidAfterSeconds = map[commontypes.ChannelID]uint32{} for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds { - if previousOutcome.IsReportable(channelID) { + if err := previousOutcome.IsReportable(channelID); err != nil { // was reported based on previous outcome outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds } else { + // TODO: change log level based on what type of error we got + p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err) // was skipped based on previous outcome outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds } @@ -726,6 +752,9 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, // outcome.StreamMedians ///////////////////////////////// outcome.StreamMedians = map[commontypes.StreamID]*big.Int{} + if len(streamObservations) == 0 { + fmt.Println("TRASH no streamObservations") + } for streamID, observations := range streamObservations { sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 }) if len(observations) <= p.F { @@ -736,6 +765,13 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, outcome.StreamMedians[streamID] = observations[len(observations)/2] } + if len(outcome.StreamMedians) == 0 { + fmt.Println("TRASH no medians") + } + + fmt.Printf("TRASH streamObservations %#v\n", streamObservations) + fmt.Printf("TRASH outcome %#v\n", outcome) + return json.Marshal(outcome) } @@ -786,7 +822,7 @@ type Report struct { func (p *LLOPlugin) encodeReport(r Report, format commontypes.LLOReportFormat) (types.Report, error) { codec, exists := p.Codecs[format] if !exists { - return nil, fmt.Errorf("codec for ReportFormat=%s missing", format) + return nil, fmt.Errorf("codec missing for ReportFormat=%s", format) } return codec.Encode(r) } @@ -839,6 +875,8 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t }) } + fmt.Printf("TRASH Reports outcome %#v\n", outcome) + fmt.Printf("TRASH Reports outcome.ReportableChannels() %#v\n", outcome.ReportableChannels()) for _, channelID := range outcome.ReportableChannels() { channelDefinition := outcome.ChannelDefinitions[channelID] values := []*big.Int{} @@ -872,7 +910,7 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t if len(rwis) == 0 { // TODO: log something here? It's not gonna do anything - fmt.Println("TRASH empty reports") + fmt.Println("empty reports") } return rwis, nil diff --git a/llo/types.go b/llo/types.go index f2f9a32..a90bd8e 100644 --- a/llo/types.go +++ b/llo/types.go @@ -26,6 +26,6 @@ type Transmitter interface { } type ObsResult[T any] struct { - Val T - Err error + Val T + Valid bool }