Skip to content

Commit

Permalink
Add JSON report codec
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Jan 22, 2024
1 parent 6ed171c commit dae4ce6
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 32 deletions.
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ require (
github.com/smartcontractkit/chainlink-common v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7
github.com/stretchr/testify v1.8.4
github.com/test-go/testify v1.1.4
google.golang.org/protobuf v1.32.0
gotest.tools v2.2.0+incompatible
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 h1:21V61XO
github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down Expand Up @@ -74,5 +72,3 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
55 changes: 55 additions & 0 deletions llo/json_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package llo

import (
"encoding/hex"
"encoding/json"
"fmt"
"math/big"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/libocr/offchainreporting2/types"
)

var _ ReportCodec = JSONReportCodec{}

// JSONReportCodec is a chain-agnostic reference implementation

type JSONReportCodec struct{}

func (cdc JSONReportCodec) Encode(r Report) ([]byte, error) {
return json.Marshal(r)
}

func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) {
type decode struct {
ConfigDigest string
ChainSelector uint64
SeqNr uint64
ChannelID commontypes.ChannelID
ValidAfterSeconds uint32
ValidUntilSeconds uint32
Values []*big.Int
Specimen bool
}
d := decode{}
err = json.Unmarshal(b, &d)
cdBytes, err := hex.DecodeString(d.ConfigDigest)
if err != nil {
return r, fmt.Errorf("invalid ConfigDigest; %w", err)
}
cd, err := types.BytesToConfigDigest(cdBytes)
if err != nil {
return r, fmt.Errorf("invalid ConfigDigest; %w", err)
}

return Report{
ConfigDigest: cd,
ChainSelector: d.ChainSelector,
SeqNr: d.SeqNr,
ChannelID: d.ChannelID,
ValidAfterSeconds: d.ValidAfterSeconds,
ValidUntilSeconds: d.ValidUntilSeconds,
Values: d.Values,
Specimen: d.Specimen,
}, err
}
39 changes: 39 additions & 0 deletions llo/json_codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package llo

import (
"math/big"
"testing"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/libocr/offchainreporting2/types"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_JSONCodec(t *testing.T) {
t.Run("Encode=>Decode", func(t *testing.T) {
r := Report{
ConfigDigest: types.ConfigDigest([32]byte{1, 2, 3}),
ChainSelector: 42,
SeqNr: 43,
ChannelID: commontypes.ChannelID([32]byte{'f', 'o', 'o'}),
ValidAfterSeconds: 44,
ValidUntilSeconds: 45,
Values: []*big.Int{big.NewInt(1), big.NewInt(2)},
Specimen: true,
}

cdc := JSONReportCodec{}

encoded, err := cdc.Encode(r)
require.NoError(t, err)

assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","ChainSelector":42,"SeqNr":43,"ChannelID":"666f6f0000000000000000000000000000000000000000000000000000000000","ValidAfterSeconds":44,"ValidUntilSeconds":45,"Values":[1,2],"Specimen":true}`, string(encoded))

decoded, err := cdc.Decode(encoded)
require.NoError(t, err)

assert.Equal(t, r, decoded)
})
}
84 changes: 61 additions & 23 deletions llo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math/big"
"sort"
Expand Down Expand Up @@ -213,6 +214,7 @@ var _ ocr3types.ReportingPlugin[commontypes.LLOReportInfo] = &LLOPlugin{}

type ReportCodec interface {
Encode(Report) ([]byte, error)
Decode([]byte) (Report, error)
}

