Skip to content

Commit

Permalink
Put verbose logging behind a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Jul 1, 2024
1 parent 27a24e2 commit 39accb6
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 50 deletions.
151 changes: 102 additions & 49 deletions llo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,32 @@ import (

// TODO: Split out this file and write unit tests: https://smartcontract-it.atlassian.net/browse/MERC-3524

// Notes:
//
// This is a sketch, there are many improvements to be made for this to be
// production-grade, secure code.
//
// Are there possible issues with map serialization ordering? TODO: Check all
// places where we Encode/Decode to ensure we don't rely on ordering anywhere

// Additional limits so we can more effectively bound the size of observations
// TODO: Reconsider these values
// NOTE: These are hardcoded because these exact values are relied upon as a
// property of coming to consensus, it's too dangerous to make these
// configurable on a per-node basis. It may be possible to add them to the
// OffchainConfig if they need to be changed dynamically and in a
// backwards-compatible way.
const (
MaxObservationRemoveChannelIDsLength = 5
// Maximum number of channels that can be added per round (if more than
// this needs to be added, it will be added in batches until everything is
// up-to-date)
MaxObservationRemoveChannelIDsLength = 5
// Maximum number of channels that can be removed per round (if more than
// this needs to be removed, it will be removed in batches until everything
// is up-to-date)
MaxObservationUpdateChannelDefinitionsLength = 5
MaxObservationStreamValuesLength = 1_000
// Maximum number of streams that can be observed per round
// TODO: This needs to be implemented on the Observation side so we don't
// even generate an observation that fails this
MaxObservationStreamValuesLength = 10_000
// MaxOutcomeChannelDefinitionsLength is the maximum number of channels that
// can be supported
// TODO: This needs to be implemented on the Observation side so we don't
// even generate an observation that fails this
MaxOutcomeChannelDefinitionsLength = 10_000
)

// MaxOutcomeChannelDefinitionsLength is the maximum number of channels that
// can be supported
const MaxOutcomeChannelDefinitionsLength = 1_000

// Values for a set of streams, e.g. "eth-usd", "link-usd", "eur-chf" etc
// StreamIDs are uint32
// TODO: generalize from *big.Int to anything
Expand Down Expand Up @@ -193,13 +199,20 @@ func MakeChannelHash(cd ChannelDefinitionWithID) ChannelHash {
// A ReportingPlugin instance will only ever serve a single protocol instance.
var _ ocr3types.ReportingPluginFactory[llotypes.ReportInfo] = &PluginFactory{}

func NewPluginFactory(prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory {
func NewPluginFactory(cfg Config, prrc PredecessorRetirementReportCache, src ShouldRetireCache, cdc ChannelDefinitionCache, ds DataSource, lggr logger.Logger, codecs map[llotypes.ReportFormat]ReportCodec) *PluginFactory {
return &PluginFactory{
prrc, src, cdc, ds, lggr, codecs,
cfg, prrc, src, cdc, ds, lggr, codecs,
}
}

// Config holds static, node-local configuration for the LLO plugin and its
// factory. It is passed through NewPluginFactory into each Plugin instance.
type Config struct {
	// VerboseLogging enables additional logging that might be expensive,
	// e.g. logging entire channel definitions on every round or other very
	// large structs.
	VerboseLogging bool
}

type PluginFactory struct {
Config Config
PredecessorRetirementReportCache PredecessorRetirementReportCache
ShouldRetireCache ShouldRetireCache
ChannelDefinitionCache ChannelDefinitionCache
Expand All @@ -215,6 +228,7 @@ func (f *PluginFactory) NewReportingPlugin(cfg ocr3types.ReportingPluginConfig)
}

return &Plugin{
f.Config,
offchainCfg.PredecessorConfigDigest,
cfg.ConfigDigest,
f.PredecessorRetirementReportCache,
Expand Down Expand Up @@ -247,6 +261,7 @@ type ReportCodec interface {
}

type Plugin struct {
Config Config
PredecessorConfigDigest *types.ConfigDigest
ConfigDigest types.ConfigDigest
PredecessorRetirementReportCache PredecessorRetirementReportCache
Expand Down Expand Up @@ -412,7 +427,7 @@ func (p *Plugin) Observation(ctx context.Context, outctx ocr3types.OutcomeContex

var streamValues StreamValues
if len(previousOutcome.ChannelDefinitions) == 0 {
p.Logger.Warnw("ChannelDefinitions is empty, will not generate any observations", "seqNr", outctx.SeqNr)
p.Logger.Debugw("ChannelDefinitions is empty, will not generate any observations", "seqNr", outctx.SeqNr)
} else {
streams := map[llotypes.StreamID]struct{}{}
for _, channelDefinition := range previousOutcome.ChannelDefinitions {
Expand Down Expand Up @@ -528,23 +543,24 @@ func (out *Outcome) ObservationsTimestampSeconds() (uint32, error) {

// Indicates whether a report can be generated for the given channel.
// Returns nil if channel is reportable
func (out *Outcome) IsReportable(channelID llotypes.ChannelID) error {
// TODO: Test this function
func (out *Outcome) IsReportable(channelID llotypes.ChannelID) *ErrUnreportableChannel {
if out.LifeCycleStage == LifeCycleStageRetired {
return fmt.Errorf("IsReportable=false; retired channel with ID: %d", channelID)
return &ErrUnreportableChannel{nil, "IsReportable=false; retired channel", channelID}
}

observationsTimestampSeconds, err := out.ObservationsTimestampSeconds()
if err != nil {
return fmt.Errorf("IsReportable=false; invalid observations timestamp; %w", err)
return &ErrUnreportableChannel{err, "IsReportable=false; invalid observations timestamp", channelID}
}

channelDefinition, exists := out.ChannelDefinitions[channelID]
if !exists {
return fmt.Errorf("IsReportable=false; no channel definition with ID: %d", channelID)
return &ErrUnreportableChannel{nil, "IsReportable=false; no channel definition with this ID", channelID}
}

if _, err := chainselectors.ChainIdFromSelector(channelDefinition.ChainSelector); err != nil {
return fmt.Errorf("IsReportable=false; invalid chain selector; %w", err)
return &ErrUnreportableChannel{err, "IsReportable=false; invalid chain selector", channelID}
}

for _, streamID := range channelDefinition.StreamIDs {
Expand All @@ -556,42 +572,64 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID) error {
// from the PREVIOUS outcome. So if channel definitions have been
// added in this round, we would not expect there to be
// observations present for new streams in those channels.
return fmt.Errorf("IsReportable=false; median was nil for stream %d", streamID)
return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; median was nil for stream %d", streamID), channelID}
}
}

if _, ok := out.ValidAfterSeconds[channelID]; !ok {
// No validAfterSeconds entry yet, this must be a new channel.
// validAfterSeconds will be populated in Outcome() so the channel
// becomes reportable in later protocol rounds.
return errors.New("IsReportable=false; no validAfterSeconds entry yet, this must be a new channel")
// TODO: Test this case, haven't seen it in prod logs even though it would be expected
return &ErrUnreportableChannel{nil, "IsReportable=false; no validAfterSeconds entry yet, this must be a new channel", channelID}
}

if validAfterSeconds := out.ValidAfterSeconds[channelID]; validAfterSeconds >= observationsTimestampSeconds {
return fmt.Errorf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds)
return &ErrUnreportableChannel{nil, fmt.Sprintf("IsReportable=false; not valid yet (observationsTimestampSeconds=%d < validAfterSeconds=%d)", observationsTimestampSeconds, validAfterSeconds), channelID}
}

return nil
}

// ErrUnreportableChannel is returned by Outcome.IsReportable to explain why
// a particular channel cannot produce a report in this round.
type ErrUnreportableChannel struct {
	// Inner is the underlying cause, if any; may be nil.
	Inner error
	// Reason is a human-readable explanation of why the channel is unreportable.
	Reason string
	// ChannelID identifies the channel this error refers to.
	ChannelID llotypes.ChannelID
}

// Error implements the error interface. It combines the channel ID and
// reason into a single message, appending the inner error when one is set.
func (e *ErrUnreportableChannel) Error() string {
	if e.Inner == nil {
		return fmt.Sprintf("ChannelID: %d; Reason: %s", e.ChannelID, e.Reason)
	}
	return fmt.Sprintf("ChannelID: %d; Reason: %s; Err: %v", e.ChannelID, e.Reason, e.Inner)
}

// String returns the same text as Error, so the type prints consistently
// with %v/%s formatting verbs and satisfies fmt.Stringer.
func (e *ErrUnreportableChannel) String() string {
	return e.Error()
}

// Unwrap returns the inner error (possibly nil) so callers can inspect the
// underlying cause with errors.Is / errors.As.
func (e *ErrUnreportableChannel) Unwrap() error {
	return e.Inner
}

// List of reportable channels (according to IsReportable), sorted according
// to a canonical ordering
func (out *Outcome) ReportableChannels(lggr logger.Logger, seqNr uint64) []llotypes.ChannelID {
result := []llotypes.ChannelID{}

// TODO: test this
func (out *Outcome) ReportableChannels() (reportable []llotypes.ChannelID, unreportable []*ErrUnreportableChannel) {
for channelID := range out.ChannelDefinitions {
if err := out.IsReportable(channelID); err != nil {
lggr.Debugw("Channel is not reportable", "channelID", channelID, "err", err, "seqNr", seqNr)
continue
unreportable = append(unreportable, err)
} else {
reportable = append(reportable, channelID)
}
result = append(result, channelID)
}

sort.Slice(result, func(i, j int) bool {
return result[i] < result[j]
sort.Slice(reportable, func(i, j int) bool {
return reportable[i] < reportable[j]
})

return result
return
}

// Generates an outcome for a seqNr, typically based on the previous
Expand Down Expand Up @@ -702,11 +740,13 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
invalidObservations = append(invalidObservations, id)
}
}
if len(invalidObservations) > 0 {
sort.Slice(invalidObservations, func(i, j int) bool { return invalidObservations[i] < invalidObservations[j] })
p.Logger.Debugw("Ignoring invalid observations", "streamIDs", invalidObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr)
if p.Config.VerboseLogging {
if len(invalidObservations) > 0 {
sort.Slice(invalidObservations, func(i, j int) bool { return invalidObservations[i] < invalidObservations[j] })
p.Logger.Debugw("Ignoring invalid observations", "streamIDs", invalidObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr)
}
p.Logger.Debugw("Using observations", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr)
}
p.Logger.Debugw("Using observations", "sv", streamObservations, "oracleID", ao.Observer, "seqNr", outctx.SeqNr)
}

if len(timestampsNanoseconds) == 0 {
Expand Down Expand Up @@ -778,21 +818,26 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
}
defWithID := hwid.ChannelDefinitionWithID
if original, exists := outcome.ChannelDefinitions[defWithID.ChannelID]; exists {
p.Logger.Debugw("Replacing channel",
p.Logger.Debugw("Adding channel (replacement)",
"channelID", defWithID.ChannelID,
"originalChannelDefinition", original,
"replacementChannelDefinition", defWithID,
"replaceChannelDefinition", defWithID,
"seqNr", outctx.SeqNr,
)
outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition
} else if len(outcome.ChannelDefinitions) >= MaxOutcomeChannelDefinitionsLength {
p.Logger.Warnw("Cannot add channel, outcome already contains maximum number of channels",
p.Logger.Warnw("Adding channel FAILED. Cannot add channel, outcome already contains maximum number of channels",
"maxOutcomeChannelDefinitionsLength", MaxOutcomeChannelDefinitionsLength,
"addChannelDefinition", defWithID,
"seqNr", outctx.SeqNr,
)
continue
}
p.Logger.Debugw("Adding channel (new)",
"channelID", defWithID.ChannelID,
"addChannelDefinition", defWithID,
"seqNr", outctx.SeqNr,
)
outcome.ChannelDefinitions[defWithID.ChannelID] = defWithID.ChannelDefinition
}

Expand All @@ -812,8 +857,9 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
outcome.ValidAfterSeconds = map[llotypes.ChannelID]uint32{}
for channelID, previousValidAfterSeconds := range previousOutcome.ValidAfterSeconds {
if err3 := previousOutcome.IsReportable(channelID); err3 != nil {
// TODO: change log level based on what type of error we got
p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "seqNr", outctx.SeqNr)
if p.Config.VerboseLogging {
p.Logger.Debugw("Channel is not reportable", "channelID", channelID, "err", err3, "seqNr", outctx.SeqNr)
}
// was reported based on previous outcome
outcome.ValidAfterSeconds[channelID] = previousObservationsTimestampSeconds
} else {
Expand Down Expand Up @@ -857,15 +903,19 @@ func (p *Plugin) Outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
// are allowed to be unparseable/missing. If we have less than f+1
// usable observations, we cannot securely generate a median at
// all.
p.Logger.Debugw("Not enough observations to calculate median, expected at least f+1", "f", p.F, "streamID", streamID, "observations", observations, "seqNr", outctx.SeqNr)
if p.Config.VerboseLogging {
p.Logger.Warnw("Not enough observations to calculate median, expected at least f+1", "f", p.F, "streamID", streamID, "observations", observations, "seqNr", outctx.SeqNr)
}
continue
}
// We use a "rank-k" median here, instead one could average in case of
// an even number of observations.
outcome.StreamMedians[streamID] = observations[len(observations)/2]
}

p.Logger.Debugw("Generated outcome", "outcome", outcome, "seqNr", outctx.SeqNr)
if p.Config.VerboseLogging {
p.Logger.Debugw("Generated outcome", "outcome", outcome, "seqNr", outctx.SeqNr)
}
return p.OutcomeCodec.Encode(outcome)
}

Expand Down Expand Up @@ -946,8 +996,11 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type
})
}

reportableChannels := outcome.ReportableChannels(p.Logger, seqNr)
p.Logger.Debugw("Reportable channels", "reportableChannels", reportableChannels, "seqNr", seqNr)
reportableChannels, unreportableChannels := outcome.ReportableChannels()
if p.Config.VerboseLogging {
p.Logger.Debugw("Reportable channels", "reportableChannels", reportableChannels, "unreportableChannels", unreportableChannels, "seqNr", seqNr)
}

for _, channelID := range reportableChannels {
channelDefinition := outcome.ChannelDefinitions[channelID]
values := []*big.Int{}
Expand Down Expand Up @@ -979,7 +1032,7 @@ func (p *Plugin) Reports(seqNr uint64, rawOutcome ocr3types.Outcome) ([]ocr3type
})
}

if len(rwis) == 0 {
if p.Config.VerboseLogging && len(rwis) == 0 {
p.Logger.Debugw("No reports, will not transmit anything", "reportableChannels", reportableChannels, "seqNr", seqNr)
}

Expand Down
6 changes: 5 additions & 1 deletion llo/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func Test_Observation(t *testing.T) {
}

p := &Plugin{
Config: Config{true},
OutcomeCodec: protoOutcomeCodec{},
ShouldRetireCache: &mockShouldRetireCache{},
ChannelDefinitionCache: cdc,
Expand Down Expand Up @@ -394,7 +395,9 @@ func Test_Observation(t *testing.T) {
}

func Test_ValidateObservation(t *testing.T) {
p := &Plugin{}
p := &Plugin{
Config: Config{true},
}

t.Run("SeqNr < 1 is not valid", func(t *testing.T) {
err := p.ValidateObservation(ocr3types.OutcomeContext{}, types.Query{}, types.AttributedObservation{})
Expand All @@ -409,6 +412,7 @@ func Test_ValidateObservation(t *testing.T) {
func Test_Outcome(t *testing.T) {
// cdc := &mockChannelDefinitionCache{}
p := &Plugin{
Config: Config{true},
OutcomeCodec: protoOutcomeCodec{},
// ShouldRetireCache: &mockShouldRetireCache{},
Logger: logger.Test(t),
Expand Down

0 comments on commit 39accb6

Please sign in to comment.