From 35308d30de722a9709055e911046b41d57c48355 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 4 Dec 2023 15:13:56 -0500 Subject: [PATCH 1/2] Implement LLO plugin --- .tool-versions | 2 + go.mod | 23 +- go.sum | 53 +- llo/json_codec.go | 58 ++ llo/json_codec_test.go | 39 + llo/llo_offchain_config.pb.go | 145 +++ llo/llo_offchain_config.proto | 8 + llo/must.go | 8 + llo/offchain_config.go | 51 + llo/offchain_config_test.go | 45 + llo/onchain_config.go | 16 + llo/plugin.go | 924 ++++++++++++++++++ llo/predecessor_retirement_report_cache.go | 27 + ...redecessor_retirement_report_cache_test.go | 7 + llo/should_retire_cache.go | 19 + llo/should_retire_cache_test.go | 7 + llo/types.go | 31 + 17 files changed, 1451 insertions(+), 12 deletions(-) create mode 100644 .tool-versions create mode 100644 llo/json_codec.go create mode 100644 llo/json_codec_test.go create mode 100644 llo/llo_offchain_config.pb.go create mode 100644 llo/llo_offchain_config.proto create mode 100644 llo/must.go create mode 100644 llo/offchain_config.go create mode 100644 llo/offchain_config_test.go create mode 100644 llo/onchain_config.go create mode 100644 llo/plugin.go create mode 100644 llo/predecessor_retirement_report_cache.go create mode 100644 llo/predecessor_retirement_report_cache_test.go create mode 100644 llo/should_retire_cache.go create mode 100644 llo/should_retire_cache_test.go create mode 100644 llo/types.go diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..b2fe128 --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +golang 1.21.4 +protoc 23.2 diff --git a/go.mod b/go.mod index 2aa4b61..0b93658 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,38 @@ module github.com/smartcontractkit/chainlink-data-streams -go 1.21.3 +go 1.21 require ( github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20231204152334-1f32103bbb4c - github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 + github.com/smartcontractkit/chain-selectors v1.0.5 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240206175150-8264f16d8815 + github.com/smartcontractkit/libocr v0.0.0-20240112202000-6359502d2ff1 github.com/stretchr/testify v1.8.4 google.golang.org/protobuf v1.32.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_golang v1.17.0 // indirect + github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect + github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/crypto v0.13.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect + google.golang.org/grpc v1.58.3 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index dc8e680..78e3ca0 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,48 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= +github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20231204152334-1f32103bbb4c h1:YFyo0pCmKkpB4EOSykCZFueRXNxQ7OhKiCnhoVIzydo= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20231204152334-1f32103bbb4c/go.mod h1:Hrru9i7n+WEYyW2aIt3/YGPhxLX+HEGWnhk3yVXeDF8= -github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7 h1:21V61XOYSxpFmFqlhr5IaEh1uQ1F6CewJ30D/U/P34c= -github.com/smartcontractkit/libocr v0.0.0-20231107151413-13e0202ae8d7/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= +github.com/smartcontractkit/chain-selectors v1.0.5 h1:NOefQsogPZS4aBbWPFrgAyoke0gppN2ojfa8SQkhu8c= +github.com/smartcontractkit/chain-selectors v1.0.5/go.mod h1:WBhLlODF5b95vvx2tdKK55vGACg1+qZpuBhOGu1UXVo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240206175150-8264f16d8815 h1:gqKnHZdBNydojYfflbik1xNYqqbeQipj3P1Xi+o+kHM= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240206175150-8264f16d8815/go.mod h1:05rRF84QKlIOF5LfTBPkHdw4UpBI2G3zxRcuZ65bPjk= +github.com/smartcontractkit/libocr v0.0.0-20240112202000-6359502d2ff1 h1:3y9WsXkZ5lxFrmfH7DQHs/q308lylKId5l/3VC0QAdM= +github.com/smartcontractkit/libocr v0.0.0-20240112202000-6359502d2ff1/go.mod h1:kC0qmVPUaVkFqGiZMNhmRmjdphuUmeyLEdlWFOQzFWI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= @@ -28,8 +51,22 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/llo/json_codec.go b/llo/json_codec.go new file mode 100644 index 0000000..d4c1dff --- /dev/null +++ b/llo/json_codec.go @@ -0,0 +1,58 @@ +package llo + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "math/big" + + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2/types" +) + +var _ ReportCodec = JSONReportCodec{} + +// JSONReportCodec is a chain-agnostic reference implementation + +type JSONReportCodec struct{} + +func (cdc JSONReportCodec) Encode(r Report) ([]byte, error) { + return json.Marshal(r) +} + +func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) { + type decode struct { + ConfigDigest string + ChainSelector uint64 + SeqNr uint64 + ChannelID commontypes.ChannelID + ValidAfterSeconds uint32 + ValidUntilSeconds uint32 + Values []*big.Int + Specimen bool + } + d := decode{} + err = json.Unmarshal(b, &d) + if err != nil { + return r, fmt.Errorf("failed to decode report: expected JSON (got: %s); %w", b, err) + } + cdBytes, err := hex.DecodeString(d.ConfigDigest) + if err != nil { + return r, fmt.Errorf("invalid ConfigDigest; %w", err) + } + cd, err := types.BytesToConfigDigest(cdBytes) + if err != nil { + return r, fmt.Errorf("invalid ConfigDigest; %w", err) + } + + return Report{ + ConfigDigest: cd, + ChainSelector: d.ChainSelector, + SeqNr: d.SeqNr, + ChannelID: d.ChannelID, + ValidAfterSeconds: d.ValidAfterSeconds, + ValidUntilSeconds: d.ValidUntilSeconds, + Values: d.Values, + Specimen: d.Specimen, + }, err +} diff --git a/llo/json_codec_test.go b/llo/json_codec_test.go new file mode 100644 index 0000000..e817fb0 --- /dev/null +++ b/llo/json_codec_test.go @@ -0,0 +1,39 @@ +package llo + +import ( + "math/big" + "testing" + + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_JSONCodec(t *testing.T) { + t.Run("Encode=>Decode", func(t *testing.T) { + r := Report{ + ConfigDigest: types.ConfigDigest([32]byte{1, 2, 3}), + ChainSelector: 42, + SeqNr: 43, + ChannelID: commontypes.ChannelID(46), + ValidAfterSeconds: 44, + ValidUntilSeconds: 45, + Values: []*big.Int{big.NewInt(1), big.NewInt(2)}, + Specimen: true, + } + + cdc := JSONReportCodec{} + + encoded, err := cdc.Encode(r) + require.NoError(t, err) + + assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","ChainSelector":42,"SeqNr":43,"ChannelID":46,"ValidAfterSeconds":44,"ValidUntilSeconds":45,"Values":[1,2],"Specimen":true}`, string(encoded)) + + decoded, err := cdc.Decode(encoded) + require.NoError(t, err) + + assert.Equal(t, r, decoded) + }) +} diff --git a/llo/llo_offchain_config.pb.go b/llo/llo_offchain_config.pb.go new file mode 100644 index 0000000..9c7aab1 --- /dev/null +++ b/llo/llo_offchain_config.pb.go @@ -0,0 +1,145 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.23.2 +// source: llo_offchain_config.proto + +package llo + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type LLOOffchainConfigProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PredecessorConfigDigest []byte `protobuf:"bytes,1,opt,name=predecessorConfigDigest,proto3" json:"predecessorConfigDigest,omitempty"` +} + +func (x *LLOOffchainConfigProto) Reset() { + *x = LLOOffchainConfigProto{} + if protoimpl.UnsafeEnabled { + mi := &file_llo_offchain_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LLOOffchainConfigProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOOffchainConfigProto) ProtoMessage() {} + +func (x *LLOOffchainConfigProto) ProtoReflect() protoreflect.Message { + mi := &file_llo_offchain_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOOffchainConfigProto.ProtoReflect.Descriptor instead. +func (*LLOOffchainConfigProto) Descriptor() ([]byte, []int) { + return file_llo_offchain_config_proto_rawDescGZIP(), []int{0} +} + +func (x *LLOOffchainConfigProto) GetPredecessorConfigDigest() []byte { + if x != nil { + return x.PredecessorConfigDigest + } + return nil +} + +var File_llo_offchain_config_proto protoreflect.FileDescriptor + +var file_llo_offchain_config_proto_rawDesc = []byte{ + 0x0a, 0x19, 0x6c, 0x6c, 0x6f, 0x5f, 0x6f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x63, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x76, 0x31, 0x22, + 0x52, 0x0a, 0x16, 0x4c, 0x4c, 0x4f, 0x4f, 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x38, 0x0a, 0x17, 0x70, 0x72, 0x65, + 0x64, 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, + 0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x17, 0x70, 0x72, 0x65, 0x64, + 0x65, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, + 0x65, 0x73, 0x74, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x6c, 0x6c, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_llo_offchain_config_proto_rawDescOnce sync.Once + file_llo_offchain_config_proto_rawDescData = file_llo_offchain_config_proto_rawDesc +) + +func file_llo_offchain_config_proto_rawDescGZIP() []byte { + file_llo_offchain_config_proto_rawDescOnce.Do(func() { + file_llo_offchain_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_llo_offchain_config_proto_rawDescData) + }) + return file_llo_offchain_config_proto_rawDescData +} + +var file_llo_offchain_config_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_llo_offchain_config_proto_goTypes = []interface{}{ + (*LLOOffchainConfigProto)(nil), // 0: v1.LLOOffchainConfigProto +} +var file_llo_offchain_config_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_llo_offchain_config_proto_init() } +func file_llo_offchain_config_proto_init() { + if File_llo_offchain_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_llo_offchain_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LLOOffchainConfigProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_llo_offchain_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_llo_offchain_config_proto_goTypes, + DependencyIndexes: file_llo_offchain_config_proto_depIdxs, + MessageInfos: file_llo_offchain_config_proto_msgTypes, + }.Build() + File_llo_offchain_config_proto = out.File + file_llo_offchain_config_proto_rawDesc = nil + file_llo_offchain_config_proto_goTypes = nil + file_llo_offchain_config_proto_depIdxs = nil +} diff --git a/llo/llo_offchain_config.proto b/llo/llo_offchain_config.proto new file mode 100644 index 0000000..305b2ca --- /dev/null +++ b/llo/llo_offchain_config.proto @@ -0,0 +1,8 @@ +syntax="proto3"; + +package v1; +option go_package = ".;llo"; + +message LLOOffchainConfigProto { + bytes predecessorConfigDigest = 1; +} diff --git a/llo/must.go b/llo/must.go new file mode 100644 index 0000000..bf002ef --- /dev/null +++ b/llo/must.go @@ -0,0 +1,8 @@ +package llo + +func must[T any](x T, err error) T { + if err != nil { + panic(err) + } + return x +} diff --git a/llo/offchain_config.go b/llo/offchain_config.go new file mode 100644 index 0000000..bbd5554 --- /dev/null +++ b/llo/offchain_config.go @@ -0,0 +1,51 @@ +package llo + +import ( + "fmt" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "google.golang.org/protobuf/proto" +) + +type OffchainConfig struct { + // We use the offchainconfig of the plugin to tell the plugin the + // configdigest of its predecessor protocol instance. + // + // NOTE: Set here: + // https://github.com/smartcontractkit/mercury-v1-sketch/blob/f52c0f823788f86c1aeaa9ba1eee32a85b981535/onchain/src/ConfigurationStore.sol#L13 + // TODO: This needs to be implemented alongside staging/production + // switchover support: https://smartcontract-it.atlassian.net/browse/MERC-3386 + PredecessorConfigDigest *types.ConfigDigest + // TODO: Billing + // https://smartcontract-it.atlassian.net/browse/MERC-1189 + // QUESTION: Previously we stored ExpiryWindow and BaseUSDFeeCents in offchain + // config, but those might be channel specific so need to move to + // channel definition + // ExpirationWindow uint32 `json:"expirationWindow"` // Integer number of seconds + // BaseUSDFee decimal.Decimal `json:"baseUSDFee"` // Base USD fee +} + +func DecodeOffchainConfig(b []byte) (o OffchainConfig, err error) { + pbuf := &LLOOffchainConfigProto{} + err = proto.Unmarshal(b, pbuf) + if err != nil { + return o, fmt.Errorf("failed to decode offchain config: expected protobuf (got: 0x%x); %w", b, err) + } + if len(pbuf.PredecessorConfigDigest) > 0 { + var predecessorConfigDigest types.ConfigDigest + predecessorConfigDigest, err = types.BytesToConfigDigest(pbuf.PredecessorConfigDigest) + if err != nil { + return o, err + } + o.PredecessorConfigDigest = &predecessorConfigDigest + } + return +} + +func (c OffchainConfig) Encode() ([]byte, error) { + pbuf := LLOOffchainConfigProto{} + if c.PredecessorConfigDigest != nil { + pbuf.PredecessorConfigDigest = c.PredecessorConfigDigest[:] + } + return proto.Marshal(&pbuf) +} diff --git a/llo/offchain_config_test.go b/llo/offchain_config_test.go new file mode 100644 index 0000000..aa1f555 --- /dev/null +++ b/llo/offchain_config_test.go @@ -0,0 +1,45 @@ +package llo + +import ( + "testing" + + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_OffchainConfig(t *testing.T) { + t.Run("garbage bytes", func(t *testing.T) { + _, err := DecodeOffchainConfig([]byte{1}) + require.Error(t, err) + + assert.Contains(t, err.Error(), "failed to decode offchain config: expected protobuf (got: 0x01); proto:") + }) + + t.Run("zero length for PredecessorConfigDigest is ok", func(t *testing.T) { + decoded, err := DecodeOffchainConfig([]byte{}) + require.NoError(t, err) + assert.Equal(t, OffchainConfig{}, decoded) + }) + + t.Run("encoding nil PredecessorConfigDigest is ok", func(t *testing.T) { + cfg := OffchainConfig{nil} + + b, err := cfg.Encode() + require.NoError(t, err) + + assert.Len(t, b, 0) + }) + + t.Run("encode and decode", func(t *testing.T) { + cd := types.ConfigDigest([32]byte{1, 2, 3}) + cfg := OffchainConfig{&cd} + + b, err := cfg.Encode() + require.NoError(t, err) + + cfgDecoded, err := DecodeOffchainConfig(b) + require.NoError(t, err) + assert.Equal(t, cfg, cfgDecoded) + }) +} diff --git a/llo/onchain_config.go b/llo/onchain_config.go new file mode 100644 index 0000000..a240b38 --- /dev/null +++ b/llo/onchain_config.go @@ -0,0 +1,16 @@ +package llo + +type OnchainConfig struct{} + +var _ OnchainConfigCodec = &JSONOnchainConfigCodec{} + +// TODO: Replace this with protobuf, if it is actually used for something +type JSONOnchainConfigCodec struct{} + +func (c *JSONOnchainConfigCodec) Encode(OnchainConfig) ([]byte, error) { + return nil, nil +} + +func (c *JSONOnchainConfigCodec) Decode([]byte) (OnchainConfig, error) { + return OnchainConfig{}, nil +} diff --git a/llo/plugin.go b/llo/plugin.go new file mode 100644 index 0000000..0b66250 --- /dev/null +++ b/llo/plugin.go @@ -0,0 +1,924 @@ +package llo + +import ( + "context" + "crypto/sha256" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math/big" + "sort" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + + chainselectors "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" + ocr3types "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" +) + +// TODO: Split out this file and write unit tests: https://smartcontract-it.atlassian.net/browse/MERC-3524 + +// Notes: +// +// This is a sketch, there are many improvements to be made for this to be +// production-grade, secure code. +// +// We use JSON for serialization/deserialization. We rely on the fact that +// golang's json package serializes maps deterministically. Protobufs would +// likely be a more performant & efficient choice. + +// Additional limits so we can more effectively bound the size of observations +const ( + MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH = 5 + MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH = 5 + MAX_OBSERVATION_STREAM_VALUES_LENGTH = 1_000 +) + +const MAX_OUTCOME_CHANNEL_DEFINITIONS_LENGTH = 500 + +// Values for a set of streams, e.g. "eth-usd", "link-usd", and "eur-chf" +// TODO: generalize from *big.Int to anything +// https://smartcontract-it.atlassian.net/browse/MERC-3525 +// TODO: Consider renaming to StreamDataPoints? +type StreamValues map[commontypes.StreamID]ObsResult[*big.Int] + +type DataSource interface { + // For each known streamID, Observe should return a non-nil entry in + // StreamValues. Observe should ignore unknown streamIDs. + Observe(ctx context.Context, streamIDs map[commontypes.StreamID]struct{}) (StreamValues, error) +} + +// Protocol instances start in either the staging or production stage. They +// may later be retired and "hand over" their work to another protocol instance +// that will move from the staging to the production stage. +const ( + LifeCycleStageStaging commontypes.LLOLifeCycleStage = "staging" + LifeCycleStageProduction commontypes.LLOLifeCycleStage = "production" + LifeCycleStageRetired commontypes.LLOLifeCycleStage = "retired" +) + +type RetirementReport struct { + // Carries validity time stamps between protocol instances to ensure there + // are no gaps + ValidAfterSeconds map[commontypes.ChannelID]uint32 +} + +type ShouldRetireCache interface { // reads asynchronously from onchain ConfigurationStore + // Should the protocol instance retire according to the configuration + // contract? + // See: https://github.com/smartcontractkit/mercury-v1-sketch/blob/main/onchain/src/ConfigurationStore.sol#L18 + ShouldRetire() (bool, error) +} + +// The predecessor protocol instance stores its attested retirement report in +// this cache (locally, offchain), so it can be fetched by the successor +// protocol instance. +// +// PredecessorRetirementReportCache is populated by the old protocol instance +// writing to it and the new protocol instance reading from it. +// +// The sketch envisions it being implemented as a single object that is shared +// between different protocol instances. +type PredecessorRetirementReportCache interface { + AttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest) ([]byte, error) + CheckAttestedRetirementReport(predecessorConfigDigest ocr2types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) +} + +const ( + ReportFormatEVM commontypes.LLOReportFormat = "evm" + ReportFormatJSON commontypes.LLOReportFormat = "json" + // Solana, CosmWasm, kalechain, etc... all go here +) + +// MakeChannelHash is used for mapping ChannelDefinitionWithIDs +func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash { + h := sha256.New() + merr := errors.Join( + binary.Write(h, binary.BigEndian, cd.ChannelID), + binary.Write(h, binary.BigEndian, uint32(len(cd.ReportFormat))), + ) + _, err := h.Write([]byte(cd.ReportFormat)) + merr = errors.Join(merr, + err, + binary.Write(h, binary.BigEndian, cd.ChainSelector), + binary.Write(h, binary.BigEndian, uint32(len(cd.StreamIDs))), + ) + for _, streamID := range cd.StreamIDs { + merr = errors.Join(merr, binary.Write(h, binary.BigEndian, streamID)) + } + if merr != nil { + // This should never happen + panic(merr) + } + var result [32]byte + h.Sum(result[:0]) + return result +} + +// A ReportingPlugin allows plugging custom logic into the OCR3 protocol. The OCR +// protocol handles cryptography, networking, ensuring that a sufficient number +// of nodes is in agreement about any report, transmitting the report to the +// contract, etc... The ReportingPlugin handles application-specific logic. To do so, +// the ReportingPlugin defines a number of callbacks that are called by the OCR +// protocol logic at certain points in the protocol's execution flow. The report +// generated by the ReportingPlugin must be in a format understood by contract that +// the reports are transmitted to. +// +// We assume that each correct node participating in the protocol instance will +// be running the same ReportingPlugin implementation. However, not all nodes may be +// correct; up to f nodes be faulty in arbitrary ways (aka byzantine faults). +// For example, faulty nodes could be down, have intermittent connectivity +// issues, send garbage messages, or be controlled by an adversary. +// +// For a protocol round where everything is working correctly, followers will +// call Observation, Outcome, and Reports. For each report, +// ShouldAcceptAttestedReport will be called as well. If +// ShouldAcceptAttestedReport returns true, ShouldTransmitAcceptedReport will +// be called. However, an ReportingPlugin must also correctly handle the case where +// faults occur. +// +// In particular, an ReportingPlugin must deal with cases where: +// +// - only a subset of the functions on the ReportingPlugin are invoked for a given +// round +// +// - an arbitrary number of seqnrs has been skipped between invocations of the +// ReportingPlugin +// +// - the observation returned by Observation is not included in the list of +// AttributedObservations passed to Report +// +// - a query or observation is malformed. (For defense in depth, it is also +// recommended that malformed outcomes are handled gracefully.) +// +// - instances of the ReportingPlugin run by different oracles have different call +// traces. E.g., the ReportingPlugin's Observation function may have been invoked on +// node A, but not on node B. +// +// All functions on an ReportingPlugin should be thread-safe. +// +// All functions that take a context as their first argument may still do cheap +// computations after the context expires, but should stop any blocking +// interactions with outside services (APIs, database, ...) and return as +// quickly as possible. (Rough rule of thumb: any such computation should not +// take longer than a few ms.) A blocking function may block execution of the +// entire protocol instance on its node! +// +// For a given OCR protocol instance, there can be many (consecutive) instances +// of an ReportingPlugin, e.g. due to software restarts. If you need ReportingPlugin state +// to survive across restarts, you should store it in the Outcome or persist it. +// A ReportingPlugin instance will only ever serve a single protocol instance. +var _ ocr3types.ReportingPluginFactory[commontypes.LLOReportInfo] = &PluginFactory{} + +func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc commontypes.ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[commontypes.LLOReportFormat]ReportCodec) *PluginFactory { + return &PluginFactory{ + prrc, src, cdc, ds, lggr, codecs, + } +} + +type PluginFactory struct { + PredecessorRetirementReportCache PredecessorRetirementReportCache + ShouldRetireCache ShouldRetireCache + ChannelDefinitionCache commontypes.ChannelDefinitionCache + DataSource DataSource + Logger logger.Logger + Codecs map[commontypes.LLOReportFormat]ReportCodec +} + +func (f *PluginFactory) NewReportingPlugin(cfg ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[commontypes.LLOReportInfo], ocr3types.ReportingPluginInfo, error) { + offchainCfg, err := DecodeOffchainConfig(cfg.OffchainConfig) + if err != nil { + return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("NewReportingPlugin failed to decode offchain config; got: 0x%x (len: %d); %w", cfg.OffchainConfig, len(cfg.OffchainConfig), err) + } + + return &LLOPlugin{ + offchainCfg.PredecessorConfigDigest, + cfg.ConfigDigest, + f.PredecessorRetirementReportCache, + f.ShouldRetireCache, + f.ChannelDefinitionCache, + f.DataSource, + f.Logger, + cfg.F, + f.Codecs, + }, ocr3types.ReportingPluginInfo{ + Name: "LLO", + Limits: ocr3types.ReportingPluginLimits{ + MaxQueryLength: 0, + MaxObservationLength: ocr3types.MaxMaxObservationLength, // TODO: use tighter bound + MaxOutcomeLength: ocr3types.MaxMaxOutcomeLength, // TODO: use tighter bound + MaxReportLength: ocr3types.MaxMaxReportLength, // TODO: use tighter bound + MaxReportCount: ocr3types.MaxMaxReportCount, // TODO: use tighter bound + }, + }, nil +} + +var _ ocr3types.ReportingPlugin[commontypes.LLOReportInfo] = &LLOPlugin{} + +type ReportCodec interface { + Encode(Report) ([]byte, error) + Decode([]byte) (Report, error) + // TODO: max length check? https://smartcontract-it.atlassian.net/browse/MERC-3524 +} + +type LLOPlugin struct { + PredecessorConfigDigest *types.ConfigDigest + ConfigDigest types.ConfigDigest + PredecessorRetirementReportCache PredecessorRetirementReportCache + ShouldRetireCache ShouldRetireCache + ChannelDefinitionCache commontypes.ChannelDefinitionCache + DataSource DataSource + Logger logger.Logger + F int + Codecs map[commontypes.LLOReportFormat]ReportCodec +} + +// Query creates a Query that is sent from the leader to all follower nodes +// as part of the request for an observation. Be careful! A malicious leader +// could equivocate (i.e. send different queries to different followers.) +// Many applications will likely be better off always using an empty query +// if the oracles don't need to coordinate on what to observe (e.g. in case +// of a price feed) or the underlying data source offers an (eventually) +// consistent view to different oracles (e.g. in case of observing a +// blockchain). +// +// You may assume that the outctx.SeqNr is increasing monotonically (though +// *not* strictly) across the lifetime of a protocol instance and that +// outctx.previousOutcome contains the consensus outcome with sequence +// number (outctx.SeqNr-1). +func (p *LLOPlugin) Query(ctx context.Context, outctx ocr3types.OutcomeContext) (types.Query, error) { + return nil, nil +} + +type Observation struct { + // Attested (i.e. signed by f+1 oracles) retirement report from predecessor + // protocol instance + AttestedPredecessorRetirement []byte + // Should this protocol instance be retired? + ShouldRetire bool + // Timestamp from when observation is made + UnixTimestampNanoseconds int64 + // Votes to remove/add channels. Subject to MAX_OBSERVATION_*_LENGTH limits + RemoveChannelIDs map[commontypes.ChannelID]struct{} + AddChannelDefinitions commontypes.ChannelDefinitions + // Observed (numeric) stream values. Subject to + // MAX_OBSERVATION_STREAM_VALUES_LENGTH limit + StreamValues StreamValues +} + +// Observation gets an observation from the underlying data source. Returns +// a value or an error. +// +// You may assume that the outctx.SeqNr is increasing monotonically (though +// *not* strictly) across the lifetime of a protocol instance and that +// outctx.previousOutcome contains the consensus outcome with sequence +// number (outctx.SeqNr-1). +// +// Should return a serialized Observation struct. +func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query types.Query) (types.Observation, error) { + // send empty observation in initial round + // NOTE: First sequence number is always 1 + if outctx.SeqNr < 1 { + // send empty observation in initial round + return types.Observation{}, fmt.Errorf("got invalid seqnr=%d, must be >=1", outctx.SeqNr) + } else if outctx.SeqNr == 1 { + return types.Observation{}, nil // FIXME: but it needs to be properly serialized + } + + // QUESTION: is there a way to have this captured in EAs so we get something + // closer to the source? + nowNanoseconds := time.Now().UnixNano() + + var previousOutcome Outcome + if err := json.Unmarshal(outctx.PreviousOutcome, &previousOutcome); err != nil { + return nil, fmt.Errorf("error unmarshalling previous outcome: %w", err) + } + + var attestedRetirementReport []byte + // Only try to fetch this from the cache if this instance if configured + // with a predecessor and we're still in the staging stage. + if p.PredecessorConfigDigest != nil && previousOutcome.LifeCycleStage == LifeCycleStageStaging { + var err error + attestedRetirementReport, err = p.PredecessorRetirementReportCache.AttestedRetirementReport(*p.PredecessorConfigDigest) + if err != nil { + return nil, fmt.Errorf("error fetching attested retirement report from cache: %w", err) + } + } + + shouldRetire, err := p.ShouldRetireCache.ShouldRetire() + if err != nil { + return nil, fmt.Errorf("error fetching shouldRetire from cache: %w", err) + } + + // vote to remove channel ids if they're in the previous outcome + // ChannelDefinitions or ValidAfterSeconds + removeChannelIDs := map[commontypes.ChannelID]struct{}{} + // vote to add channel definitions that aren't present in the previous + // outcome ChannelDefinitions + var addChannelDefinitions commontypes.ChannelDefinitions + { + expectedChannelDefs := p.ChannelDefinitionCache.Definitions() + + removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH) + for channelID := range removeChannelDefinitions { + removeChannelIDs[channelID] = struct{}{} + } + + for channelID := range previousOutcome.ValidAfterSeconds { + if len(removeChannelIDs) >= MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH { + break + } + if _, ok := expectedChannelDefs[channelID]; !ok { + removeChannelIDs[channelID] = struct{}{} + } + } + + addChannelDefinitions = subtractChannelDefinitions(expectedChannelDefs, previousOutcome.ChannelDefinitions, MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH) + } + + var streamValues StreamValues + { + streams := map[commontypes.StreamID]struct{}{} + for _, channelDefinition := range previousOutcome.ChannelDefinitions { + for _, streamID := range channelDefinition.StreamIDs { + streams[streamID] = struct{}{} + } + } + + var err error + // TODO: Should probably be a slice, not map? + streamValues, err = p.DataSource.Observe(ctx, streams) + if err != nil { + return nil, fmt.Errorf("DataSource.Observe error: %w", err) + } + } + + var rawObservation []byte + { + var err error + rawObservation, err = json.Marshal(Observation{ + attestedRetirementReport, + shouldRetire, + nowNanoseconds, + removeChannelIDs, + addChannelDefinitions, + streamValues, + }) + if err != nil { + return nil, fmt.Errorf("json.Marshal error: %w", err) + } + } + + return rawObservation, nil +} + +// Should return an error if an observation isn't well-formed. +// Non-well-formed observations will be discarded by the protocol. This is +// called for each observation, don't do anything slow in here. +// +// You may assume that the outctx.SeqNr is increasing monotonically (though +// *not* strictly) across the lifetime of a protocol instance and that +// outctx.previousOutcome contains the consensus outcome with sequence +// number (outctx.SeqNr-1). +func (p *LLOPlugin) ValidateObservation(outctx ocr3types.OutcomeContext, query types.Query, ao types.AttributedObservation) error { + if outctx.SeqNr <= 1 { + if len(ao.Observation) != 0 { + return fmt.Errorf("Observation is not empty") + } + } + + var observation Observation + // FIXME: do we really want to allow empty observations? happens because "" is not valid JSON + if len(ao.Observation) > 0 { + err := json.Unmarshal(ao.Observation, &observation) + if err != nil { + return fmt.Errorf("Observation is invalid json (got: %q): %w", ao.Observation, err) + } + } + + if p.PredecessorConfigDigest == nil && len(observation.AttestedPredecessorRetirement) != 0 { + return fmt.Errorf("AttestedPredecessorRetirement is not empty even though this instance has no predecessor") + } + + if len(observation.AddChannelDefinitions) > MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH { + return fmt.Errorf("AddChannelDefinitions is too long: %v vs %v", len(observation.AddChannelDefinitions), MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH) + } + + if len(observation.RemoveChannelIDs) > MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH { + return fmt.Errorf("RemoveChannelIDs is too long: %v vs %v", len(observation.RemoveChannelIDs), MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH) + } + + if len(observation.StreamValues) > MAX_OBSERVATION_STREAM_VALUES_LENGTH { + return fmt.Errorf("StreamValues is too long: %v vs %v", len(observation.StreamValues), MAX_OBSERVATION_STREAM_VALUES_LENGTH) + } + + for streamID, obsResult := range observation.StreamValues { + if obsResult.Valid && obsResult.Val == nil { + return fmt.Errorf("stream with id %q was marked valid but carries nil value", streamID) + } + } + + return nil +} + +type Outcome struct { + // LifeCycleStage the protocol is in + LifeCycleStage commontypes.LLOLifeCycleStage + // ObservationsTimestampNanoseconds is the median timestamp from the + // latest set of observations + ObservationsTimestampNanoseconds int64 + // ChannelDefinitions defines the set & structure of channels for which we + // generate reports + ChannelDefinitions commontypes.ChannelDefinitions + // Latest ValidAfterSeconds value for each channel, reports for each channel + // span from ValidAfterSeconds to ObservationTimestampSeconds + ValidAfterSeconds map[commontypes.ChannelID]uint32 + // StreamMedians is the median observed value for each stream + // QUESTION: Can we use arbitrary types here to allow for other types or + // consensus methods? + StreamMedians map[commontypes.StreamID]*big.Int +} + +// The Outcome's ObservationsTimestamp rounded down to seconds precision +func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) { + result := time.Unix(0, out.ObservationsTimestampNanoseconds).Unix() + if int64(uint32(result)) != result { + return 0, fmt.Errorf("timestamp doesn't fit into uint32: %v", result) + } + return uint32(result), nil +} + +// Indicates whether a report can be generated for the given channel. +// Returns nil if channel is reportable +func (out *Outcome) IsReportable(channelID commontypes.ChannelID) error { + if out.LifeCycleStage == LifeCycleStageRetired { + return fmt.Errorf("IsReportable=false; retired channel with ID: %d", channelID) + } + + observationsTimestampSeconds, err := out.ObservationsTimestampSeconds() + if err != nil { + return fmt.Errorf("IsReportable=false; invalid observations timestamp; %w", err) + } + + channelDefinition, exists := out.ChannelDefinitions[channelID] + if !exists { + return fmt.Errorf("IsReportable=false; no channel definition with ID: %d", channelID) + } + + if _, err := chainselectors.ChainIdFromSelector(channelDefinition.ChainSelector); err != nil { + return fmt.Errorf("IsReportable=false; invalid chain selector; %w", err) + } + + for _, streamID := range channelDefinition.StreamIDs { + if out.StreamMedians[streamID] == nil { + return errors.New("IsReportable=false; median was nil") + } + } + + if _, ok := out.ValidAfterSeconds[channelID]; !ok { + // No validAfterSeconds entry yet, this must be a new channel. + // validAfterSeconds will be populated in Outcome() so the channel + // becomes reportable in later protocol rounds. + return errors.New("IsReportable=false; no validAfterSeconds entry yet, this must be a new channel") + } + + if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds { + return fmt.Errorf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds) + } + + return nil +} + +// List of reportable channels (according to IsReportable), sorted according +// to a canonical ordering +func (out *Outcome) ReportableChannels() []commontypes.ChannelID { + result := []commontypes.ChannelID{} + + for channelID := range out.ChannelDefinitions { + if err := out.IsReportable(channelID); err != nil { + continue + } + result = append(result, channelID) + } + + sort.Slice(result, func(i, j int) bool { + return result[i] < result[j] + }) + + return result +} + +// Generates an outcome for a seqNr, typically based on the previous +// outcome, the current query, and the current set of attributed +// observations. +// +// This function should be pure. Don't do anything slow in here. +// +// You may assume that the outctx.SeqNr is increasing monotonically (though +// *not* strictly) across the lifetime of a protocol instance and that +// outctx.previousOutcome contains the consensus outcome with sequence +// number (outctx.SeqNr-1). +func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { + if len(aos) == 0 { + return nil, errors.New("no attributed observations") + } + + if outctx.SeqNr <= 1 { + // Initial Outcome + var lifeCycleStage commontypes.LLOLifeCycleStage + if p.PredecessorConfigDigest == nil { + // Start straight in production if we have no predecessor + lifeCycleStage = LifeCycleStageProduction + } else { + lifeCycleStage = LifeCycleStageStaging + } + outcome := Outcome{ + lifeCycleStage, + 0, + nil, + nil, + nil, + } + return json.Marshal(outcome) + } + + ///////////////////////////////// + // Decode previousOutcome + ///////////////////////////////// + var previousOutcome Outcome + if err := json.Unmarshal(outctx.PreviousOutcome, &previousOutcome); err != nil { + return nil, fmt.Errorf("error unmarshalling previous outcome: %v", err) + } + + ///////////////////////////////// + // Decode observations + ///////////////////////////////// + + // a single valid retirement report is enough + var validPredecessorRetirementReport *RetirementReport + + shouldRetireVotes := 0 + + timestampsNanoseconds := []int64{} + + removeChannelVotesByID := map[commontypes.ChannelID]int{} + + // for each channelId count number of votes that mention it and count number of votes that include it. + addChannelVotesByHash := map[ChannelHash]int{} + addChannelDefinitionsByHash := map[ChannelHash]ChannelDefinitionWithID{} + + streamObservations := map[commontypes.StreamID][]*big.Int{} + + for _, ao := range aos { + observation := Observation{} + // TODO: Use protobufs + if err := json.Unmarshal(ao.Observation, &observation); err != nil { + p.Logger.Warnw("ignoring invalid observation", "oracleID", ao.Observer, "error", err) + continue + } + + if len(observation.AttestedPredecessorRetirement) != 0 && validPredecessorRetirementReport == nil { + pcd := *p.PredecessorConfigDigest + retirementReport, err := p.PredecessorRetirementReportCache.CheckAttestedRetirementReport(pcd, observation.AttestedPredecessorRetirement) + if err != nil { + p.Logger.Warnw("ignoring observation with invalid attested predecessor retirement", "oracleID", ao.Observer, "error", err, "predecessorConfigDigest", pcd) + continue + } + validPredecessorRetirementReport = &retirementReport + } + + if observation.ShouldRetire { + shouldRetireVotes++ + } + + timestampsNanoseconds = append(timestampsNanoseconds, observation.UnixTimestampNanoseconds) + + for channelID := range observation.RemoveChannelIDs { + removeChannelVotesByID[channelID]++ + } + + for channelID, channelDefinition := range observation.AddChannelDefinitions { + defWithID := ChannelDefinitionWithID{channelDefinition, channelID} + channelHash := MakeChannelHash(defWithID) + addChannelVotesByHash[channelHash]++ + addChannelDefinitionsByHash[channelHash] = defWithID + } + + for id, obsResult := range observation.StreamValues { + if obsResult.Valid { + streamObservations[id] = append(streamObservations[id], obsResult.Val) + } else { + p.Logger.Debugw("Ignoring invalid observation", "streamID", id, "oracleID", ao.Observer) + } + } + } + + if len(timestampsNanoseconds) == 0 { + return nil, errors.New("no valid observations") + } + + var outcome Outcome + + ///////////////////////////////// + // outcome.LifeCycleStage + ///////////////////////////////// + if previousOutcome.LifeCycleStage == LifeCycleStageStaging && validPredecessorRetirementReport != nil { + // Promote this protocol instance to the production stage! 🚀 + + // override ValidAfterSeconds with the value from the retirement report + // so that we have no gaps in the validity time range. + outcome.ValidAfterSeconds = validPredecessorRetirementReport.ValidAfterSeconds + outcome.LifeCycleStage = LifeCycleStageProduction + } else { + outcome.LifeCycleStage = previousOutcome.LifeCycleStage + } + + if outcome.LifeCycleStage == LifeCycleStageProduction && shouldRetireVotes > p.F { + outcome.LifeCycleStage = LifeCycleStageRetired + } + + ///////////////////////////////// + // outcome.ObservationsTimestampNanoseconds + sort.Slice(timestampsNanoseconds, func(i, j int) bool { return timestampsNanoseconds[i] < timestampsNanoseconds[j] }) + outcome.ObservationsTimestampNanoseconds = timestampsNanoseconds[len(timestampsNanoseconds)/2] + + ///////////////////////////////// + // outcome.ChannelDefinitions + ///////////////////////////////// + outcome.ChannelDefinitions = previousOutcome.ChannelDefinitions + if outcome.ChannelDefinitions == nil { + outcome.ChannelDefinitions = commontypes.ChannelDefinitions{} + } + + // if retired, stop updating channel definitions + if outcome.LifeCycleStage == LifeCycleStageRetired { + removeChannelVotesByID, addChannelDefinitionsByHash = nil, nil + } + + var removedChannelIDs []commontypes.ChannelID + for channelID, voteCount := range removeChannelVotesByID { + if voteCount <= p.F { + continue + } + removedChannelIDs = append(removedChannelIDs, channelID) + delete(outcome.ChannelDefinitions, channelID) + } + + for channelHash, defWithID := range addChannelDefinitionsByHash { + voteCount := addChannelVotesByHash[channelHash] + if voteCount <= p.F { + continue + } + if conflictDef, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists { + p.Logger.Warn("More than f nodes vote to add a channel, but a channel with the same id already exists", + "existingChannelDefinition", conflictDef, + "addChannelDefinition", defWithID, + ) + continue + } + if len(outcome.ChannelDefinitions) > MAX_OUTCOME_CHANNEL_DEFINITIONS_LENGTH { + p.Logger.Warn("Cannot add channel, outcome already contains maximum number of channels", + "maxOutcomeChannelDefinitionsLength", MAX_OUTCOME_CHANNEL_DEFINITIONS_LENGTH, + "addChannelDefinition", defWithID, + ) + continue + } + outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition + } + + ///////////////////////////////// + // outcome.ValidAfterSeconds + ///////////////////////////////// + + // ValidAfterSeconds can be non-nil here if earlier code already + // populated ValidAfterSeconds during promotion to production. In this + // case, nothing to do. + if outcome.ValidAfterSeconds == nil { + previousObservationsTimestampSeconds, err := previousOutcome.ObservationsTimestampSeconds() + if err != nil { + return nil, fmt.Errorf("error getting previous outcome's observations timestamp: %v", err) + } + + outcome.ValidAfterSeconds = map[commontypes.ChannelID]uint32{} + for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds { + if err := previousOutcome.IsReportable(channelID); err != nil { + p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err) + // was reported based on previous outcome + outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds + } else { + p.Logger.Debugw("Channel is reportable", "channelID", channelID) + // TODO: change log level based on what type of error we got + // was skipped based on previous outcome + outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds + } + } + } + + observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() + if err != nil { + return nil, fmt.Errorf("error getting outcome's observations timestamp: %w", err) + } + + for channelID := range outcome.ChannelDefinitions { + if _, ok := outcome.ValidAfterSeconds[channelID]; !ok { + // new channel, set validAfterSeconds to observations timestamp + outcome.ValidAfterSeconds[channelID] = observationsTimestampSeconds + } + } + + // One might think that we should simply delete any channel from + // ValidAfterSeconds that is not mentioned in the ChannelDefinitions. This + // could, however, lead to gaps being created if this protocol instance is + // promoted from staging to production while we're still "ramping up" the + // full set of channels. We do the "safe" thing (i.e. minimizing occurrence + // of gaps) here and only remove channels if there has been an explicit vote + // to remove them. + for _, channelID := range removedChannelIDs { + delete(outcome.ValidAfterSeconds, channelID) + } + + ///////////////////////////////// + // outcome.StreamMedians + ///////////////////////////////// + outcome.StreamMedians = map[commontypes.StreamID]*big.Int{} + for streamID, observations := range streamObservations { + sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 }) + if len(observations) <= p.F { + continue + } + // We use a "rank-k" median here, instead one could average in case of + // an even number of observations. + outcome.StreamMedians[streamID] = observations[len(observations)/2] + } + + return json.Marshal(outcome) +} + +type Report struct { + ConfigDigest types.ConfigDigest + // Chain the report is destined for + ChainSelector uint64 + // OCR sequence number of this report + SeqNr uint64 + // Channel that is being reported on + ChannelID commontypes.ChannelID + // Report is valid for ValidAfterSeconds < block.time <= ValidUntilSeconds + ValidAfterSeconds uint32 + ValidUntilSeconds uint32 + // Here we only encode big.Ints, but in principle there's nothing stopping + // us from also supporting non-numeric data or smaller values etc... + Values []*big.Int + // The contract onchain will only validate non-specimen reports. A staging + // protocol instance will generate specimen reports so we can validate it + // works properly without any risk of misreports landing on chain. + Specimen bool +} + +func (p *LLOPlugin) encodeReport(r Report, format commontypes.LLOReportFormat) (types.Report, error) { + codec, exists := p.Codecs[format] + if !exists { + return nil, fmt.Errorf("codec missing for ReportFormat=%s", format) + } + return codec.Encode(r) +} + +// Generates a (possibly empty) list of reports from an outcome. Each report +// will be signed and possibly be transmitted to the contract. (Depending on +// ShouldAcceptAttestedReport & ShouldTransmitAcceptedReport) +// +// This function should be pure. Don't do anything slow in here. +// +// This is likely to change in the future. It will likely be returning a +// list of report batches, where each batch goes into its own Merkle tree. +// +// You may assume that the outctx.SeqNr is increasing monotonically (though +// *not* strictly) across the lifetime of a protocol instance and that +// outctx.previousOutcome contains the consensus outcome with sequence +// number (outctx.SeqNr-1). +func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3types.ReportWithInfo[commontypes.LLOReportInfo], error) { + if seqNr <= 1 { + // no reports for initial round + return nil, nil + } + + var outcome Outcome + if err := json.Unmarshal(rawOutcome, &outcome); err != nil { + return nil, fmt.Errorf("error unmarshalling outcome: %w", err) + } + + observationsTimestampSeconds, err := outcome.ObservationsTimestampSeconds() + if err != nil { + return nil, fmt.Errorf("error getting observations timestamp: %w", err) + } + + rwis := []ocr3types.ReportWithInfo[commontypes.LLOReportInfo]{} + + if outcome.LifeCycleStage == LifeCycleStageRetired { + // if we're retired, emit special retirement report to transfer + // ValidAfterSeconds part of state to the new protocol instance for a + // "gapless" handover + retirementReport := RetirementReport{ + outcome.ValidAfterSeconds, + } + + rwis = append(rwis, ocr3types.ReportWithInfo[commontypes.LLOReportInfo]{ + Report: must(json.Marshal(retirementReport)), + Info: commontypes.LLOReportInfo{ + LifeCycleStage: outcome.LifeCycleStage, + ReportFormat: ReportFormatJSON, + }, + }) + } + + for _, channelID := range outcome.ReportableChannels() { + channelDefinition := outcome.ChannelDefinitions[channelID] + values := []*big.Int{} + for _, streamID := range channelDefinition.StreamIDs { + values = append(values, outcome.StreamMedians[streamID]) + } + + report := Report{ + p.ConfigDigest, + channelDefinition.ChainSelector, + seqNr, + channelID, + outcome.ValidAfterSeconds[channelID], + observationsTimestampSeconds, + values, + outcome.LifeCycleStage != LifeCycleStageProduction, + } + + encoded, err := p.encodeReport(report, channelDefinition.ReportFormat) + if err != nil { + return nil, err + } + rwis = append(rwis, ocr3types.ReportWithInfo[commontypes.LLOReportInfo]{ + Report: encoded, + Info: commontypes.LLOReportInfo{ + LifeCycleStage: outcome.LifeCycleStage, + ReportFormat: channelDefinition.ReportFormat, + }, + }) + } + + if len(rwis) == 0 { + p.Logger.Debugw("No reports", "reportableChannels", outcome.ReportableChannels()) + } + + return rwis, nil +} + +func (p *LLOPlugin) ShouldAcceptAttestedReport(context.Context, uint64, ocr3types.ReportWithInfo[commontypes.LLOReportInfo]) (bool, error) { + // Transmit it all to the Mercury server + return true, nil +} + +func (p *LLOPlugin) ShouldTransmitAcceptedReport(context.Context, uint64, ocr3types.ReportWithInfo[commontypes.LLOReportInfo]) (bool, error) { + // Transmit it all to the Mercury server + return true, nil +} + +// ObservationQuorum returns the minimum number of valid (according to +// ValidateObservation) observations needed to construct an outcome. +// +// This function should be pure. Don't do anything slow in here. +// +// This is an advanced feature. The "default" approach (what OCR1 & OCR2 +// did) is to have an empty ValidateObservation function and return +// QuorumTwoFPlusOne from this function. +func (p *LLOPlugin) ObservationQuorum(outctx ocr3types.OutcomeContext, query types.Query) (ocr3types.Quorum, error) { + return ocr3types.QuorumTwoFPlusOne, nil +} + +func (p *LLOPlugin) Close() error { + return nil +} + +func subtractChannelDefinitions(minuend commontypes.ChannelDefinitions, subtrahend commontypes.ChannelDefinitions, limit int) commontypes.ChannelDefinitions { + differenceList := []ChannelDefinitionWithID{} + for channelID, channelDefinition := range minuend { + if _, ok := subtrahend[channelID]; !ok { + differenceList = append(differenceList, ChannelDefinitionWithID{channelDefinition, channelID}) + } + } + + // Sort so we return deterministic result + sort.Slice(differenceList, func(i, j int) bool { + return differenceList[i].ChannelID < differenceList[j].ChannelID + }) + + if len(differenceList) > limit { + differenceList = differenceList[:limit] + } + + difference := commontypes.ChannelDefinitions{} + for _, defWithID := range differenceList { + difference[defWithID.ChannelID] = defWithID.ChannelDefinition + } + + return difference +} diff --git a/llo/predecessor_retirement_report_cache.go b/llo/predecessor_retirement_report_cache.go new file mode 100644 index 0000000..ee305fe --- /dev/null +++ b/llo/predecessor_retirement_report_cache.go @@ -0,0 +1,27 @@ +package llo + +import ( + "github.com/smartcontractkit/libocr/offchainreporting2/types" +) + +var _ PredecessorRetirementReportCache = &predecessorRetirementReportCache{} + +type predecessorRetirementReportCache struct{} + +// TODO: This ought to be DB-persisted +// https://smartcontract-it.atlassian.net/browse/MERC-3386 +func NewPredecessorRetirementReportCache() PredecessorRetirementReportCache { + return newPredecessorRetirementReportCache() +} + +func newPredecessorRetirementReportCache() *predecessorRetirementReportCache { + return &predecessorRetirementReportCache{} +} + +func (c *predecessorRetirementReportCache) AttestedRetirementReport(predecessorConfigDigest types.ConfigDigest) ([]byte, error) { + panic("TODO") +} + +func (c *predecessorRetirementReportCache) CheckAttestedRetirementReport(predecessorConfigDigest types.ConfigDigest, attestedRetirementReport []byte) (RetirementReport, error) { + panic("TODO") +} diff --git a/llo/predecessor_retirement_report_cache_test.go b/llo/predecessor_retirement_report_cache_test.go new file mode 100644 index 0000000..95d25cf --- /dev/null +++ b/llo/predecessor_retirement_report_cache_test.go @@ -0,0 +1,7 @@ +package llo + +import "testing" + +func Test_PredecessorRetirementReportCache(t *testing.T) { + t.Fatal("TODO") +} diff --git a/llo/should_retire_cache.go b/llo/should_retire_cache.go new file mode 100644 index 0000000..76ccd67 --- /dev/null +++ b/llo/should_retire_cache.go @@ -0,0 +1,19 @@ +package llo + +var _ ShouldRetireCache = &shouldRetireCache{} + +type shouldRetireCache struct{} + +// TODO: https://smartcontract-it.atlassian.net/browse/MERC-3386 +func NewShouldRetireCache() ShouldRetireCache { + return newShouldRetireCache() +} + +func newShouldRetireCache() *shouldRetireCache { + return &shouldRetireCache{} +} + +func (c *shouldRetireCache) ShouldRetire() (bool, error) { + // TODO + return false, nil +} diff --git a/llo/should_retire_cache_test.go b/llo/should_retire_cache_test.go new file mode 100644 index 0000000..fa4c882 --- /dev/null +++ b/llo/should_retire_cache_test.go @@ -0,0 +1,7 @@ +package llo + +import "testing" + +func Test_ShouldRetireCache(t *testing.T) { + t.Fatal("TODO") +} diff --git a/llo/types.go b/llo/types.go new file mode 100644 index 0000000..a90bd8e --- /dev/null +++ b/llo/types.go @@ -0,0 +1,31 @@ +package llo + +import ( + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" +) + +type ChannelDefinitionWithID struct { + commontypes.ChannelDefinition + ChannelID commontypes.ChannelID +} + +type ChannelHash [32]byte + +type OnchainConfigCodec interface { + Encode(OnchainConfig) ([]byte, error) + Decode([]byte) (OnchainConfig, error) +} + +type Transmitter interface { + // NOTE: Mercury doesn't actually transmit on-chain, so there is no + // "contract" involved with the transmitter. + // - Transmit should be implemented and send to Mercury server + // - FromAccount() should return CSA public key + ocr3types.ContractTransmitter[commontypes.LLOReportInfo] +} + +type ObsResult[T any] struct { + Val T + Valid bool +} From d5e384884eed6f2f3be5d83bcbb2ac3ea7e406ec Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 7 Feb 2024 10:15:03 -0500 Subject: [PATCH 2/2] Add some comments/clarity --- llo/plugin.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/llo/plugin.go b/llo/plugin.go index 0b66250..62cdecb 100644 --- a/llo/plugin.go +++ b/llo/plugin.go @@ -522,9 +522,12 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID { // *not* strictly) across the lifetime of a protocol instance and that // outctx.previousOutcome contains the consensus outcome with sequence // number (outctx.SeqNr-1). +// +// libocr guarantees that this will always be called with at least 2f+1 +// AttributedObservations func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) { - if len(aos) == 0 { - return nil, errors.New("no attributed observations") + if len(aos) < 2*p.F+1 { + return nil, fmt.Errorf("invariant violation: expected at least 2f+1 attributed observations, got %d (f: %d)", len(aos), p.F) } if outctx.SeqNr <= 1 { @@ -748,6 +751,11 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, for streamID, observations := range streamObservations { sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 }) if len(observations) <= p.F { + // In the worst case, we have 2f+1 observations, of which up to f + // are allowed to be unparseable/missing. If we have less than f+1 + // usable observations, we cannot securely generate a median at + // all. + p.Logger.Debugw("Not enough observations to calculate median, expected at least f+1", "f", p.F, "streamID", streamID, "observations", observations) continue } // We use a "rank-k" median here, instead one could average in case of