type LLOPlugin struct {
Expand Down Expand Up @@ -267,15 +269,15 @@ type Observation struct {
// *not* strictly) across the lifetime of a protocol instance and that
// outctx.previousOutcome contains the consensus outcome with sequence
// number (outctx.SeqNr-1).
//
// Should return a serialized Observation struct.
func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) {
fmt.Println("TRASH Observation")
// send empty observation in initial round
// NOTE: First sequence number is always 1
if outctx.SeqNr < 1 {
// send empty observation in initial round
return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr)
} else if outctx.SeqNr == 1 {
fmt.Println("TRASH initial round")
return types.Observation{}, nil // FIXME: but it needs to be properly serialized
}

Expand Down Expand Up @@ -312,6 +314,7 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
var addChannelDefinitions commontypes.ChannelDefinitions
{
expectedChannelDefs := p.ChannelDefinitionCache.Definitions()
fmt.Println("TRASH Observation definitions", expectedChannelDefs)

removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH)
for channelID := range removeChannelDefinitions {
Expand All @@ -328,11 +331,17 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
}

addChannelDefinitions = subtractChannelDefinitions(expectedChannelDefs, previousOutcome.ChannelDefinitions, MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH)
if len(addChannelDefinitions) > 0 {
fmt.Println("TRASH Observation addChannelDefinitions", len(addChannelDefinitions))
}
}

fmt.Println("TRASH Observation previousOutcome.ChannelDefinitions", len(previousOutcome.ChannelDefinitions))

var streamValues StreamValues
{
streams := map[commontypes.StreamID]struct{}{}
fmt.Println("TRASH len(previousOutcome.ChannelDefinitions)", len(previousOutcome.ChannelDefinitions))
for _, channelDefinition := range previousOutcome.ChannelDefinitions {
for _, streamID := range channelDefinition.StreamIDs {
streams[streamID] = struct{}{}
Expand All @@ -347,6 +356,8 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
}
}

fmt.Println("TRASH Observation streamValues", streamValues)

var rawObservation []byte
{
var err error
Expand All @@ -363,8 +374,6 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
}
}

fmt.Printf("TRASH rawObservation %#v\n", rawObservation)

return rawObservation, nil
}

Expand Down Expand Up @@ -446,44 +455,44 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) {
}

// Indicates whether a report can be generated for the given channel.
// TODO: Return error indicating why it isn't reportable
func (out *Outcome) IsReportable(channelID commontypes.ChannelID) bool {
// Returns nil if channel is reportable
func (out *Outcome) IsReportable(channelID commontypes.ChannelID) error {
if out.LifeCycleStage == LifeCycleStageRetired {
return false
return fmt.Errorf("IsReportable=false; retired channel with ID: %s", channelID)
}

observationsTimestampSeconds, err := out.ObservationsTimestampSeconds()
if err != nil {
return false
return fmt.Errorf("IsReportable=false; invalid observations timestamp; %w", err)
}

channelDefinition, ok := out.ChannelDefinitions[channelID]
if !ok {
return false
channelDefinition, exists := out.ChannelDefinitions[channelID]
if !exists {
return fmt.Errorf("IsReportable=false; no channel definition with ID: %s", channelID)
}

if _, err := chainselectors.ChainIdFromSelector(channelDefinition.ChainSelector); err != nil {
return false
return fmt.Errorf("IsReportable=false; invalid chain selector; %w", err)
}

for _, streamID := range channelDefinition.StreamIDs {
if out.StreamMedians[streamID] == nil {
return false
return errors.New("IsReportable=false; median was nil")
}
}

if _, ok := out.ValidAfterSeconds[channelID]; !ok {
// No validAfterSeconds entry yet, this must be a new channel.
// validAfterSeconds will be populated in Outcome() so the channel
// becomes reportable in later protocol rounds.
return false
return errors.New("IsReportable=false; no validAfterSeconds entry yet, this must be a new channel")
}

if out.ValidAfterSeconds[channelID] >= observationsTimestampSeconds {
return false
if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds {
return fmt.Errorf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds)
}

return true
return nil
}

// List of reportable channels (according to IsReportable), sorted according
Expand All @@ -492,7 +501,8 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID {
result := []commontypes.ChannelID{}

for channelID := range out.ChannelDefinitions {
if !out.IsReportable(channelID) {
if err := out.IsReportable(channelID); err != nil {
fmt.Println("TRASH not reportable", err)
continue
}
result = append(result, channelID)
Expand All @@ -516,6 +526,10 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID {
// outctx.previousOutcome contains the consensus outcome with sequence
// number (outctx.SeqNr-1).
func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
if len(aos) == 0 {
return nil, errors.New("no attributed observations")
}

if outctx.SeqNr <= 1 {
// Initial Outcome
var lifeCycleStage commontypes.LLOLifeCycleStage
Expand Down Expand Up @@ -597,13 +611,21 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
addChannelDefinitionsByHash[channelHash] = defWithID
}

fmt.Printf("TRASH outcome observation.StreamValues %#v\n", observation.StreamValues)
for id, obsResult := range observation.StreamValues {
if obsResult.Err == nil {
if obsResult.Valid {
streamObservations[id] = append(streamObservations[id], obsResult.Val)
} else {
// TODO: log something?
fmt.Println("TRASH invalid observation")
}
}
}

if len(timestampsNanoseconds) == 0 {
return nil, errors.New("no valid observations")
}

var outcome Outcome

/////////////////////////////////
Expand All @@ -626,7 +648,6 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,

/////////////////////////////////
// outcome.ObservationsTimestampNanoseconds
/////////////////////////////////
sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] })
outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2]

Expand All @@ -653,7 +674,9 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
}

for channelHash, defWithID := range addChannelDefinitionsByHash {
fmt.Println("TRASH addChannelDefinitionsByHash")
voteCount := addChannelVotesByHash[channelHash]
fmt.Println("TRASH voteCount", voteCount)
if voteCount <= p.F {
continue
}
Expand All @@ -673,6 +696,7 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
}
outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition
}
fmt.Printf("TRASH outcome.ChannelDefinitions #%v\n", outcome.ChannelDefinitions)

