Skip to content

Commit

Permalink
w
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Jan 24, 2024
1 parent dae4ce6 commit b6c9370
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 58 deletions.
2 changes: 1 addition & 1 deletion llo/json_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) {
ConfigDigest string
ChainSelector uint64
SeqNr uint64
ChannelID commontypes.ChannelID
ChannelID commontypes.ChannelID // TODO: consider encoding as hex
ValidAfterSeconds uint32
ValidUntilSeconds uint32
Values []*big.Int
Expand Down
62 changes: 5 additions & 57 deletions llo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const MAX_OUTCOME_CHANNEL_DEFINITIONS_LENGTH = 500
// Values for a set of streams, e.g. "eth-usd", "link-usd", and "eur-chf"
// TODO: generalize from *big.Int to anything
// TODO: Consider renaming to StreamDataPoints?
// FIXME: error vs. valid
type StreamValues map[commontypes.StreamID]ObsResult[*big.Int]

type DataSource interface {
Expand Down Expand Up @@ -215,6 +214,7 @@ var _ ocr3types.ReportingPlugin[commontypes.LLOReportInfo] = &LLOPlugin{}
type ReportCodec interface {
Encode(Report) ([]byte, error)
Decode([]byte) (Report, error)
// TODO: max length check?
}

type LLOPlugin struct {
Expand Down Expand Up @@ -314,7 +314,6 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
var addChannelDefinitions commontypes.ChannelDefinitions
{
expectedChannelDefs := p.ChannelDefinitionCache.Definitions()
fmt.Println("TRASH Observation definitions", expectedChannelDefs)

removeChannelDefinitions := subtractChannelDefinitions(previousOutcome.ChannelDefinitions, expectedChannelDefs, MAX_OBSERVATION_REMOVE_CHANNEL_IDS_LENGTH)
for channelID := range removeChannelDefinitions {
Expand All @@ -331,17 +330,11 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
}

addChannelDefinitions = subtractChannelDefinitions(expectedChannelDefs, previousOutcome.ChannelDefinitions, MAX_OBSERVATION_ADD_CHANNEL_DEFINITIONS_LENGTH)
if len(addChannelDefinitions) > 0 {
fmt.Println("TRASH Observation addChannelDefinitions", len(addChannelDefinitions))
}
}

fmt.Println("TRASH Observation previousOutcome.ChannelDefinitions", len(previousOutcome.ChannelDefinitions))

var streamValues StreamValues
{
streams := map[commontypes.StreamID]struct{}{}
fmt.Println("TRASH len(previousOutcome.ChannelDefinitions)", len(previousOutcome.ChannelDefinitions))
for _, channelDefinition := range previousOutcome.ChannelDefinitions {
for _, streamID := range channelDefinition.StreamIDs {
streams[streamID] = struct{}{}
Expand All @@ -356,8 +349,6 @@ func (p *LLOPlugin) Observation(ctx context.Context, outctx ocr3types.OutcomeCon
}
}

fmt.Println("TRASH Observation streamValues", streamValues)

var rawObservation []byte
{
var err error
Expand Down Expand Up @@ -418,9 +409,8 @@ func (p *LLOPlugin) ValidateObservation(outctx ocr3types.OutcomeContext, query t
}

for streamID, obsResult := range observation.StreamValues {
if obsResult.Val == nil {
// FIXME: shouldn't be possible if its valid
return fmt.Errorf("stream with id %q carries nil value", streamID)
if obsResult.Valid && obsResult.Val == nil {
return fmt.Errorf("stream with id %q was marked valid but carries nil value", streamID)
}
}

Expand Down Expand Up @@ -502,7 +492,6 @@ func (out *Outcome) ReportableChannels() []commontypes.ChannelID {

for channelID := range out.ChannelDefinitions {
if err := out.IsReportable(channelID); err != nil {
fmt.Println("TRASH not reportable", err)
continue
}
result = append(result, channelID)
Expand Down Expand Up @@ -611,13 +600,11 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
addChannelDefinitionsByHash[channelHash] = defWithID
}

fmt.Printf("TRASH outcome observation.StreamValues %#v\n", observation.StreamValues)
for id, obsResult := range observation.StreamValues {
if obsResult.Valid {
streamObservations[id] = append(streamObservations[id], obsResult.Val)
} else {
// TODO: log something?
fmt.Println("TRASH invalid observation")
}
}
}
Expand Down Expand Up @@ -674,9 +661,7 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
}

for channelHash, defWithID := range addChannelDefinitionsByHash {
fmt.Println("TRASH addChannelDefinitionsByHash")
voteCount := addChannelVotesByHash[channelHash]
fmt.Println("TRASH voteCount", voteCount)
if voteCount <= p.F {
continue
}
Expand All @@ -696,7 +681,6 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
}
outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition
}
fmt.Printf("TRASH outcome.ChannelDefinitions #%v\n", outcome.ChannelDefinitions)

/////////////////////////////////
// outcome.ValidAfterSeconds
Expand All @@ -714,11 +698,12 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
outcome.ValidAfterSeconds = map[commontypes.ChannelID]uint32{}
for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds {
if err := previousOutcome.IsReportable(channelID); err != nil {
p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err)
// was reported based on previous outcome
outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds
} else {
p.Logger.Debugw("Channel is reportable", "channelID", channelID)
// TODO: change log level based on what type of error we got
p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err)
// was skipped based on previous outcome
outcome.ValidAfterSeconds[channelID] = previousValidAfterSeconds
}
Expand Down Expand Up @@ -752,9 +737,6 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
// outcome.StreamMedians
/////////////////////////////////
outcome.StreamMedians = map[commontypes.StreamID]*big.Int{}
if len(streamObservations) == 0 {
fmt.Println("TRASH no streamObservations")
}
for streamID, observations := range streamObservations {
sort.Slice(observations, func(i, j int) bool { return observations[i].Cmp(observations[j]) < 0 })
if len(observations) <= p.F {
Expand All @@ -765,13 +747,6 @@ func (p *LLOPlugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query,
outcome.StreamMedians[streamID] = observations[len(observations)/2]
}

if len(outcome.StreamMedians) == 0 {
fmt.Println("TRASH no medians")
}

fmt.Printf("TRASH streamObservations %#v\n", streamObservations)
fmt.Printf("TRASH outcome %#v\n", outcome)

return json.Marshal(outcome)
}

Expand All @@ -795,30 +770,6 @@ type Report struct {
Specimen bool
}

// func getEVMReportTypes() abi.Arguments {
// mustNewType := func(t string) abi.Type {
// result, err := abi.NewType(t, "", []abi.ArgumentMarshaling{})
// if err != nil {
// panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err))
// }
// return result
// }
// return abi.Arguments([]abi.Argument{
// {Name: "configDigest", Type: mustNewType("bytes32")},
// {Name: "chainId", Type: mustNewType("uint64")},
// // could also include address of verifier to make things more specific.
// // downside is increased data size.
// // for now we assume that a channelId will only be registered on a single
// // verifier per chain.
// {Name: "seqNr", Type: mustNewType("uint64")},
// {Name: "channelId", Type: mustNewType("bytes32")},
// {Name: "validAfterSeconds", Type: mustNewType("uint32")},
// {Name: "validUntilSeconds", Type: mustNewType("uint32")},
// {Name: "values", Type: mustNewType("int192[]")},
// {Name: "specimen", Type: mustNewType("bool")},
// })
// }

func (p *LLOPlugin) encodeReport(r Report, format commontypes.LLOReportFormat) (types.Report, error) {
codec, exists := p.Codecs[format]
if !exists {
Expand Down Expand Up @@ -875,8 +826,6 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t
})
}

fmt.Printf("TRASH Reports outcome %#v\n", outcome)
fmt.Printf("TRASH Reports outcome.ReportableChannels() %#v\n", outcome.ReportableChannels())
for _, channelID := range outcome.ReportableChannels() {
channelDefinition := outcome.ChannelDefinitions[channelID]
values := []*big.Int{}
Expand Down Expand Up @@ -910,7 +859,6 @@ func (p *LLOPlugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3t

if len(rwis) == 0 {
// TODO: log something here? It's not gonna do anything
fmt.Println("empty reports")
}

return rwis, nil
Expand Down

0 comments on commit b6c9370

Please sign in to comment.