Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Remove references to SignedObservation #4244

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ const (
// gossipVaaSendBufferSize configures the size of the gossip network send buffer
gossipVaaSendBufferSize = 5000

// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these. With 19 guardians, we would expect 19 messages
// per second during normal operations. However, since some messages get published immediately, we need to allow extra room.
Expand Down Expand Up @@ -84,8 +80,6 @@ type G struct {
gossipControlSendC chan []byte
gossipAttestationSendC chan []byte
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
Expand Down Expand Up @@ -127,7 +121,6 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize)
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
Expand Down
2 changes: 0 additions & 2 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func GuardianOptionP2P(
p2p.WithGuardianOptions(
nodeName,
g.guardianSigner,
g.obsvC,
g.batchObsvC.writeC,
signedInC,
g.obsvReqC.writeC,
Expand Down Expand Up @@ -593,7 +592,6 @@ func GuardianOptionProcessor(networkId string) *GuardianOption {
g.setC.readC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
Expand Down
15 changes: 2 additions & 13 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil || params.batchObsvRecvC != nil {
if params.gossipAttestationSendC != nil || params.batchObsvRecvC != nil {
attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
attestationPubsubTopic, err = ps.Join(attestationTopic)
Expand All @@ -407,7 +407,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvRecvC != nil || params.batchObsvRecvC != nil {
if params.batchObsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down Expand Up @@ -883,17 +883,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedObservation:
if params.obsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
Expand Down
13 changes: 0 additions & 13 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ type (
gst *common.GuardianSetState
rootCtxCancel context.CancelFunc

// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvRecvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

Expand Down Expand Up @@ -117,14 +114,6 @@ func WithProcessorFeaturesFunc(processorFeaturesFunc func() string) RunOpt {
}
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages.
func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
return func(p *RunParams) error {
p.obsvRecvC = obsvRecvC
return nil
}
}

// WithSignedObservationBatchListener is used to set the channel to receive `SignedObservationBatch` messages.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
return func(p *RunParams) error {
Expand Down Expand Up @@ -177,7 +166,6 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
func WithGuardianOptions(
nodeName string,
guardianSigner guardiansigner.GuardianSigner,
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqRecvC chan<- *gossipv1.ObservationRequest,
Expand All @@ -201,7 +189,6 @@ func WithGuardianOptions(
return func(p *RunParams) error {
p.nodeName = nodeName
p.guardianSigner = guardianSigner
p.obsvRecvC = obsvRecvC
p.batchObsvRecvC = batchObsvRecvC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
p.obsvReqRecvC = obsvReqRecvC
Expand Down
3 changes: 0 additions & 3 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, guardianSigner)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
Expand Down Expand Up @@ -170,7 +169,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
WithGuardianOptions(
nodeName,
guardianSigner,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
Expand All @@ -195,7 +193,6 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, params)
assert.Equal(t, nodeName, params.nodeName)
assert.Equal(t, obsvC, params.obsvRecvC)
assert.Equal(t, signedInC, params.signedIncomingVaaRecvC)
assert.Equal(t, obsvReqC, params.obsvReqRecvC)
assert.Equal(t, gossipControlSendC, params.gossipControlSendC)
Expand Down
4 changes: 0 additions & 4 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const LOCAL_P2P_PORTRANGE_START = 11000

type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
Expand Down Expand Up @@ -66,7 +65,6 @@ func NewG(t *testing.T, nodeName string) *G {
_, rootCtxCancel := context.WithCancel(context.Background())

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
Expand All @@ -91,7 +89,6 @@ func NewG(t *testing.T, nodeName string) *G {
name := g.nodeName
t.Logf("[%s] consuming\n", name)
select {
case <-g.obsvC:
case <-g.obsvReqC:
case <-g.signedInC:
case <-g.signedGovCfg:
Expand Down Expand Up @@ -182,7 +179,6 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
WithGuardianOptions(
g.nodeName,
g.guardianSigner,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
Expand Down
14 changes: 1 addition & 13 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,6 @@ func (p *Processor) handleBatchObservation(m *node_common.MsgWithTimeStamp[gossi
batchObservationTotalDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
}

// handleObservation processes a remote VAA observation.
func (p *Processor) handleObservation(m *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) {
obs := gossipv1.Observation{
Hash: m.Msg.Hash,
Signature: m.Msg.Signature,
TxHash: m.Msg.TxHash,
MessageId: m.Msg.MessageId,
}
p.handleSingleObservation(m.Msg.Addr, &obs)
observationTotalDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
}

// handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, and assembles and submits a valid VAA if possible.
func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation) {
// SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
Expand Down Expand Up @@ -197,7 +185,7 @@ func (p *Processor) handleSingleObservation(addr []byte, m *gossipv1.Observation
return
}

// Hooray! Now, we have verified all fields on SignedObservation and know that it includes
// Hooray! Now, we have verified all fields on the observation and know that it includes
// a valid signature by an active guardian. We still don't fully trust them, as they may be
// byzantine, but now we know who we're dealing with.

Expand Down
22 changes: 0 additions & 22 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ type Processor struct {
// gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p
gossipVaaSendC chan<- []byte

// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvC is a channel of inbound decoded batches of observations from p2p
batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

Expand Down Expand Up @@ -166,20 +163,6 @@ type updateVaaEntry struct {
}

var (
observationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_channel_delay_us",
Help: "Latency histogram for delay of signed observations in channel",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})

observationTotalDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_total_delay_us",
Help: "Latency histogram for total time to process signed observations",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})

batchObservationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_batch_observation_channel_delay_us",
Expand Down Expand Up @@ -225,7 +208,6 @@ func NewProcessor(
setC <-chan *common.GuardianSet,
gossipAttestationSendC chan<- []byte,
gossipVaaSendC chan<- []byte,
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvC <-chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
obsvReqSendC chan<- *gossipv1.ObservationRequest,
signedInC <-chan *gossipv1.SignedVAAWithQuorum,
Expand All @@ -243,7 +225,6 @@ func NewProcessor(
setC: setC,
gossipAttestationSendC: gossipAttestationSendC,
gossipVaaSendC: gossipVaaSendC,
obsvC: obsvC,
batchObsvC: batchObsvC,
obsvReqSendC: obsvReqSendC,
signedInC: signedInC,
Expand Down Expand Up @@ -319,9 +300,6 @@ func (p *Processor) Run(ctx context.Context) error {
return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
}
p.handleMessage(ctx, k)
case m := <-p.obsvC:
observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleObservation(m)
case m := <-p.batchObsvC:
batchObservationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleBatchObservation(m)
Expand Down
Loading
Loading