/////////////////////////////////
// outcome.ValidAfterSeconds
Expand All @@ -689,10 +713,12 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,

outcome.ValidAfterSeconds = map[commontypes.ChannelID]uint32{}
for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds {
if previousOutcome.IsReportable(channelID) {
if err := previousOutcome.IsReportable(channelID); err != nil {
// was reported based on previous outcome
outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds
} else {
// TODO: change log level based on what type of error we got
p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err)
// was skipped based on previous outcome
outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds
}
Expand Down Expand Up @@ -726,6 +752,9 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
// outcome.StreamMedians
/////////////////////////////////
outcome.StreamMedians = map[commontypes.StreamID]*big.Int{}
if len(streamObservations) == 0 {
fmt.Println("TRASH no streamObservations")
}
for streamID, observations := range streamObservations {
sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 })
if len(observations) <= p.F {
Expand All @@ -736,6 +765,13 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
outcome.StreamMedians[streamID] = observations[len(observations)/2]
}

if len(outcome.StreamMedians) == 0 {
fmt.Println("TRASH no medians")
}

fmt.Printf("TRASH streamObservations %#v\n", streamObservations)
fmt.Printf("TRASH outcome %#v\n", outcome)

return json.Marshal(outcome)
}

Expand Down Expand Up @@ -786,7 +822,7 @@ type Report struct {
func (p *LLOPlugin) encodeReport(r Report, format commontypes.LLOReportFormat) (types.Report, error) {
codec, exists := p.Codecs[format]
if !exists {
return nil, fmt.Errorf("codec for ReportFormat=%s missing", format)
return nil, fmt.Errorf("codec missing for ReportFormat=%s", format)
}
return codec.Encode(r)
}
Expand Down Expand Up @@ -839,6 +875,8 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t
})
}

fmt.Printf("TRASH Reports outcome %#v\n", outcome)
fmt.Printf("TRASH Reports outcome.ReportableChannels() %#v\n", outcome.ReportableChannels())
for _, channelID := range outcome.ReportableChannels() {
channelDefinition := outcome.ChannelDefinitions[channelID]
values := []*big.Int{}
Expand Down Expand Up @@ -872,7 +910,7 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t

if len(rwis) == 0 {
// TODO: log something here? It's not gonna do anything
fmt.Println("TRASH empty reports")
fmt.Println("empty reports")
}

return rwis, nil
Expand Down
Loading

0 comments on commit dae4ce6

Please sign in to comment.