From 3181077093876d8d7370400222388cdebd27c429 Mon Sep 17 00:00:00 2001
From: Nikita Kryuchkov
Date: Tue, 21 Jan 2025 17:53:26 -0300
Subject: [PATCH] avoid passing logger code-wide

---
 cli/bootnode/boot_node.go | 14 ++---
 cli/operator/node.go | 16 ++---
 exporter/api/broadcaster.go | 14 +++--
 exporter/api/broadcaster_test.go | 4 +-
 exporter/api/interfaces.go | 4 +-
 exporter/api/query_handlers.go | 35 +++++++----
 exporter/api/query_handlers_test.go | 24 +++++---
 exporter/api/server.go | 53 ++++++++--------
 exporter/api/server_test.go | 8 +--
 ibft/storage/store.go | 33 +++++-----
 ibft/storage/store_test.go | 11 ++--
 ibft/storage/stores.go | 6 +-
 monitoring/metrics/handler.go | 8 ++-
 network/discovery/dv5_filters.go | 15 ++---
 network/discovery/dv5_routing.go | 6 +-
 network/discovery/dv5_service.go | 31 +++++-----
 network/discovery/local_service.go | 10 +--
 network/discovery/service.go | 8 +--
 network/discovery/service_test.go | 21 +++----
 network/discovery/util_test.go | 2 +-
 network/network.go | 10 ++-
 network/p2p/p2p.go | 51 ++++++++--------
 network/p2p/p2p_pubsub.go | 26 ++++----
 network/p2p/p2p_reporter.go | 6 +-
 network/p2p/p2p_setup.go | 39 ++++++------
 network/peers/conn_manager.go | 30 ++++-----
 network/peers/conn_manager_test.go | 4 +-
 network/peers/connections/conn_gater.go | 6 +-
 network/peers/connections/conn_handler.go | 17 +++---
 network/peers/connections/filters.go | 2 +-
 network/peers/connections/handshaker.go | 12 ++--
 .../connections/mock/mock_connection_index.go | 3 +-
 network/peers/index.go | 3 +-
 network/peers/peers_index.go | 6 +-
 network/topics/controller.go | 44 ++++++-------
 network/topics/controller_test.go | 16 ++---
 operator/duties/attester_test.go | 26 ++++----
 operator/duties/committee.go | 3 +-
 operator/duties/committee_test.go | 16 ++---
 operator/duties/proposer_test.go | 8 +--
 operator/duties/scheduler.go | 34 ++++++-----
 operator/duties/scheduler_mock.go | 14 +++--
 operator/duties/scheduler_test.go | 12 ++--
 operator/duties/sync_committee_test.go | 10 +--
 operator/fee_recipient/controller.go | 30 ++++-----
 operator/fee_recipient/controller_test.go | 13 ++--
 operator/fee_recipient/mocks/controller.go | 10 +--
 operator/node.go | 57 +++++++++--------
 operator/validator/controller.go | 7 ++-
 operator/validator/controller_test.go | 18 ++----
 operator/validator/mocks/controller.go | 61 ++++++++-----------
 protocol/v2/p2p/network.go | 5 +-
 protocol/v2/qbft/storage/participant_store.go | 5 +-
 protocol/v2/qbft/testing/storage.go | 5 +-
 utils/boot_node/node.go | 41 +++++++------
 utils/keys.go | 6 +-
 56 files changed, 502 insertions(+), 477 deletions(-)

diff --git a/cli/bootnode/boot_node.go b/cli/bootnode/boot_node.go
index 5bcc72dbb5..f081700bdb 100644
--- a/cli/bootnode/boot_node.go
+++ b/cli/bootnode/boot_node.go
@@ -4,17 +4,15 @@ import (
 	"fmt"
 	"log"
 
-	"github.com/ssvlabs/ssv/networkconfig"
-	"github.com/ssvlabs/ssv/utils/commons"
-
-	"github.com/ssvlabs/ssv/logging"
-
 	"github.com/ilyakaznacheev/cleanenv"
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
 
 	global_config "github.com/ssvlabs/ssv/cli/config"
+	"github.com/ssvlabs/ssv/logging"
+	"github.com/ssvlabs/ssv/networkconfig"
 	bootnode "github.com/ssvlabs/ssv/utils/boot_node"
+	"github.com/ssvlabs/ssv/utils/commons"
 )
 
 type config struct {
@@ -60,11 +58,13 @@ var StartBootNodeCmd = &cobra.Command{
 		if err != nil {
 			logger.Fatal("failed to get network config", zap.Error(err))
 		}
-		bootNode, err := bootnode.New(networkConfig, cfg.Options)
+
+		bootNode, err := bootnode.New(logger, networkConfig, cfg.Options)
 		if err != nil {
logger.Fatal("failed to set up boot node", zap.Error(err)) } - if err := bootNode.Start(cmd.Context(), logger); err != nil { + + if err := bootNode.Start(cmd.Context()); err != nil { logger.Fatal("failed to start boot node", zap.Error(err)) } }, diff --git a/cli/operator/node.go b/cli/operator/node.go index 2978741c2c..bf695d281f 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -19,9 +19,9 @@ import ( "github.com/ilyakaznacheev/cleanenv" "github.com/pkg/errors" "github.com/spf13/cobra" + spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/api/handlers" apiserver "github.com/ssvlabs/ssv/api/server" "github.com/ssvlabs/ssv/beacon/goclient" @@ -265,7 +265,7 @@ var StartNodeCmd = &cobra.Command{ cfg.SSVOptions.ValidatorOptions.RecipientsStorage = nodeStorage if cfg.WsAPIPort != 0 { - ws := exporterapi.NewWsServer(cmd.Context(), nil, http.NewServeMux(), cfg.WithPing) + ws := exporterapi.NewWsServer(cmd.Context(), zap.NewNop(), nil, http.NewServeMux(), cfg.WithPing) cfg.SSVOptions.WS = ws cfg.SSVOptions.WsAPIPort = cfg.WsAPIPort cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(networkConfig, logger, ws) @@ -286,7 +286,7 @@ var StartNodeCmd = &cobra.Command{ storageMap := ibftstorage.NewStores() for _, storageRole := range storageRoles { - s := ibftstorage.New(cfg.SSVOptions.ValidatorOptions.DB, storageRole) + s := ibftstorage.New(logger, cfg.SSVOptions.ValidatorOptions.DB, storageRole) storageMap.Add(storageRole, s) } @@ -428,7 +428,7 @@ var StartNodeCmd = &cobra.Command{ } }() } - if err := operatorNode.Start(logger); err != nil { + if err := operatorNode.Start(); err != nil { logger.Fatal("failed to start SSV node", zap.Error(err)) } }, @@ -800,9 +800,9 @@ func syncContractEvents( func startMetricsHandler(logger *zap.Logger, db basedb.Database, port int, enableProf bool, opNode *operator.Node) { logger = logger.Named(logging.NameMetricsHandler) // init and start HTTP handler - metricsHandler := metrics.NewHandler(db, enableProf, opNode) + metricsHandler := metrics.NewHandler(logger, db, enableProf, opNode) addr := fmt.Sprintf(":%d", port) - if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil { + if err := metricsHandler.Start(http.NewServeMux(), addr); err != nil { logger.Panic("failed to serve metrics", zap.Error(err)) } } @@ -817,7 +817,7 @@ func initSlotPruning(ctx context.Context, logger *zap.Logger, stores *ibftstorag wg.Add(1) go func() { defer wg.Done() - store.Prune(ctx, logger, threshold) + store.Prune(ctx, threshold) }() return nil }) @@ -826,7 +826,7 @@ func initSlotPruning(ctx context.Context, logger *zap.Logger, stores *ibftstorag // start background job for removing old slots on every tick _ = stores.Each(func(_ spectypes.BeaconRole, store qbftstorage.ParticipantStore) error { - go store.PruneContinously(ctx, logger, slotTickerProvider, phase0.Slot(retain)) + go store.PruneContinously(ctx, slotTickerProvider, phase0.Slot(retain)) return nil }) } diff --git a/exporter/api/broadcaster.go b/exporter/api/broadcaster.go index 329ab4018d..f089661fc9 100644 --- a/exporter/api/broadcaster.go +++ b/exporter/api/broadcaster.go @@ -11,7 +11,7 @@ import ( // Broadcaster is an interface broadcasting stream message across all available connections type Broadcaster interface { - FromFeed(logger *zap.Logger, feed *event.Feed) error + FromFeed(feed *event.Feed) error Broadcast(msg Message) error Register(conn broadcasted) bool Deregister(conn 
broadcasted) bool @@ -23,34 +23,36 @@ type broadcasted interface { } type broadcaster struct { + logger *zap.Logger mut sync.Mutex connections map[string]broadcasted } -func newBroadcaster() Broadcaster { +func newBroadcaster(logger *zap.Logger) Broadcaster { return &broadcaster{ + logger: logger, mut: sync.Mutex{}, connections: map[string]broadcasted{}, } } // FromFeed subscribes to the given feed and broadcasts incoming messages -func (b *broadcaster) FromFeed(logger *zap.Logger, msgFeed *event.Feed) error { +func (b *broadcaster) FromFeed(msgFeed *event.Feed) error { cn := make(chan Message, 512) sub := msgFeed.Subscribe(cn) defer sub.Unsubscribe() - defer logger.Debug("done reading from feed") + defer b.logger.Debug("done reading from feed") for { select { case msg := <-cn: go func(msg Message) { if err := b.Broadcast(msg); err != nil { - logger.Error("could not broadcast message", zap.Error(err)) + b.logger.Error("could not broadcast message", zap.Error(err)) } }(msg) case err := <-sub.Err(): - logger.Warn("could not read messages from msgFeed", zap.Error(err)) + b.logger.Warn("could not read messages from msgFeed", zap.Error(err)) return err } } diff --git a/exporter/api/broadcaster_test.go b/exporter/api/broadcaster_test.go index a2cbac0a0a..f61a97f468 100644 --- a/exporter/api/broadcaster_test.go +++ b/exporter/api/broadcaster_test.go @@ -22,11 +22,11 @@ func TestConn_Send_FullQueue(t *testing.T) { func TestBroadcaster(t *testing.T) { logger := zaptest.NewLogger(t) - b := newBroadcaster() + b := newBroadcaster(logger) feed := new(event.Feed) go func() { - require.NoError(t, b.FromFeed(logger, feed)) + require.NoError(t, b.FromFeed(feed)) }() bm1 := newBroadcastedMock("1") bm2 := newBroadcastedMock("2") diff --git a/exporter/api/interfaces.go b/exporter/api/interfaces.go index ca2a89a397..689ee70440 100644 --- a/exporter/api/interfaces.go +++ b/exporter/api/interfaces.go @@ -4,8 +4,6 @@ import ( "fmt" "net" "time" - - "go.uber.org/zap" ) // Connection is an interface to abstract the actual websocket connection implementation @@ -22,7 +20,7 @@ type NetworkMessage struct { } // QueryMessageHandler handles the given message -type QueryMessageHandler func(logger *zap.Logger, nm *NetworkMessage) +type QueryMessageHandler func(nm *NetworkMessage) // ConnectionID calculates the id of the given Connection func ConnectionID(conn Connection) string { diff --git a/exporter/api/query_handlers.go b/exporter/api/query_handlers.go index 9b09dda0ac..3864e3c18d 100644 --- a/exporter/api/query_handlers.go +++ b/exporter/api/query_handlers.go @@ -6,9 +6,10 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" spectypes "github.com/ssvlabs/ssv-spec/types" - qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage" "go.uber.org/zap" + qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage" + "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/logging/fields" "github.com/ssvlabs/ssv/protocol/v2/message" @@ -18,9 +19,19 @@ const ( unknownError = "unknown error" ) +type Handler struct { + logger *zap.Logger +} + +func NewHandler(logger *zap.Logger) *Handler { + return &Handler{ + logger: logger, + } +} + // HandleErrorQuery handles TypeError queries. 
-func HandleErrorQuery(logger *zap.Logger, nm *NetworkMessage) { - logger.Warn("handles error message") +func (h *Handler) HandleErrorQuery(nm *NetworkMessage) { + h.logger.Warn("handles error message") if _, ok := nm.Msg.Data.([]string); !ok { nm.Msg.Data = []string{} } @@ -38,8 +49,8 @@ func HandleErrorQuery(logger *zap.Logger, nm *NetworkMessage) { } // HandleUnknownQuery handles unknown queries. -func HandleUnknownQuery(logger *zap.Logger, nm *NetworkMessage) { - logger.Warn("unknown message type", zap.String("messageType", string(nm.Msg.Type))) +func (h *Handler) HandleUnknownQuery(nm *NetworkMessage) { + h.logger.Warn("unknown message type", zap.String("messageType", string(nm.Msg.Type))) nm.Msg = Message{ Type: TypeError, Data: []string{fmt.Sprintf("bad request - unknown message type '%s'", nm.Msg.Type)}, @@ -47,8 +58,8 @@ func HandleUnknownQuery(logger *zap.Logger, nm *NetworkMessage) { } // HandleParticipantsQuery handles TypeParticipants queries. -func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStores, nm *NetworkMessage, domain spectypes.DomainType) { - logger.Debug("handles query request", +func (h *Handler) HandleParticipantsQuery(store *storage.ParticipantStores, nm *NetworkMessage, domain spectypes.DomainType) { + h.logger.Debug("handles query request", zap.Uint64("from", nm.Msg.Filter.From), zap.Uint64("to", nm.Msg.Filter.To), zap.String("publicKey", nm.Msg.Filter.PublicKey), @@ -59,13 +70,13 @@ func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStore } pkRaw, err := hex.DecodeString(nm.Msg.Filter.PublicKey) if err != nil { - logger.Warn("failed to decode validator public key", zap.Error(err)) + h.logger.Warn("failed to decode validator public key", zap.Error(err)) res.Data = []string{"internal error - could not read validator key"} nm.Msg = res return } if len(pkRaw) != pubKeySize { - logger.Warn("bad size for the provided public key", zap.Int("length", len(pkRaw))) + h.logger.Warn("bad size for the provided public key", zap.Int("length", len(pkRaw))) res.Data = []string{"bad size for the provided public key"} nm.Msg = res return @@ -73,14 +84,14 @@ func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStore role, err := message.BeaconRoleFromString(nm.Msg.Filter.Role) if err != nil { - logger.Warn("failed to parse role", zap.Error(err)) + h.logger.Warn("failed to parse role", zap.Error(err)) res.Data = []string{"role doesn't exist"} nm.Msg = res return } roleStorage := store.Get(role) if roleStorage == nil { - logger.Warn("role storage doesn't exist", fields.ExporterRole(role)) + h.logger.Warn("role storage doesn't exist", fields.ExporterRole(role)) res.Data = []string{"internal error - role storage doesn't exist", role.String()} nm.Msg = res return @@ -90,7 +101,7 @@ func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStore to := phase0.Slot(nm.Msg.Filter.To) participantsList, err := roleStorage.GetParticipantsInRange(spectypes.ValidatorPK(pkRaw), from, to) if err != nil { - logger.Warn("failed to get participants", zap.Error(err)) + h.logger.Warn("failed to get participants", zap.Error(err)) res.Data = []string{"internal error - could not get participants messages"} } else { participations := toParticipations(role, spectypes.ValidatorPK(pkRaw), participantsList) diff --git a/exporter/api/query_handlers_test.go b/exporter/api/query_handlers_test.go index c7466e8f50..fb6f08fb11 100644 --- a/exporter/api/query_handlers_test.go +++ b/exporter/api/query_handlers_test.go @@ -13,6 
+13,7 @@ import ( specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" + qbftstorage "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/networkconfig" @@ -35,7 +36,8 @@ func TestHandleUnknownQuery(t *testing.T) { Conn: nil, } - HandleUnknownQuery(logger, &nm) + h := NewHandler(logger) + h.HandleUnknownQuery(&nm) errs, ok := nm.Msg.Data.([]string) require.True(t, ok) require.Equal(t, "bad request - unknown message type 'unknown_type'", errs[0]) @@ -72,7 +74,8 @@ func TestHandleErrorQuery(t *testing.T) { Err: test.netErr, Conn: nil, } - HandleErrorQuery(logger, &nm) + h := NewHandler(logger) + h.HandleErrorQuery(&nm) errs, ok := nm.Msg.Data.([]string) require.True(t, ok) require.Equal(t, test.expectedErr, errs[0]) @@ -129,7 +132,8 @@ func TestHandleDecidedQuery(t *testing.T) { t.Run("valid range", func(t *testing.T) { nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleAttester, 0, 250) - HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType) + h := NewHandler(l) + h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType) require.NotNil(t, nm.Msg.Data) msgs, ok := nm.Msg.Data.([]*ParticipantsAPI) @@ -139,7 +143,8 @@ func TestHandleDecidedQuery(t *testing.T) { t.Run("invalid range", func(t *testing.T) { nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleAttester, 400, 404) - HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType) + h := NewHandler(l) + h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType) require.NotNil(t, nm.Msg.Data) data, ok := nm.Msg.Data.([]string) require.True(t, ok) @@ -148,7 +153,8 @@ func TestHandleDecidedQuery(t *testing.T) { t.Run("non-existing validator", func(t *testing.T) { nm := newParticipantsAPIMsg("xxx", spectypes.BNRoleAttester, 400, 404) - HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType) + h := NewHandler(l) + h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType) require.NotNil(t, nm.Msg.Data) errs, ok := nm.Msg.Data.([]string) require.True(t, ok) @@ -157,7 +163,8 @@ func TestHandleDecidedQuery(t *testing.T) { t.Run("non-existing role", func(t *testing.T) { nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), math.MaxUint64, 0, 250) - HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType) + h := NewHandler(l) + h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType) require.NotNil(t, nm.Msg.Data) errs, ok := nm.Msg.Data.([]string) require.True(t, ok) @@ -166,7 +173,8 @@ func TestHandleDecidedQuery(t *testing.T) { t.Run("non-existing storage", func(t *testing.T) { nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleSyncCommitteeContribution, 0, 250) - HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType) + h := NewHandler(l) + h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType) require.NotNil(t, nm.Msg.Data) errs, ok := nm.Msg.Data.([]string) require.True(t, ok) @@ -209,7 +217,7 @@ func newStorageForTest(db basedb.Database, logger *zap.Logger, roles ...spectype storageMap := qbftstorage.NewStores() for _, role := range roles { - storageMap.Add(role, qbftstorage.New(db, role)) + storageMap.Add(role, qbftstorage.New(logger, db, role)) } return sExporter, storageMap diff --git a/exporter/api/server.go b/exporter/api/server.go index 2e34f2348c..95b083ed0e 100644 --- a/exporter/api/server.go +++ b/exporter/api/server.go @@ -21,14 +21,15 @@ const ( // WebSocketServer is responsible for 
managing all type WebSocketServer interface { - Start(logger *zap.Logger, addr string) error + Start(addr string) error BroadcastFeed() *event.Feed UseQueryHandler(handler QueryMessageHandler) } // wsServer is an implementation of WebSocketServer type wsServer struct { - ctx context.Context + logger *zap.Logger + ctx context.Context handler QueryMessageHandler @@ -41,12 +42,13 @@ type wsServer struct { } // NewWsServer creates a new instance -func NewWsServer(ctx context.Context, handler QueryMessageHandler, mux *http.ServeMux, withPing bool) WebSocketServer { +func NewWsServer(ctx context.Context, logger *zap.Logger, handler QueryMessageHandler, mux *http.ServeMux, withPing bool) WebSocketServer { ws := wsServer{ ctx: ctx, + logger: logger.Named(logging.NameWSServer), handler: handler, router: mux, - broadcaster: newBroadcaster(), + broadcaster: newBroadcaster(logger), out: new(event.Feed), withPing: withPing, } @@ -58,19 +60,17 @@ func (ws *wsServer) UseQueryHandler(handler QueryMessageHandler) { } // Start starts the websocket server and the broadcaster -func (ws *wsServer) Start(logger *zap.Logger, addr string) error { - logger = logger.Named(logging.NameWSServer) - - ws.RegisterHandler(logger, "query", "/query", ws.handleQuery) - ws.RegisterHandler(logger, "stream", "/stream", ws.handleStream) +func (ws *wsServer) Start(addr string) error { + ws.RegisterHandler("query", "/query", ws.handleQuery) + ws.RegisterHandler("stream", "/stream", ws.handleStream) go func() { - if err := ws.broadcaster.FromFeed(logger, ws.out); err != nil { - logger.Debug("failed to pull messages from feed") + if err := ws.broadcaster.FromFeed(ws.out); err != nil { + ws.logger.Debug("failed to pull messages from feed") } }() - logger.Info("starting", fields.Address(addr), zap.Strings("endPoints", []string{"/query", "/stream"})) + ws.logger.Info("starting", fields.Address(addr), zap.Strings("endPoints", []string{"/query", "/stream"})) const timeout = 3 * time.Second @@ -83,7 +83,7 @@ func (ws *wsServer) Start(logger *zap.Logger, addr string) error { err := httpServer.ListenAndServe() if err != nil { - logger.Warn("could not start", zap.Error(err)) + ws.logger.Warn("could not start", zap.Error(err)) } return err } @@ -94,23 +94,22 @@ func (ws *wsServer) BroadcastFeed() *event.Feed { } // RegisterHandler registers an end point -func (ws *wsServer) RegisterHandler(logger *zap.Logger, name, endPoint string, handler func(logger *zap.Logger, conn *websocket.Conn)) { +func (ws *wsServer) RegisterHandler(name, endPoint string, handler func(conn *websocket.Conn)) { wrappedHandler := func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, w.Header()) - logger := logger.With(zap.String("remote addr", conn.RemoteAddr().String())) if err != nil { - logger.Error("could not upgrade connection", zap.Error(err)) + ws.logger.Error("could not upgrade connection", zap.Error(err)) return } - logger.Debug("new websocket connection") + ws.logger.Debug("new websocket connection") defer func() { - logger.Debug("closing connection") + ws.logger.Debug("closing connection") err := conn.Close() if err != nil { - logger.Error("could not close connection", zap.Error(err)) + ws.logger.Error("could not close connection", zap.Error(err)) } }() - handler(logger, conn) + handler(conn) } otelHandler := otelhttp.NewHandler(http.HandlerFunc(wrappedHandler), name) @@ -118,12 +117,14 @@ func (ws *wsServer) RegisterHandler(logger *zap.Logger, name, endPoint string, h } // handleQuery receives query message and respond async 
-func (ws *wsServer) handleQuery(logger *zap.Logger, conn *websocket.Conn) { +func (ws *wsServer) handleQuery(conn *websocket.Conn) { if ws.handler == nil { return } cid := ConnectionID(conn) - logger = logger.With(fields.ConnectionID(cid)) + logger := ws.logger. + With(fields.ConnectionID(cid)). + With(zap.String("remote addr", conn.RemoteAddr().String())) logger.Debug("handles query requests") for { @@ -144,7 +145,7 @@ func (ws *wsServer) handleQuery(logger *zap.Logger, conn *websocket.Conn) { networkMessage = NetworkMessage{incoming, nil, conn} } // handler is processing the request and updates msg - ws.handler(logger, &networkMessage) + ws.handler(&networkMessage) err = tasks.Retry(func() error { return conn.WriteJSON(&networkMessage.Msg) @@ -157,9 +158,11 @@ func (ws *wsServer) handleQuery(logger *zap.Logger, conn *websocket.Conn) { } // handleStream registers the connection for broadcasting of stream messages -func (ws *wsServer) handleStream(logger *zap.Logger, wsc *websocket.Conn) { +func (ws *wsServer) handleStream(wsc *websocket.Conn) { cid := ConnectionID(wsc) - logger = logger.With(fields.ConnectionID(cid)) + logger := ws.logger. + With(fields.ConnectionID(cid)). + With(zap.String("remote addr", wsc.RemoteAddr().String())) defer logger.Debug("stream handler done") ctx, cancel := context.WithCancel(ws.ctx) diff --git a/exporter/api/server_test.go b/exporter/api/server_test.go index a87cba3957..f961b4671a 100644 --- a/exporter/api/server_test.go +++ b/exporter/api/server_test.go @@ -22,7 +22,7 @@ func TestHandleQuery(t *testing.T) { logger := zaptest.NewLogger(t) ctx, cancelServerCtx := context.WithCancel(context.Background()) mux := http.NewServeMux() - ws := NewWsServer(ctx, func(logger *zap.Logger, nm *NetworkMessage) { + ws := NewWsServer(ctx, zap.NewNop(), func(nm *NetworkMessage) { nm.Msg.Data = []registrystorage.OperatorData{ {PublicKey: []byte(fmt.Sprintf("pubkey-%d", nm.Msg.Filter.From))}, } @@ -36,7 +36,7 @@ func TestHandleQuery(t *testing.T) { time.Sleep(100 * time.Millisecond) wg.Done() }() - require.NoError(t, ws.Start(logger, addr)) + require.NoError(t, ws.Start(addr)) }() wg.Wait() @@ -76,10 +76,10 @@ func TestHandleStream(t *testing.T) { logger := zaptest.NewLogger(t) ctx := context.Background() mux := http.NewServeMux() - ws := NewWsServer(ctx, nil, mux, false).(*wsServer) + ws := NewWsServer(ctx, zap.NewNop(), nil, mux, false).(*wsServer) addr := fmt.Sprintf(":%d", getRandomPort(8001, 14000)) go func() { - require.NoError(t, ws.Start(logger, addr)) + require.NoError(t, ws.Start(addr)) }() testCtx, cancelCtx := context.WithCancel(ctx) diff --git a/ibft/storage/store.go b/ibft/storage/store.go index ced3e1c293..e9b40efa05 100644 --- a/ibft/storage/store.go +++ b/ibft/storage/store.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/logging/fields" "github.com/ssvlabs/ssv/operator/slotticker" qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage" @@ -44,6 +45,7 @@ func init() { // participantStorage struct // instanceType is what separates different iBFT eth2 duty types (attestation, proposal and aggregation) type participantStorage struct { + logger *zap.Logger prefix []byte oldPrefix string // kept back for cleanup db basedb.Database @@ -51,9 +53,10 @@ type participantStorage struct { } // New create new participant store -func New(db basedb.Database, prefix spectypes.BeaconRole) qbftstorage.ParticipantStore { +func New(logger *zap.Logger, db basedb.Database, prefix spectypes.BeaconRole) 
qbftstorage.ParticipantStore { role := byte(prefix & 0xff) return &participantStorage{ + logger: logger, prefix: []byte{role}, oldPrefix: prefix.String(), db: db, @@ -61,20 +64,20 @@ func New(db basedb.Database, prefix spectypes.BeaconRole) qbftstorage.Participan } // Prune waits for the initial tick and then removes all slots below the tickSlot - retain -func (i *participantStorage) Prune(ctx context.Context, logger *zap.Logger, threshold phase0.Slot) { - logger.Info("start initial stale slot cleanup", zap.String("store", i.ID()), fields.Slot(threshold)) +func (i *participantStorage) Prune(ctx context.Context, threshold phase0.Slot) { + i.logger.Info("start initial stale slot cleanup", zap.String("store", i.ID()), fields.Slot(threshold)) // remove ALL slots below the threshold start := time.Now() - count := i.removeSlotsOlderThan(logger, threshold) + count := i.removeSlotsOlderThan(threshold) - logger.Info("removed stale slot entries", zap.String("store", i.ID()), fields.Slot(threshold), zap.Int("count", count), zap.Duration("took", time.Since(start))) + i.logger.Info("removed stale slot entries", zap.String("store", i.ID()), fields.Slot(threshold), zap.Int("count", count), zap.Duration("took", time.Since(start))) } -// PruneContinously on every tick looks up and removes the slots that fall below the retain threshold -func (i *participantStorage) PruneContinously(ctx context.Context, logger *zap.Logger, slotTickerProvider slotticker.Provider, retain phase0.Slot) { +// PruneContinuously on every tick looks up and removes the slots that fall below the retain threshold +func (i *participantStorage) PruneContinously(ctx context.Context, slotTickerProvider slotticker.Provider, retain phase0.Slot) { ticker := slotTickerProvider() - logger.Info("start stale slot cleanup loop", zap.String("store", i.ID())) + i.logger.Info("start stale slot cleanup loop", zap.String("store", i.ID())) for { select { case <-ctx.Done(): @@ -83,10 +86,10 @@ func (i *participantStorage) PruneContinously(ctx context.Context, logger *zap.L threshold := ticker.Slot() - retain - 1 count, err := i.removeSlotAt(threshold) if err != nil { - logger.Error("remove slot at", zap.String("store", i.ID()), fields.Slot(threshold)) + i.logger.Error("remove slot at", zap.String("store", i.ID()), fields.Slot(threshold)) } - logger.Debug("removed stale slots", zap.String("store", i.ID()), fields.Slot(threshold), zap.Int("count", count)) + i.logger.Debug("removed stale slots", zap.String("store", i.ID()), fields.Slot(threshold), zap.Int("count", count)) } } } @@ -130,7 +133,7 @@ func (i *participantStorage) removeSlotAt(slot phase0.Slot) (int, error) { var dropPrefixMu sync.Mutex // removes ALL entries for any slots older or equal to given slot -func (i *participantStorage) removeSlotsOlderThan(logger *zap.Logger, slot phase0.Slot) int { +func (i *participantStorage) removeSlotsOlderThan(slot phase0.Slot) int { var total int for { slot-- // slots are incremental @@ -141,21 +144,21 @@ func (i *participantStorage) removeSlotsOlderThan(logger *zap.Logger, slot phase count, err := i.db.CountPrefix(prefix) if err != nil { - logger.Error("count prefix of stale slots", zap.String("store", i.ID()), fields.Slot(slot), zap.Error(err)) + i.logger.Error("count prefix of stale slots", zap.String("store", i.ID()), fields.Slot(slot), zap.Error(err)) return true } if count == 0 { - logger.Debug("no more keys at slot", zap.String("store", i.ID()), fields.Slot(slot)) + i.logger.Debug("no more keys at slot", zap.String("store", i.ID()), fields.Slot(slot)) 
return true } if err := i.db.DropPrefix(prefix); err != nil { - logger.Error("drop prefix of stale slots", zap.String("store", i.ID()), fields.Slot(slot), zap.Error(err)) + i.logger.Error("drop prefix of stale slots", zap.String("store", i.ID()), fields.Slot(slot), zap.Error(err)) return true } - logger.Debug("drop prefix", zap.String("store", i.ID()), zap.Int64("count", count), fields.Slot(slot)) + i.logger.Debug("drop prefix", zap.String("store", i.ID()), zap.Int64("count", count), fields.Slot(slot)) total += int(count) return false diff --git a/ibft/storage/store_test.go b/ibft/storage/store_test.go index 03a159dd30..de89e25c4c 100644 --- a/ibft/storage/store_test.go +++ b/ibft/storage/store_test.go @@ -18,6 +18,7 @@ import ( specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/operator/slotticker" mockslotticker "github.com/ssvlabs/ssv/operator/slotticker/mocks" qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage" @@ -35,7 +36,7 @@ func TestRemoveSlot(t *testing.T) { role := spectypes.BNRoleAttester ibftStorage := NewStores() - ibftStorage.Add(role, New(db, role)) + ibftStorage.Add(role, New(zap.NewNop(), db, role)) _ = bls.Init(bls.BLS12_381) @@ -78,7 +79,7 @@ func TestRemoveSlot(t *testing.T) { t.Run("remove slot older than", func(t *testing.T) { threshold := phase0.Slot(100) - count := storage.removeSlotsOlderThan(zap.NewNop(), threshold) + count := storage.removeSlotsOlderThan(threshold) require.Equal(t, 100, count) pp, err := storage.GetAllParticipantsInRange(phase0.Slot(0), phase0.Slot(250)) @@ -116,7 +117,7 @@ func TestSlotCleanupJob(t *testing.T) { role := spectypes.BNRoleAttester ibftStorage := NewStores() - ibftStorage.Add(role, New(db, role)) + ibftStorage.Add(role, New(zap.NewNop(), db, role)) _ = bls.Init(bls.BLS12_381) @@ -196,7 +197,7 @@ func TestSlotCleanupJob(t *testing.T) { } // initial cleanup removes ALL slots below 3 - storage.Prune(ctx, zap.NewNop(), 3) + storage.Prune(ctx, 3) pp, err := storage.GetAllParticipantsInRange(phase0.Slot(0), phase0.Slot(10)) require.Nil(t, err) @@ -214,7 +215,7 @@ func TestSlotCleanupJob(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - storage.PruneContinously(ctx, zap.NewNop(), tickerProv, 1) + storage.PruneContinously(ctx, tickerProv, 1) }() mockTimeChan <- time.Now() diff --git a/ibft/storage/stores.go b/ibft/storage/stores.go index e68fef548f..0eb1eef244 100644 --- a/ibft/storage/stores.go +++ b/ibft/storage/stores.go @@ -2,6 +2,8 @@ package storage import ( spectypes "github.com/ssvlabs/ssv-spec/types" + "go.uber.org/zap" + qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage" "github.com/ssvlabs/ssv/storage/basedb" "github.com/ssvlabs/ssv/utils/hashmap" @@ -18,10 +20,10 @@ func NewStores() *ParticipantStores { } } -func NewStoresFromRoles(db basedb.Database, roles ...spectypes.BeaconRole) *ParticipantStores { +func NewStoresFromRoles(logger *zap.Logger, db basedb.Database, roles ...spectypes.BeaconRole) *ParticipantStores { stores := NewStores() for _, role := range roles { - stores.Add(role, New(db, role)) + stores.Add(role, New(logger, db, role)) } return stores } diff --git a/monitoring/metrics/handler.go b/monitoring/metrics/handler.go index fbe867b76a..536eb13338 100644 --- a/monitoring/metrics/handler.go +++ b/monitoring/metrics/handler.go @@ -19,14 +19,16 @@ import ( ) type Handler struct { + logger *zap.Logger db basedb.Database enableProf bool healthChecker HealthChecker } // NewHandler returns a new metrics handler. 
-func NewHandler(db basedb.Database, enableProf bool, healthChecker HealthChecker) *Handler { +func NewHandler(logger *zap.Logger, db basedb.Database, enableProf bool, healthChecker HealthChecker) *Handler { mh := Handler{ + logger: logger, db: db, enableProf: enableProf, healthChecker: healthChecker, @@ -34,8 +36,8 @@ func NewHandler(db basedb.Database, enableProf bool, healthChecker HealthChecker return &mh } -func (h *Handler) Start(logger *zap.Logger, mux *http.ServeMux, addr string) error { - logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", h.enableProf)) +func (h *Handler) Start(mux *http.ServeMux, addr string) error { + h.logger.Info("setup collection", fields.Address(addr), zap.Bool("enableProf", h.enableProf)) if h.enableProf { h.configureProfiling() diff --git a/network/discovery/dv5_filters.go b/network/discovery/dv5_filters.go index e32bddbbf7..b2c4940ef5 100644 --- a/network/discovery/dv5_filters.go +++ b/network/discovery/dv5_filters.go @@ -4,8 +4,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" - "github.com/ssvlabs/ssv/network/records" "go.uber.org/zap" + + "github.com/ssvlabs/ssv/network/records" ) // limitNodeFilter returns true if the limit is exceeded @@ -14,29 +15,29 @@ func (dvs *DiscV5Service) limitNodeFilter(node *enode.Node) bool { } //// forkVersionFilter checks if the node has the same fork version -// func (dvs *DiscV5Service) forkVersionFilter(logger *zap.Logger, node *enode.Node) bool { +// func (dvs *DiscV5Service) forkVersionFilter(node *enode.Node) bool { // forkv, err := records.GetForkVersionEntry(node.Record()) // if err != nil { -// logger.Warn("could not read fork version from node record", zap.Error(err)) +// dvs.logger.Warn("could not read fork version from node record", zap.Error(err)) // return false // } // return dvs.forkv == forkv //} // badNodeFilter checks if the node was pruned or have a bad score -func (dvs *DiscV5Service) badNodeFilter(logger *zap.Logger) func(node *enode.Node) bool { +func (dvs *DiscV5Service) badNodeFilter() func(node *enode.Node) bool { return func(node *enode.Node) bool { pid, err := PeerID(node) if err != nil { - logger.Warn("could not get peer ID from node record", zap.Error(err)) + dvs.logger.Warn("could not get peer ID from node record", zap.Error(err)) return false } - return !dvs.conns.IsBad(logger, pid) + return !dvs.conns.IsBad(pid) } } // badNodeFilter checks if the node was pruned or have a bad score -func (dvs *DiscV5Service) ssvNodeFilter(logger *zap.Logger) func(node *enode.Node) bool { +func (dvs *DiscV5Service) ssvNodeFilter() func(node *enode.Node) bool { return func(node *enode.Node) bool { var isSSV = new(bool) if err := node.Record().Load(enr.WithEntry("ssv", isSSV)); err != nil { diff --git a/network/discovery/dv5_routing.go b/network/discovery/dv5_routing.go index 0f69fe5d60..30c73447d6 100644 --- a/network/discovery/dv5_routing.go +++ b/network/discovery/dv5_routing.go @@ -32,12 +32,12 @@ func (dvs *DiscV5Service) Advertise(ctx context.Context, ns string, opt ...disco return opts.Ttl, nil } - updated, err := dvs.RegisterSubnets(logger, subnet) + updated, err := dvs.RegisterSubnets(subnet) if err != nil { return 0, err } if updated { - go dvs.PublishENR(logger) + go dvs.PublishENR() } return opts.Ttl, nil @@ -56,7 +56,7 @@ func (dvs *DiscV5Service) FindPeers(ctx context.Context, ns string, opt ...disco dvs.discover(ctx, func(e PeerEvent) { cn <- e.AddrInfo - }, 
time.Millisecond, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger), dvs.subnetFilter(subnet)) + }, time.Millisecond, dvs.ssvNodeFilter(), dvs.badNodeFilter(), dvs.subnetFilter(subnet)) return cn, nil } diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index f6e1616f7d..c054961dbe 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -50,6 +50,8 @@ type Listener interface { // currently using ENR entry (subnets) to facilitate subnets discovery // TODO: should be changed once discv5 supports topics (v5.2) type DiscV5Service struct { + logger *zap.Logger + ctx context.Context cancel context.CancelFunc @@ -71,6 +73,7 @@ type DiscV5Service struct { func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Options) (Service, error) { ctx, cancel := context.WithCancel(pctx) dvs := DiscV5Service{ + logger: logger.Named(logging.NameDiscoveryService), ctx: ctx, cancel: cancel, conns: discOpts.ConnIndex, @@ -135,16 +138,14 @@ func (dvs *DiscV5Service) Node(logger *zap.Logger, info peer.AddrInfo) (*enode.N // Bootstrap start looking for new nodes, note that this function blocks. // if we reached peers limit, make sure to accept peers with more than 1 shared subnet, // which lets other components to determine whether we'll want to connect to this node or not. -func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) error { - logger = logger.Named(logging.NameDiscoveryService) - +func (dvs *DiscV5Service) Bootstrap(handler HandleNewPeer) error { // Log every 10th skipped peer. // TODO: remove once we've merged https://github.com/ssvlabs/ssv/pull/1803 const logFrequency = 10 var skippedPeers uint64 = 0 dvs.discover(dvs.ctx, func(e PeerEvent) { - logger := logger.With( + logger := dvs.logger.With( fields.ENR(e.Node), fields.PeerID(e.AddrInfo.ID), ) @@ -324,7 +325,7 @@ func (dvs *DiscV5Service) discover(ctx context.Context, handler HandleNewPeer, i } // RegisterSubnets adds the given subnets and publish the updated node record -func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) { +func (dvs *DiscV5Service) RegisterSubnets(subnets ...uint64) (updated bool, err error) { if len(subnets) == 0 { return false, nil } @@ -334,16 +335,14 @@ func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...uint64) } if updatedSubnets != nil { dvs.subnets = updatedSubnets - logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) + dvs.logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) return true, nil } return false, nil } // DeregisterSubnets removes the given subnets and publish the updated node record -func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) { - logger = logger.Named(logging.NameDiscoveryService) - +func (dvs *DiscV5Service) DeregisterSubnets(subnets ...uint64) (updated bool, err error) { if len(subnets) == 0 { return false, nil } @@ -353,23 +352,23 @@ func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...uint6 } if updatedSubnets != nil { dvs.subnets = updatedSubnets - logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) + dvs.logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) return true, nil } return false, nil } // PublishENR publishes the ENR with the current domain type across the network -func (dvs 
*DiscV5Service) PublishENR(logger *zap.Logger) { +func (dvs *DiscV5Service) PublishENR() { // Update own node record. err := records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyDomainType, dvs.networkConfig.DomainType) if err != nil { - logger.Error("could not set domain type", zap.Error(err)) + dvs.logger.Error("could not set domain type", zap.Error(err)) return } err = records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyNextDomainType, dvs.networkConfig.DomainType) if err != nil { - logger.Error("could not set next domain type", zap.Error(err)) + dvs.logger.Error("could not set next domain type", zap.Error(err)) return } @@ -403,15 +402,15 @@ func (dvs *DiscV5Service) PublishENR(logger *zap.Logger) { // ignore return } - logger.Warn("could not ping node", fields.TargetNodeENR(e.Node), zap.Error(err)) + dvs.logger.Warn("could not ping node", fields.TargetNodeENR(e.Node), zap.Error(err)) return } pings++ peerIDs[e.AddrInfo.ID] = struct{}{} - }, time.Millisecond*100, dvs.ssvNodeFilter(logger), dvs.badNodeFilter(logger)) + }, time.Millisecond*100, dvs.ssvNodeFilter(), dvs.badNodeFilter()) // Log metrics. - logger.Debug("done publishing ENR", + dvs.logger.Debug("done publishing ENR", fields.Duration(start), zap.Int("unique_peers", len(peerIDs)), zap.Int("pings", pings), diff --git a/network/discovery/local_service.go b/network/discovery/local_service.go index 5127036828..bfa701e2a3 100644 --- a/network/discovery/local_service.go +++ b/network/discovery/local_service.go @@ -75,7 +75,7 @@ func handle(host host.Host, handler HandleNewPeer) HandleNewPeer { } // Bootstrap starts to listen to new nodes -func (md *localDiscovery) Bootstrap(logger *zap.Logger, handler HandleNewPeer) error { +func (md *localDiscovery) Bootstrap(handler HandleNewPeer) error { err := md.svc.Start() if err != nil { return errors.Wrap(err, "could not start mdns service") @@ -94,18 +94,18 @@ func (md *localDiscovery) FindPeers(ctx context.Context, ns string, opt ...disco } // RegisterSubnets implements Service -func (md *localDiscovery) RegisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) { +func (md *localDiscovery) RegisterSubnets(subnets ...uint64) (updated bool, err error) { // TODO return false, nil } // DeregisterSubnets implements Service -func (md *localDiscovery) DeregisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) { +func (md *localDiscovery) DeregisterSubnets(subnets ...uint64) (updated bool, err error) { // TODO return false, nil } -func (md *localDiscovery) PublishENR(logger *zap.Logger) { +func (md *localDiscovery) PublishENR() { // TODO } @@ -128,7 +128,7 @@ func (md *localDiscovery) Close() error { return nil } -func (dvs *localDiscovery) UpdateDomainType(logger *zap.Logger, domain spectypes.DomainType) error { +func (dvs *localDiscovery) UpdateDomainType(domain spectypes.DomainType) error { // TODO return nil } diff --git a/network/discovery/service.go b/network/discovery/service.go index aa9bed5498..8f07d54cdb 100644 --- a/network/discovery/service.go +++ b/network/discovery/service.go @@ -47,10 +47,10 @@ type Options struct { type Service interface { discovery.Discovery io.Closer - RegisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) - DeregisterSubnets(logger *zap.Logger, subnets ...uint64) (updated bool, err error) - Bootstrap(logger *zap.Logger, handler HandleNewPeer) error - PublishENR(logger *zap.Logger) + RegisterSubnets(subnets ...uint64) (updated bool, err error) + 
DeregisterSubnets(subnets ...uint64) (updated bool, err error) + Bootstrap(handler HandleNewPeer) error + PublishENR() } // NewService creates new discovery.Service diff --git a/network/discovery/service_test.go b/network/discovery/service_test.go index cef16171e7..1e32c20eee 100644 --- a/network/discovery/service_test.go +++ b/network/discovery/service_test.go @@ -7,11 +7,10 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" + spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/network/records" "github.com/ssvlabs/ssv/networkconfig" ) @@ -54,7 +53,7 @@ func TestDiscV5Service_RegisterSubnets(t *testing.T) { dvs := testingDiscovery(t) // Register subnets 1, 3, and 5 - updated, err := dvs.RegisterSubnets(testLogger, 1, 3, 5) + updated, err := dvs.RegisterSubnets(1, 3, 5) assert.NoError(t, err) assert.True(t, updated) @@ -64,7 +63,7 @@ func TestDiscV5Service_RegisterSubnets(t *testing.T) { require.Equal(t, byte(0), dvs.subnets[2]) // Register the same subnets. Should not update the state - updated, err = dvs.RegisterSubnets(testLogger, 1, 3, 5) + updated, err = dvs.RegisterSubnets(1, 3, 5) assert.NoError(t, err) assert.False(t, updated) @@ -74,7 +73,7 @@ func TestDiscV5Service_RegisterSubnets(t *testing.T) { require.Equal(t, byte(0), dvs.subnets[2]) // Register different subnets - updated, err = dvs.RegisterSubnets(testLogger, 2, 4) + updated, err = dvs.RegisterSubnets(2, 4) assert.NoError(t, err) assert.True(t, updated) require.Equal(t, byte(1), dvs.subnets[1]) @@ -93,7 +92,7 @@ func TestDiscV5Service_DeregisterSubnets(t *testing.T) { dvs := testingDiscovery(t) // Register subnets first - _, err := dvs.RegisterSubnets(testLogger, 1, 2, 3) + _, err := dvs.RegisterSubnets(1, 2, 3) require.NoError(t, err) require.Equal(t, byte(1), dvs.subnets[1]) @@ -101,7 +100,7 @@ func TestDiscV5Service_DeregisterSubnets(t *testing.T) { require.Equal(t, byte(1), dvs.subnets[3]) // Deregister from 2 and 3 - updated, err := dvs.DeregisterSubnets(testLogger, 2, 3) + updated, err := dvs.DeregisterSubnets(2, 3) assert.NoError(t, err) assert.True(t, updated) @@ -110,7 +109,7 @@ func TestDiscV5Service_DeregisterSubnets(t *testing.T) { require.Equal(t, byte(0), dvs.subnets[3]) // Deregistering non-existent subnets should not update - updated, err = dvs.DeregisterSubnets(testLogger, 4, 5) + updated, err = dvs.DeregisterSubnets(4, 5) assert.NoError(t, err) assert.False(t, updated) @@ -140,7 +139,6 @@ func checkLocalNodeDomainTypeAlignment(t *testing.T, localNode *enode.LocalNode, } func TestDiscV5Service_PublishENR(t *testing.T) { - logger := zap.NewNop() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -161,14 +159,13 @@ func TestDiscV5Service_PublishENR(t *testing.T) { // Change network config dvs.networkConfig = networkconfig.HoleskyStage // Test PublishENR method - dvs.PublishENR(logger) + dvs.PublishENR() // Check LocalNode has been updated checkLocalNodeDomainTypeAlignment(t, localNode, networkconfig.HoleskyStage) } func TestDiscV5Service_Bootstrap(t *testing.T) { - logger := zap.NewNop() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -194,7 +191,7 @@ func TestDiscV5Service_Bootstrap(t *testing.T) { // Run bootstrap go func() { - err := dvs.Bootstrap(logger, handler) + err := dvs.Bootstrap(handler) assert.NoError(t, err) 
}() diff --git a/network/discovery/util_test.go b/network/discovery/util_test.go index 79da7233e3..0a273136bf 100644 --- a/network/discovery/util_test.go +++ b/network/discovery/util_test.go @@ -318,7 +318,7 @@ func (mc *MockConnection) AtLimit(dir network.Direction) bool { return mc.atLimit } -func (mc *MockConnection) IsBad(logger *zap.Logger, id peer.ID) bool { +func (mc *MockConnection) IsBad(id peer.ID) bool { mc.mu.RLock() defer mc.mu.RUnlock() if bad, ok := mc.isBad[id]; ok { diff --git a/network/network.go b/network/network.go index d28d14c41a..5c9cc1fefb 100644 --- a/network/network.go +++ b/network/network.go @@ -6,8 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" - "go.uber.org/zap" - "github.com/ssvlabs/ssv/network/records" protocolp2p "github.com/ssvlabs/ssv/protocol/v2/p2p" ) @@ -40,13 +38,13 @@ type P2PNetwork interface { // Start starts the network Start() error // UpdateSubnets will update the registered subnets according to active validators - UpdateSubnets(logger *zap.Logger) + UpdateSubnets() // SubscribeAll subscribes to all subnets - SubscribeAll(logger *zap.Logger) error + SubscribeAll() error // SubscribeRandoms subscribes to random subnets - SubscribeRandoms(logger *zap.Logger, numSubnets int) error + SubscribeRandoms(numSubnets int) error // UpdateScoreParams will update the scoring parameters of GossipSub - UpdateScoreParams(logger *zap.Logger) + UpdateScoreParams() // ActiveSubnets returns active subnets ActiveSubnets() records.Subnets // FixedSubnets returns fixed subnets diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 61b1129403..ec178d2295 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -245,7 +245,7 @@ func (n *p2pNetwork) Start() error { zap.Int("trusted_peers", len(n.trustedPeers)), ) - go n.startDiscovery(n.logger, connector) + go n.startDiscovery(connector) async.Interval(n.ctx, connManagerBalancingInterval, n.peersBalancing()) @@ -255,7 +255,7 @@ func (n *p2pNetwork) Start() error { async.Interval(n.ctx, topicsReportingInterval, recordPeerCountPerTopic(n.ctx, n.logger, n.topicsCtrl, 2)) - if err := n.subscribeToSubnets(n.logger); err != nil { + if err := n.subscribeToSubnets(); err != nil { return err } @@ -273,7 +273,7 @@ func (n *p2pNetwork) peersBalancing() func() { connMgr := peers.NewConnManager(n.logger, n.libConnManager, n.idx, n.idx) // Disconnect from bad peers - connMgr.DisconnectFromBadPeers(n.logger, n.host.Network(), allPeers) + connMgr.DisconnectFromBadPeers(n.host.Network(), allPeers) // Check if it has the maximum number of connections currentCount := len(allPeers) @@ -287,21 +287,21 @@ func (n *p2pNetwork) peersBalancing() func() { mySubnets := records.Subnets(n.activeSubnets).Clone() // Disconnect from irrelevant peers - disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(n.logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, mySubnets) + disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, mySubnets) if disconnectedPeers > 0 { return } // Trim peers according to subnet participation (considering the subnet size) - connMgr.TagBestPeers(n.logger, n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers) - connMgr.TrimPeers(ctx, n.logger, n.host.Network()) + connMgr.TagBestPeers(n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers) + connMgr.TrimPeers(ctx, n.host.Network()) } } // startDiscovery starts the required services // it will try to bootstrap discovery service, and inject a connect function. 
// the connect function checks if we can connect to the given peer and if so passing it to the backoff connector. -func (n *p2pNetwork) startDiscovery(logger *zap.Logger, connector chan peer.AddrInfo) { +func (n *p2pNetwork) startDiscovery(connector chan peer.AddrInfo) { discoveredPeers := make(chan peer.AddrInfo, connectorQueueSize) go func() { ctx, cancel := context.WithCancel(n.ctx) @@ -309,19 +309,19 @@ func (n *p2pNetwork) startDiscovery(logger *zap.Logger, connector chan peer.Addr n.backoffConnector.Connect(ctx, discoveredPeers) }() err := tasks.Retry(func() error { - return n.disc.Bootstrap(logger, func(e discovery.PeerEvent) { + return n.disc.Bootstrap(func(e discovery.PeerEvent) { if !n.idx.CanConnect(e.AddrInfo.ID) { return } select { case connector <- e.AddrInfo: default: - logger.Warn("connector queue is full, skipping new peer", fields.PeerID(e.AddrInfo.ID)) + n.logger.Warn("connector queue is full, skipping new peer", fields.PeerID(e.AddrInfo.ID)) } }) }, 3) if err != nil { - logger.Panic("could not setup discovery", zap.Error(err)) + n.logger.Panic("could not setup discovery", zap.Error(err)) } } @@ -331,10 +331,9 @@ func (n *p2pNetwork) isReady() bool { // UpdateSubnets will update the registered subnets according to active validators // NOTE: it won't subscribe to the subnets (use subscribeToSubnets for that) -func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { +func (n *p2pNetwork) UpdateSubnets() { // TODO: this is a temporary fix to update subnets when validators are added/removed, // there is a pending PR to replace this: https://github.com/ssvlabs/ssv/pull/990 - logger = logger.Named(logging.NameP2PNetwork) ticker := time.NewTicker(time.Second) registeredSubnets := make([]byte, commons.Subnets()) defer ticker.Stop() @@ -387,37 +386,37 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { var hasAdded, hasRemoved bool if len(addedSubnets) > 0 { var err error - hasAdded, err = n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...) + hasAdded, err = n.disc.RegisterSubnets(addedSubnets...) if err != nil { - logger.Debug("could not register subnets", zap.Error(err)) + n.logger.Debug("could not register subnets", zap.Error(err)) errs = errors.Join(errs, err) } } if len(removedSubnets) > 0 { var err error - hasRemoved, err = n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...) + hasRemoved, err = n.disc.DeregisterSubnets(removedSubnets...) if err != nil { - logger.Debug("could not unregister subnets", zap.Error(err)) + n.logger.Debug("could not unregister subnets", zap.Error(err)) errs = errors.Join(errs, err) } // Unsubscribe from the removed subnets. 
for _, removedSubnet := range removedSubnets { - if err := n.unsubscribeSubnet(logger, removedSubnet); err != nil { - logger.Debug("could not unsubscribe from subnet", zap.Uint64("subnet", removedSubnet), zap.Error(err)) + if err := n.unsubscribeSubnet(removedSubnet); err != nil { + n.logger.Debug("could not unsubscribe from subnet", zap.Uint64("subnet", removedSubnet), zap.Error(err)) errs = errors.Join(errs, err) } else { - logger.Debug("unsubscribed from subnet", zap.Uint64("subnet", removedSubnet)) + n.logger.Debug("unsubscribed from subnet", zap.Uint64("subnet", removedSubnet)) } } } if hasAdded || hasRemoved { - go n.disc.PublishENR(logger.Named(logging.NameDiscoveryService)) + go n.disc.PublishENR() } allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0) - logger.Debug("updated subnets", + n.logger.Debug("updated subnets", zap.Any("added", addedSubnets), zap.Any("removed", removedSubnets), zap.Any("subnets", subnetsList), @@ -430,13 +429,11 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } // UpdateScoreParams updates the scoring parameters once per epoch through the call of n.topicsCtrl.UpdateScoreParams -func (n *p2pNetwork) UpdateScoreParams(logger *zap.Logger) { +func (n *p2pNetwork) UpdateScoreParams() { // TODO: this is a temporary solution to update the score parameters periodically. // But, we should use an appropriate trigger for the UpdateScoreParams function that should be // called once a validator is added or removed from the network - logger = logger.Named(logging.NameP2PNetwork) - // function to get the starting time of the next epoch nextEpochStartingTime := func() time.Time { currEpoch := n.cfg.Network.Beacon.EstimatedCurrentEpoch() @@ -452,11 +449,11 @@ func (n *p2pNetwork) UpdateScoreParams(logger *zap.Logger) { for ; true; <-timer.C { // Update score parameters - err := n.topicsCtrl.UpdateScoreParams(logger) + err := n.topicsCtrl.UpdateScoreParams() if err != nil { - logger.Debug("score parameters update failed", zap.Error(err)) + n.logger.Debug("score parameters update failed", zap.Error(err)) } else { - logger.Debug("updated score parameters successfully") + n.logger.Debug("updated score parameters successfully") } // Reset to trigger on the beginning of the next epoch diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index e5bc71d028..6ad7a5f294 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -70,13 +70,13 @@ func (n *p2pNetwork) Broadcast(msgID spectypes.MessageID, msg *spectypes.SignedS return nil } -func (n *p2pNetwork) SubscribeAll(logger *zap.Logger) error { +func (n *p2pNetwork) SubscribeAll() error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } n.fixedSubnets, _ = records.Subnets{}.FromString(records.AllSubnets) for subnet := uint64(0); subnet < commons.SubnetsCount; subnet++ { - err := n.topicsCtrl.Subscribe(logger, commons.SubnetTopicID(subnet)) + err := n.topicsCtrl.Subscribe(commons.SubnetTopicID(subnet)) if err != nil { return err } @@ -85,7 +85,7 @@ func (n *p2pNetwork) SubscribeAll(logger *zap.Logger) error { } // SubscribeRandoms subscribes to random subnets. This method isn't thread-safe. 
-func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error { +func (n *p2pNetwork) SubscribeRandoms(numSubnets int) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } @@ -99,7 +99,7 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error randomSubnets = randomSubnets[:numSubnets] for _, subnet := range randomSubnets { // #nosec G115 - err := n.topicsCtrl.Subscribe(logger, commons.SubnetTopicID(uint64(subnet))) // Perm slice is [0, n) + err := n.topicsCtrl.Subscribe(commons.SubnetTopicID(uint64(subnet))) // Perm slice is [0, n) if err != nil { return fmt.Errorf("could not subscribe to subnet %d: %w", subnet, err) } @@ -144,7 +144,7 @@ func (n *p2pNetwork) subscribeCommittee(cid spectypes.CommitteeID) error { } for _, topic := range commons.CommitteeTopicID(cid) { - if err := n.topicsCtrl.Subscribe(n.logger, topic); err != nil { + if err := n.topicsCtrl.Subscribe(topic); err != nil { return fmt.Errorf("could not subscribe to topic %s: %w", topic, err) } } @@ -152,21 +152,21 @@ func (n *p2pNetwork) subscribeCommittee(cid spectypes.CommitteeID) error { return nil } -func (n *p2pNetwork) unsubscribeSubnet(logger *zap.Logger, subnet uint64) error { +func (n *p2pNetwork) unsubscribeSubnet(subnet uint64) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } if subnet >= commons.SubnetsCount { return fmt.Errorf("invalid subnet %d", subnet) } - if err := n.topicsCtrl.Unsubscribe(logger, commons.SubnetTopicID(subnet), false); err != nil { + if err := n.topicsCtrl.Unsubscribe(commons.SubnetTopicID(subnet), false); err != nil { return fmt.Errorf("could not unsubscribe from subnet %d: %w", subnet, err) } return nil } // Unsubscribe unsubscribes from the validator subnet -func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error { +func (n *p2pNetwork) Unsubscribe(pk spectypes.ValidatorPK) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } @@ -179,7 +179,7 @@ func (n *p2pNetwork) Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) e cmtid := share.CommitteeID() topics := commons.CommitteeTopicID(cmtid) for _, topic := range topics { - if err := n.topicsCtrl.Unsubscribe(logger, topic, false); err != nil { + if err := n.topicsCtrl.Unsubscribe(topic, false); err != nil { return err } } @@ -215,18 +215,18 @@ func (n *p2pNetwork) handlePubsubMessages() func(ctx context.Context, topic stri } // subscribeToSubnets subscribes to all the node's subnets -func (n *p2pNetwork) subscribeToSubnets(logger *zap.Logger) error { +func (n *p2pNetwork) subscribeToSubnets() error { if !discovery.HasActiveSubnets(n.fixedSubnets) { return nil } - logger.Debug("subscribing to fixed subnets", fields.Subnets(n.fixedSubnets)) + n.logger.Debug("subscribing to fixed subnets", fields.Subnets(n.fixedSubnets)) for i, val := range n.fixedSubnets { if val > 0 { subnet := fmt.Sprintf("%d", i) - if err := n.topicsCtrl.Subscribe(logger, subnet); err != nil { - logger.Warn("could not subscribe to subnet", + if err := n.topicsCtrl.Subscribe(subnet); err != nil { + n.logger.Warn("could not subscribe to subnet", zap.String("subnet", subnet), zap.Error(err)) // TODO: handle error } diff --git a/network/p2p/p2p_reporter.go b/network/p2p/p2p_reporter.go index da44e87efc..d7cccdc57e 100644 --- a/network/p2p/p2p_reporter.go +++ b/network/p2p/p2p_reporter.go @@ -15,20 +15,20 @@ import ( // ReportValidation reports the result for the given message // the result will be converted to a score and reported to peers.ScoreIndex 
-func (n *p2pNetwork) ReportValidation(logger *zap.Logger, msg *spectypes.SSVMessage, res protocolp2p.MsgValidationResult) { +func (n *p2pNetwork) ReportValidation(msg *spectypes.SSVMessage, res protocolp2p.MsgValidationResult) { if !n.isReady() { return } data, err := commons.EncodeNetworkMsg(msg) if err != nil { - logger.Warn("could not encode message", zap.Error(err)) + n.logger.Warn("could not encode message", zap.Error(err)) return } peers := n.msgResolver.GetPeers(data) for _, pi := range peers { err := n.idx.Score(pi, &ssvpeers.NodeScore{Name: "validation", Value: msgValidationScore(res)}) if err != nil { - logger.Warn("could not score peer", fields.PeerID(pi), zap.Error(err)) + n.logger.Warn("could not score peer", fields.PeerID(pi), zap.Error(err)) continue } } diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index c8e2039d84..db103d7889 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -107,12 +107,12 @@ func (n *p2pNetwork) initCfg() error { return nil } -// Returns whetehr a peer is bad -func (n *p2pNetwork) IsBadPeer(logger *zap.Logger, peerID peer.ID) bool { +// IsBadPeer returns whether a peer is bad +func (n *p2pNetwork) IsBadPeer(peerID peer.ID) bool { if n.idx == nil { return false } - return n.idx.IsBad(logger, peerID) + return n.idx.IsBad(peerID) } // SetupHost configures a libp2p host and backoff connector utility @@ -211,23 +211,26 @@ func (n *p2pNetwork) setupPeerServices() error { } } - handshaker := connections.NewHandshaker(n.ctx, &connections.HandshakerCfg{ - Streams: n.streamCtrl, - NodeInfos: n.idx, - PeerInfos: n.idx, - ConnIdx: n.idx, - SubnetsIdx: n.idx, - IDService: ids, - Network: n.host.Network(), - DomainType: n.cfg.Network.DomainType, - SubnetsProvider: n.ActiveSubnets, - }, filters) - - n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler(n.logger)) + handshaker := connections.NewHandshaker( + n.ctx, + n.logger, + &connections.HandshakerCfg{ + Streams: n.streamCtrl, + NodeInfos: n.idx, + PeerInfos: n.idx, + ConnIdx: n.idx, + SubnetsIdx: n.idx, + IDService: ids, + Network: n.host.Network(), + DomainType: n.cfg.Network.DomainType, + SubnetsProvider: n.ActiveSubnets, + }, filters) + + n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler()) n.logger.Debug("handshaker is ready") - n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx) - n.host.Network().Notify(n.connHandler.Handle(n.logger)) + n.connHandler = connections.NewConnHandler(n.ctx, n.logger, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx) + n.host.Network().Notify(n.connHandler.Handle()) n.logger.Debug("connection handler is ready") return nil diff --git a/network/peers/conn_manager.go b/network/peers/conn_manager.go index 38b1886aa3..7fa559798c 100644 --- a/network/peers/conn_manager.go +++ b/network/peers/conn_manager.go @@ -27,13 +27,13 @@ type PeerScore float64 // rather than relaying on libp2p's connection manager. type ConnManager interface { // TagBestPeers tags the best n peers from the given list, based on subnets distribution scores. - TagBestPeers(logger *zap.Logger, n int, mySubnets records.Subnets, allPeers []peer.ID, topicMaxPeers int) + TagBestPeers(n int, mySubnets records.Subnets, allPeers []peer.ID, topicMaxPeers int) // TrimPeers will trim unprotected peers. 
- TrimPeers(ctx context.Context, logger *zap.Logger, net libp2pnetwork.Network) + TrimPeers(ctx context.Context, net libp2pnetwork.Network) // DisconnectFromBadPeers will disconnect from bad peers according to their Gossip scores. It returns the number of disconnected peers. - DisconnectFromBadPeers(logger *zap.Logger, net libp2pnetwork.Network, allPeers []peer.ID) int + DisconnectFromBadPeers(net libp2pnetwork.Network, allPeers []peer.ID) int // DisconnectFromIrrelevantPeers will disconnect from at most [disconnectQuota] peers that doesn't share any subnet in common. It returns the number of disconnected peers. - DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int + DisconnectFromIrrelevantPeers(disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int } // connManager implements ConnManager @@ -61,9 +61,9 @@ func (c connManager) disconnect(peerID peer.ID, net libp2pnetwork.Network) error } // Set the "Protect" tag for the best [n] peers. For the others, set the "Unprotect" tag -func (c connManager) TagBestPeers(logger *zap.Logger, n int, mySubnets records.Subnets, allPeers []peer.ID, topicMaxPeers int) { +func (c connManager) TagBestPeers(n int, mySubnets records.Subnets, allPeers []peer.ID, topicMaxPeers int) { bestPeers := c.getBestPeers(n, mySubnets, allPeers, topicMaxPeers) - logger.Debug("tagging best peers", + c.logger.Debug("tagging best peers", zap.Int("n", n), zap.Int("allPeers", len(allPeers)), zap.Int("bestPeers", len(bestPeers))) @@ -80,7 +80,7 @@ func (c connManager) TagBestPeers(logger *zap.Logger, n int, mySubnets records.S } // Closes the connection to all peers that are not protected -func (c connManager) TrimPeers(ctx context.Context, logger *zap.Logger, net libp2pnetwork.Network) { +func (c connManager) TrimPeers(ctx context.Context, net libp2pnetwork.Network) { allPeers := net.Peers() before := len(allPeers) // TODO: use libp2p's conn manager once ready @@ -88,10 +88,10 @@ func (c connManager) TrimPeers(ctx context.Context, logger *zap.Logger, net libp for _, pid := range allPeers { if !c.connManager.IsProtected(pid, protectedTag) { err := c.disconnect(pid, net) - logger.Debug("closing peer", fields.PeerID(pid), zap.Error(err)) + c.logger.Debug("closing peer", fields.PeerID(pid), zap.Error(err)) } } - logger.Debug("trimmed peers", zap.Int("beforeTrim", before), + c.logger.Debug("trimmed peers", zap.Int("beforeTrim", before), zap.Int("afterTrim", len(net.Peers()))) } @@ -209,16 +209,16 @@ func scorePeer(peerSubnets records.Subnets, subnetsScores []float64) PeerScore { } // DisconnectFromBadPeers will disconnect from bad peers according to their Gossip scores. It returns the number of disconnected peers. -func (c connManager) DisconnectFromBadPeers(logger *zap.Logger, net libp2pnetwork.Network, allPeers []peer.ID) int { +func (c connManager) DisconnectFromBadPeers(net libp2pnetwork.Network, allPeers []peer.ID) int { disconnectedPeers := 0 for _, peerID := range allPeers { // Disconnect if peer has bad gossip score. 
if isBad, gossipScore := c.gossipScoreIndex.HasBadGossipScore(peerID); isBad { err := c.disconnect(peerID, net) if err != nil { - logger.Error("failed to disconnect from bad peer", fields.PeerID(peerID), zap.Float64("gossip_score", gossipScore)) + c.logger.Error("failed to disconnect from bad peer", fields.PeerID(peerID), zap.Float64("gossip_score", gossipScore)) } else { - logger.Debug("disconnecting from bad peer", fields.PeerID(peerID), zap.Float64("gossip_score", gossipScore)) + c.logger.Debug("disconnecting from bad peer", fields.PeerID(peerID), zap.Float64("gossip_score", gossipScore)) disconnectedPeers++ } } @@ -228,7 +228,7 @@ func (c connManager) DisconnectFromBadPeers(logger *zap.Logger, net libp2pnetwor } // DisconnectFromIrrelevantPeers will disconnect from at most [disconnectQuota] peers that doesn't share any subnet in common. It returns the number of disconnected peers. -func (c connManager) DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int { +func (c connManager) DisconnectFromIrrelevantPeers(disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int { disconnectedPeers := 0 for _, peerID := range allPeers { peerSubnets := c.subnetsIdx.GetPeerSubnets(peerID) @@ -238,9 +238,9 @@ func (c connManager) DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnec if len(sharedSubnets) == 0 { err := c.disconnect(peerID, net) if err != nil { - logger.Error("failed to disconnect from peer with irrelevant subnets", fields.PeerID(peerID)) + c.logger.Error("failed to disconnect from peer with irrelevant subnets", fields.PeerID(peerID)) } else { - logger.Debug("disconnecting from peer with irrelevant subnets", fields.PeerID(peerID)) + c.logger.Debug("disconnecting from peer with irrelevant subnets", fields.PeerID(peerID)) disconnectedPeers++ if disconnectedPeers >= disconnectQuota { return disconnectedPeers diff --git a/network/peers/conn_manager_test.go b/network/peers/conn_manager_test.go index 17cb8ac53a..1e9f85f972 100644 --- a/network/peers/conn_manager_test.go +++ b/network/peers/conn_manager_test.go @@ -11,12 +11,10 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/network/records" ) func TestTagBestPeers(t *testing.T) { - logger := logging.TestLogger(t) connMgrMock := newConnMgr() allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) @@ -36,7 +34,7 @@ func TestTagBestPeers(t *testing.T) { best := cm.getBestPeers(40, mySubnets, pids, 10) require.Len(t, best, 40) - cm.TagBestPeers(logger, 20, mySubnets, pids, 10) + cm.TagBestPeers(20, mySubnets, pids, 10) require.Equal(t, 20, len(connMgrMock.tags)) } diff --git a/network/peers/connections/conn_gater.go b/network/peers/connections/conn_gater.go index 5f9274d5b4..defa401c11 100644 --- a/network/peers/connections/conn_gater.go +++ b/network/peers/connections/conn_gater.go @@ -26,7 +26,7 @@ const ( // ) -type BadPeerF func(logger *zap.Logger, peerID peer.ID) bool +type BadPeerF func(peerID peer.ID) bool // connGater implements ConnectionGater interface: // https://github.com/libp2p/go-libp2p/core/blob/master/connmgr/gater.go @@ -60,7 +60,7 @@ func (n *connGater) InterceptPeerDial(id peer.ID) bool { // particular address. Blocking connections at this stage is typical for // address filtering. 
func (n *connGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) bool { - if n.isBadPeer(n.logger, id) { + if n.isBadPeer(id) { n.logger.Debug("preventing outbound connection due to bad peer", fields.PeerID(id)) return false } @@ -89,7 +89,7 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo // InterceptSecured is called for both inbound and outbound connections, // after a security handshake has taken place and we've authenticated the peer. func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool { - if n.isBadPeer(n.logger, id) { + if n.isBadPeer(id) { n.logger.Debug("rejecting inbound connection due to bad peer", fields.PeerID(id)) return false } diff --git a/network/peers/connections/conn_handler.go b/network/peers/connections/conn_handler.go index de0846d7aa..0f8478eb4b 100644 --- a/network/peers/connections/conn_handler.go +++ b/network/peers/connections/conn_handler.go @@ -19,12 +19,13 @@ import ( // ConnHandler handles new connections (inbound / outbound) using libp2pnetwork.NotifyBundle type ConnHandler interface { - Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle + Handle() *libp2pnetwork.NotifyBundle } // connHandler implements ConnHandler type connHandler struct { - ctx context.Context + ctx context.Context + logger *zap.Logger handshaker Handshaker subnetsProvider SubnetsProvider @@ -36,6 +37,7 @@ type connHandler struct { // NewConnHandler creates a new connection handler func NewConnHandler( ctx context.Context, + logger *zap.Logger, handshaker Handshaker, subnetsProvider SubnetsProvider, subnetsIndex peers.SubnetsIndex, @@ -44,6 +46,7 @@ func NewConnHandler( ) ConnHandler { return &connHandler{ ctx: ctx, + logger: logger, handshaker: handshaker, subnetsProvider: subnetsProvider, subnetsIndex: subnetsIndex, @@ -53,7 +56,7 @@ func NewConnHandler( } // Handle configures a network notifications handler that handshakes and tracks all p2p connections -func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { +func (ch *connHandler) Handle() *libp2pnetwork.NotifyBundle { disconnect := func(logger *zap.Logger, net libp2pnetwork.Network, conn libp2pnetwork.Conn) { id := conn.RemotePeer() errClose := net.ClosePeer(id) @@ -138,7 +141,7 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { } } - if !ch.sharesEnoughSubnets(logger, conn) { + if !ch.sharesEnoughSubnets(conn) { return errors.New("peer doesn't share enough subnets") } return nil @@ -154,7 +157,7 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { } connLogger := func(conn libp2pnetwork.Conn) *zap.Logger { - return logger.Named(logging.NameConnHandler). + return ch.logger.Named(logging.NameConnHandler). 
With( fields.PeerID(conn.RemotePeer()), zap.String("remote_addr", conn.RemoteMultiaddr().String()), @@ -212,7 +215,7 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { } } -func (ch *connHandler) sharesEnoughSubnets(logger *zap.Logger, conn libp2pnetwork.Conn) bool { +func (ch *connHandler) sharesEnoughSubnets(conn libp2pnetwork.Conn) bool { pid := conn.RemotePeer() subnets := ch.subnetsIndex.GetPeerSubnets(pid) if len(subnets) == 0 { @@ -221,7 +224,7 @@ func (ch *connHandler) sharesEnoughSubnets(logger *zap.Logger, conn libp2pnetwor } mySubnets := ch.subnetsProvider() - logger = logger.With(fields.Subnets(subnets), zap.String("my_subnets", mySubnets.String())) + logger := ch.logger.With(fields.Subnets(subnets), zap.String("my_subnets", mySubnets.String())) if mySubnets.String() == records.ZeroSubnets { // this node has no subnets return true diff --git a/network/peers/connections/filters.go b/network/peers/connections/filters.go index d814042ff3..9f7a9d161c 100644 --- a/network/peers/connections/filters.go +++ b/network/peers/connections/filters.go @@ -23,7 +23,7 @@ func NetworkIDFilter(networkID string) HandshakeFilter { // BadPeerFilter avoids connecting to a bad peer func BadPeerFilter(logger *zap.Logger, n peers.Index) HandshakeFilter { return func(senderID peer.ID, sni *records.NodeInfo) error { - if n.IsBad(logger, senderID) { + if n.IsBad(senderID) { return errors.New("bad peer") } return nil diff --git a/network/peers/connections/handshaker.go b/network/peers/connections/handshaker.go index 26b94051db..ea9dd8f738 100644 --- a/network/peers/connections/handshaker.go +++ b/network/peers/connections/handshaker.go @@ -38,11 +38,12 @@ type SubnetsProvider func() records.Subnets // we accept nodes with user agent as a fallback when the new protocol is not supported. type Handshaker interface { Handshake(logger *zap.Logger, conn libp2pnetwork.Conn) error - Handler(logger *zap.Logger) libp2pnetwork.StreamHandler + Handler() libp2pnetwork.StreamHandler } type handshaker struct { - ctx context.Context + ctx context.Context + logger *zap.Logger filters func() []HandshakeFilter @@ -73,9 +74,10 @@ type HandshakerCfg struct { } // NewHandshaker creates a new instance of handshaker -func NewHandshaker(ctx context.Context, cfg *HandshakerCfg, filters func() []HandshakeFilter) Handshaker { +func NewHandshaker(ctx context.Context, logger *zap.Logger, cfg *HandshakerCfg, filters func() []HandshakeFilter) Handshaker { h := &handshaker{ ctx: ctx, + logger: logger, streams: cfg.Streams, nodeInfos: cfg.NodeInfos, connIdx: cfg.ConnIdx, @@ -91,7 +93,7 @@ func NewHandshaker(ctx context.Context, cfg *HandshakerCfg, filters func() []Han } // Handler returns the handshake handler -func (h *handshaker) Handler(logger *zap.Logger) libp2pnetwork.StreamHandler { +func (h *handshaker) Handler() libp2pnetwork.StreamHandler { handleHandshake := func(logger *zap.Logger, h *handshaker, stream libp2pnetwork.Stream) error { pid := stream.Conn().RemotePeer() request, respond, done, err := h.streams.HandleStream(logger, stream) @@ -126,7 +128,7 @@ func (h *handshaker) Handler(logger *zap.Logger) libp2pnetwork.StreamHandler { return func(stream libp2pnetwork.Stream) { pid := stream.Conn().RemotePeer() - logger := logger.With(fields.PeerID(pid)) + logger := h.logger.With(fields.PeerID(pid)) // Update PeerInfo with the result of this handshake. 
var err error diff --git a/network/peers/connections/mock/mock_connection_index.go b/network/peers/connections/mock/mock_connection_index.go index 0048afd2b8..6f555aa544 100644 --- a/network/peers/connections/mock/mock_connection_index.go +++ b/network/peers/connections/mock/mock_connection_index.go @@ -3,7 +3,6 @@ package mock import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "go.uber.org/zap" ) // MockConnectionIndex is a mock implementation of the ConnectionIndex interface @@ -27,6 +26,6 @@ func (m *MockConnectionIndex) AtLimit(dir network.Direction) bool { } // IsBad panics if called -func (m *MockConnectionIndex) IsBad(logger *zap.Logger, id peer.ID) bool { +func (m *MockConnectionIndex) IsBad(id peer.ID) bool { panic("IsBad method is not implemented in MockConnectionIndex") } diff --git a/network/peers/index.go b/network/peers/index.go index a102aca9fb..001ab8f143 100644 --- a/network/peers/index.go +++ b/network/peers/index.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" - "go.uber.org/zap" "github.com/ssvlabs/ssv/network/records" ) @@ -42,7 +41,7 @@ type ConnectionIndex interface { AtLimit(dir libp2pnetwork.Direction) bool // IsBad returns whether the given peer is bad - IsBad(logger *zap.Logger, id peer.ID) bool + IsBad(id peer.ID) bool } // ScoreIndex is an interface for managing peers scores diff --git a/network/peers/peers_index.go b/network/peers/peers_index.go index 3b21282339..d1cecfeaf5 100644 --- a/network/peers/peers_index.go +++ b/network/peers/peers_index.go @@ -22,6 +22,7 @@ type NetworkKeyProvider func() libp2pcrypto.PrivKey // peersIndex implements Index interface. type peersIndex struct { + logger *zap.Logger netKeyProvider NetworkKeyProvider network libp2pnetwork.Network @@ -42,6 +43,7 @@ func NewPeersIndex(logger *zap.Logger, network libp2pnetwork.Network, self *reco netKeyProvider NetworkKeyProvider, subnetsCount int, pruneTTL time.Duration, gossipScoreIndex GossipScoreIndex) *peersIndex { return &peersIndex{ + logger: logger, network: network, scoreIdx: newScoreIndex(), SubnetsIndex: NewSubnetsIndex(subnetsCount), @@ -59,7 +61,7 @@ func NewPeersIndex(logger *zap.Logger, network libp2pnetwork.Network, self *reco // - bad gossip score // - pruned (that was not expired) // - bad score -func (pi *peersIndex) IsBad(logger *zap.Logger, id peer.ID) bool { +func (pi *peersIndex) IsBad(id peer.ID) bool { if isBad, _ := pi.HasBadGossipScore(id); isBad { return true } @@ -74,7 +76,7 @@ func (pi *peersIndex) IsBad(logger *zap.Logger, id peer.ID) bool { for _, score := range scores { if score.Value < threshold { - logger.Debug("bad peer (low score)") + pi.logger.Debug("bad peer (low score)") return true } } diff --git a/network/topics/controller.go b/network/topics/controller.go index f91491eb7f..86ef9a37ed 100644 --- a/network/topics/controller.go +++ b/network/topics/controller.go @@ -24,9 +24,9 @@ var ( // Controller is an interface for managing pubsub topics type Controller interface { // Subscribe subscribes to the given topic - Subscribe(logger *zap.Logger, name string) error + Subscribe(name string) error // Unsubscribe unsubscribes from the given topic - Unsubscribe(logger *zap.Logger, topicName string, hard bool) error + Unsubscribe(topicName string, hard bool) error // Peers returns the peers subscribed to the given topic Peers(topicName string) ([]peer.ID, error) // Topics lists all the available topics @@ -34,7 +34,7 @@ type Controller 
interface { // Broadcast publishes the message on the given topic Broadcast(topicName string, data []byte, timeout time.Duration) error // UpdateScoreParams refreshes the score params for every subscribed topic - UpdateScoreParams(logger *zap.Logger) error + UpdateScoreParams() error io.Closer } @@ -81,33 +81,33 @@ func NewTopicsController( subFilter: subFilter, } - ctrl.container = newTopicsContainer(pubSub, ctrl.onNewTopic(logger)) + ctrl.container = newTopicsContainer(pubSub, ctrl.onNewTopic()) return ctrl } -func (ctrl *topicsCtrl) onNewTopic(logger *zap.Logger) onTopicJoined { +func (ctrl *topicsCtrl) onNewTopic() onTopicJoined { return func(ps *pubsub.PubSub, topic *pubsub.Topic) { // initial setup for the topic, should happen only once name := topic.String() if err := ctrl.setupTopicValidator(topic.String()); err != nil { // TODO: close topic? // return err - logger.Warn("could not setup topic", zap.String("topic", name), zap.Error(err)) + ctrl.logger.Warn("could not setup topic", zap.String("topic", name), zap.Error(err)) } if ctrl.scoreParamsFactory != nil { if p := ctrl.scoreParamsFactory(name); p != nil { - logger.Debug("using scoring params for topic", zap.String("topic", name), zap.Any("params", p)) + ctrl.logger.Debug("using scoring params for topic", zap.String("topic", name), zap.Any("params", p)) if err := topic.SetScoreParams(p); err != nil { - // logger.Warn("could not set topic score params", zap.String("topic", name), zap.Error(err)) - logger.Warn("could not set topic score params", zap.String("topic", name), zap.Error(err)) + // ctrl.logger.Warn("could not set topic score params", zap.String("topic", name), zap.Error(err)) + ctrl.logger.Warn("could not set topic score params", zap.String("topic", name), zap.Error(err)) } } } } } -func (ctrl *topicsCtrl) UpdateScoreParams(logger *zap.Logger) error { +func (ctrl *topicsCtrl) UpdateScoreParams() error { if ctrl.scoreParamsFactory == nil { return fmt.Errorf("scoreParamsFactory is not set") } @@ -136,7 +136,7 @@ func (ctrl *topicsCtrl) UpdateScoreParams(logger *zap.Logger) error { func (ctrl *topicsCtrl) Close() error { topics := ctrl.ps.GetTopics() for _, tp := range topics { - _ = ctrl.Unsubscribe(ctrl.logger, commons.GetTopicBaseName(tp), true) + _ = ctrl.Unsubscribe(commons.GetTopicBaseName(tp), true) _ = ctrl.container.Leave(tp) } return nil @@ -166,18 +166,18 @@ func (ctrl *topicsCtrl) Topics() []string { // Subscribe subscribes to the given topic, it can handle multiple concurrent calls. // it will create a single goroutine and channel for every topic -func (ctrl *topicsCtrl) Subscribe(logger *zap.Logger, name string) error { +func (ctrl *topicsCtrl) Subscribe(name string) error { name = commons.GetTopicFullName(name) ctrl.subFilter.(Whitelist).Register(name) sub, err := ctrl.container.Subscribe(name) - defer logger.Debug("subscribing to topic", zap.String("topic", name), zap.Bool("already_subscribed", sub == nil), zap.Error(err)) + defer ctrl.logger.Debug("subscribing to topic", zap.String("topic", name), zap.Bool("already_subscribed", sub == nil), zap.Error(err)) if err != nil { return err } if sub == nil { // already subscribed return nil } - go ctrl.start(logger, name, sub) + go ctrl.start(name, sub) return nil } @@ -206,7 +206,7 @@ func (ctrl *topicsCtrl) Broadcast(name string, data []byte, timeout time.Duratio // Unsubscribe unsubscribes from the given topic, only if there are no other subscribers of the given topic // if hard is true, we will unsubscribe the topic even if there are more subscribers. 
-func (ctrl *topicsCtrl) Unsubscribe(logger *zap.Logger, name string, hard bool) error { +func (ctrl *topicsCtrl) Unsubscribe(name string, hard bool) error { name = commons.GetTopicFullName(name) if !ctrl.container.Unsubscribe(name) { @@ -216,7 +216,7 @@ func (ctrl *topicsCtrl) Unsubscribe(logger *zap.Logger, name string, hard bool) if ctrl.msgValidator != nil { err := ctrl.ps.UnregisterTopicValidator(name) if err != nil { - logger.Debug("could not unregister msg validator", zap.String("topic", name), zap.Error(err)) + ctrl.logger.Debug("could not unregister msg validator", zap.String("topic", name), zap.Error(err)) } } ctrl.subFilter.(Whitelist).Deregister(name) @@ -227,32 +227,32 @@ func (ctrl *topicsCtrl) Unsubscribe(logger *zap.Logger, name string, hard bool) // start will listen to *pubsub.Subscription, // if some error happened we try to leave and rejoin the topic // the loop stops once a topic is unsubscribed and therefore not listed -func (ctrl *topicsCtrl) start(logger *zap.Logger, name string, sub *pubsub.Subscription) { +func (ctrl *topicsCtrl) start(name string, sub *pubsub.Subscription) { for ctrl.ctx.Err() == nil { - err := ctrl.listen(logger, sub) + err := ctrl.listen(sub) if err == nil { return } // rejoin in case failed - logger.Debug("could not listen to topic", zap.String("topic", name), zap.Error(err)) + ctrl.logger.Debug("could not listen to topic", zap.String("topic", name), zap.Error(err)) ctrl.container.Unsubscribe(name) _ = ctrl.container.Leave(name) sub, err = ctrl.container.Subscribe(name) if err == nil { continue } - logger.Debug("could not rejoin topic", zap.String("topic", name), zap.Error(err)) + ctrl.logger.Debug("could not rejoin topic", zap.String("topic", name), zap.Error(err)) } } // listen handles incoming messages from the topic -func (ctrl *topicsCtrl) listen(logger *zap.Logger, sub *pubsub.Subscription) error { +func (ctrl *topicsCtrl) listen(sub *pubsub.Subscription) error { ctx, cancel := context.WithCancel(ctrl.ctx) defer cancel() topicName := sub.Topic() - logger = logger.With(zap.String("topic", topicName)) + logger := ctrl.logger.With(zap.String("topic", topicName)) logger.Debug("start listening to topic") for ctx.Err() == nil { msg, err := sub.Next(ctx) diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 39746be94e..8f738c14c0 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -110,14 +110,14 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P, // listen to topics for _, cid := range cids { for _, p := range peers { - require.NoError(t, p.tm.Subscribe(logger, committeeTopic(cid))) + require.NoError(t, p.tm.Subscribe(committeeTopic(cid))) // simulate concurrency, by trying to subscribe multiple times go func(tm Controller, cid string) { - require.NoError(t, tm.Subscribe(logger, committeeTopic(cid))) + require.NoError(t, tm.Subscribe(committeeTopic(cid))) }(p.tm, cid) go func(tm Controller, cid string) { <-time.After(100 * time.Millisecond) - require.NoError(t, tm.Subscribe(logger, committeeTopic(cid))) + require.NoError(t, tm.Subscribe(committeeTopic(cid))) }(p.tm, cid) } } @@ -182,13 +182,13 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P, topic := committeeTopic(cid) topicFullName := commons.GetTopicFullName(topic) - err := p.tm.Unsubscribe(logger, topic, false) + err := p.tm.Unsubscribe(topic, false) require.NoError(t, err) go func(p *P) { <-time.After(time.Millisecond) - err := 
p.tm.Unsubscribe(logger, topic, false) + err := p.tm.Unsubscribe(topic, false) require.ErrorContains(t, err, fmt.Sprintf("failed to unsubscribe from topic %s: not subscribed", topicFullName)) }(p) @@ -197,7 +197,7 @@ func baseTest(t *testing.T, ctx context.Context, logger *zap.Logger, peers []*P, defer wg.Done() <-time.After(time.Millisecond * 50) - err := p.tm.Unsubscribe(logger, topic, false) + err := p.tm.Unsubscribe(topic, false) require.ErrorContains(t, err, fmt.Sprintf("failed to unsubscribe from topic %s: not subscribed", topicFullName)) }(p) }(p, cids[i]) @@ -211,7 +211,7 @@ func banningTest(t *testing.T, logger *zap.Logger, peers []*P, cids []string, sc for _, cid := range cids { for _, p := range peers { - require.NoError(t, p.tm.Subscribe(logger, committeeTopic(cid))) + require.NoError(t, p.tm.Subscribe(committeeTopic(cid))) } } @@ -420,7 +420,7 @@ func newPeer(ctx context.Context, logger *zap.Logger, t *testing.T, msgValidator atomic.AddUint64(&p.connsCount, 1) }, }) - require.NoError(t, ds.Bootstrap(logger, func(e discovery.PeerEvent) { + require.NoError(t, ds.Bootstrap(func(e discovery.PeerEvent) { _ = h.Connect(ctx, e.AddrInfo) })) diff --git a/operator/duties/attester_test.go b/operator/duties/attester_test.go index d09097f0f3..8c9ca573ba 100644 --- a/operator/duties/attester_test.go +++ b/operator/duties/attester_test.go @@ -336,7 +336,7 @@ func TestScheduler_Attester_Reorg_Previous_Epoch_Transition(t *testing.T) { PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -358,7 +358,7 @@ func TestScheduler_Attester_Reorg_Previous_Epoch_Transition(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for attester duties to be fetched again for the current epoch @@ -420,7 +420,7 @@ func TestScheduler_Attester_Reorg_Previous_Epoch_Transition_Indices_Changed(t *t PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -442,7 +442,7 @@ func TestScheduler_Attester_Reorg_Previous_Epoch_Transition_Indices_Changed(t *t ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: trigger indices change @@ -511,7 +511,7 @@ func TestScheduler_Attester_Reorg_Previous(t *testing.T) { PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -534,7 +534,7 @@ func TestScheduler_Attester_Reorg_Previous(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for no action to be taken @@ -593,7 +593,7 @@ func TestScheduler_Attester_Reorg_Previous_Indices_Change_Same_Slot(t *testing.T PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + 
scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -616,7 +616,7 @@ func TestScheduler_Attester_Reorg_Previous_Indices_Change_Same_Slot(t *testing.T ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: trigger indices change @@ -686,7 +686,7 @@ func TestScheduler_Attester_Reorg_Current(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -708,7 +708,7 @@ func TestScheduler_Attester_Reorg_Current(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for attester duties to be fetched again for the current epoch @@ -776,7 +776,7 @@ func TestScheduler_Attester_Reorg_Current_Indices_Changed(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -798,7 +798,7 @@ func TestScheduler_Attester_Reorg_Current_Indices_Changed(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: trigger indices change @@ -886,7 +886,7 @@ func TestScheduler_Attester_Early_Block(t *testing.T) { Slot: currentSlot.Get(), }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) require.Less(t, time.Since(startTime), scheduler.network.Beacon.SlotDurationSec()/3) diff --git a/operator/duties/committee.go b/operator/duties/committee.go index f2232907f1..adf4e23059 100644 --- a/operator/duties/committee.go +++ b/operator/duties/committee.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/logging/fields" "github.com/ssvlabs/ssv/operator/duties/dutystore" ) @@ -79,7 +80,7 @@ func (h *CommitteeHandler) processExecution(ctx context.Context, period uint64, } committeeMap := h.buildCommitteeDuties(attDuties, syncDuties, epoch, slot) - h.dutiesExecutor.ExecuteCommitteeDuties(ctx, h.logger, committeeMap) + h.dutiesExecutor.ExecuteCommitteeDuties(ctx, committeeMap) } func (h *CommitteeHandler) buildCommitteeDuties(attDuties []*eth2apiv1.AttesterDuty, syncDuties []*eth2apiv1.SyncCommitteeDuty, epoch phase0.Epoch, slot phase0.Slot) committeeDutiesMap { diff --git a/operator/duties/committee_test.go b/operator/duties/committee_test.go index bd3478bd52..46129f955d 100644 --- a/operator/duties/committee_test.go +++ b/operator/duties/committee_test.go @@ -701,7 +701,7 @@ func TestScheduler_Committee_Reorg_Previous_Epoch_Transition_Attester_only(t *te PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -723,7 +723,7 @@ func 
TestScheduler_Committee_Reorg_Previous_Epoch_Transition_Attester_only(t *te ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) // wait for attester duties to be fetched again for the current epoch waitForDutiesFetchCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // no execution should happen in slot 64 @@ -806,7 +806,7 @@ func TestScheduler_Committee_Reorg_Previous_Epoch_Transition_Indices_Changed_Att PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -828,7 +828,7 @@ func TestScheduler_Committee_Reorg_Previous_Epoch_Transition_Indices_Changed_Att ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) // wait for attester duties to be fetched waitForDutiesFetchCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // no execution should happen in slot 64 @@ -912,7 +912,7 @@ func TestScheduler_Committee_Reorg_Previous_Attester_only(t *testing.T) { PreviousDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -934,7 +934,7 @@ func TestScheduler_Committee_Reorg_Previous_Attester_only(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) // wait for attester duties to be fetched again for the current epoch waitForDutiesFetchCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // no execution should happen in slot 33 @@ -1019,7 +1019,7 @@ func TestScheduler_Committee_Early_Block_Attester_Only(t *testing.T) { Slot: currentSlot.Get(), }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesExecutionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout, committeeMap) require.Less(t, time.Since(startTime), scheduler.network.Beacon.SlotDurationSec()/3) @@ -1093,7 +1093,7 @@ func TestScheduler_Committee_Early_Block(t *testing.T) { Slot: currentSlot.Get(), }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForDutiesExecutionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout, committeeMap) require.Less(t, time.Since(startTime), scheduler.network.Beacon.SlotDurationSec()/3) diff --git a/operator/duties/proposer_test.go b/operator/duties/proposer_test.go index e9a8b5d2a7..a7ea743d4f 100644 --- a/operator/duties/proposer_test.go +++ b/operator/duties/proposer_test.go @@ -313,7 +313,7 @@ func TestScheduler_Proposer_Reorg_Current(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -335,7 +335,7 @@ func TestScheduler_Proposer_Reorg_Current(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for proposer duties to be fetched again for the current epoch. 
@@ -389,7 +389,7 @@ func TestScheduler_Proposer_Reorg_Current_Indices_Changed(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -411,7 +411,7 @@ func TestScheduler_Proposer_Reorg_Current_Indices_Changed(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(1), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: trigger a change in active indices in the same slot diff --git a/operator/duties/scheduler.go b/operator/duties/scheduler.go index 5362191ccd..719eb3ed58 100644 --- a/operator/duties/scheduler.go +++ b/operator/duties/scheduler.go @@ -17,6 +17,7 @@ import ( "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/beacon/goclient" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/logging/fields" @@ -39,7 +40,7 @@ const ( // DutiesExecutor is an interface for executing duties. type DutiesExecutor interface { ExecuteDuties(ctx context.Context, logger *zap.Logger, duties []*spectypes.ValidatorDuty) - ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logger, duties committeeDutiesMap) + ExecuteCommitteeDuties(ctx context.Context, duties committeeDutiesMap) } // DutyExecutor is an interface for executing duty. @@ -89,6 +90,7 @@ type SchedulerOptions struct { } type Scheduler struct { + logger *zap.Logger beaconNode BeaconNode executionClient ExecutionClient network networkconfig.NetworkConfig @@ -112,13 +114,14 @@ type Scheduler struct { previousDutyDependentRoot phase0.Root } -func NewScheduler(opts *SchedulerOptions) *Scheduler { +func NewScheduler(logger *zap.Logger, opts *SchedulerOptions) *Scheduler { dutyStore := opts.DutyStore if dutyStore == nil { dutyStore = dutystore.New() } s := &Scheduler{ + logger: logger.Named(logging.NameDutyScheduler), beaconNode: opts.BeaconNode, executionClient: opts.ExecutionClient, network: opts.Network, @@ -155,13 +158,12 @@ type ReorgEvent struct { // Start initializes the Scheduler and begins its operation. // Note: This function includes blocking operations, especially within the handler's HandleInitialDuties call, // which will block until initial duties are fully handled. -func (s *Scheduler) Start(ctx context.Context, logger *zap.Logger) error { - logger = logger.Named(logging.NameDutyScheduler) - logger.Info("duty scheduler started") +func (s *Scheduler) Start(ctx context.Context) error { + s.logger.Info("duty scheduler started") // Subscribe to head events. This allows us to go early for attestations & sync committees if a block arrives, // as well as re-request duties if there is a change in beacon block. - if err := s.beaconNode.Events(ctx, []string{"head"}, s.HandleHeadEvent(logger)); err != nil { + if err := s.beaconNode.Events(ctx, []string{"head"}, s.HandleHeadEvent()); err != nil { return fmt.Errorf("failed to subscribe to head events: %w", err) } @@ -178,7 +180,7 @@ func (s *Scheduler) Start(ctx context.Context, logger *zap.Logger) error { handler.Setup( handler.Name(), - logger, + s.logger, s.beaconNode, s.executionClient, s.network, @@ -273,7 +275,7 @@ func (s *Scheduler) SlotTicker(ctx context.Context) { } // HandleHeadEvent handles the "head" events from the beacon node. 
-func (s *Scheduler) HandleHeadEvent(logger *zap.Logger) func(event *eth2apiv1.Event) { +func (s *Scheduler) HandleHeadEvent() func(event *eth2apiv1.Event) { return func(event *eth2apiv1.Event) { if event.Data == nil { return @@ -289,7 +291,7 @@ func (s *Scheduler) HandleHeadEvent(logger *zap.Logger) func(event *eth2apiv1.Ev // check for reorg epoch := s.network.Beacon.EstimatedEpochAtSlot(data.Slot) buildStr := fmt.Sprintf("e%v-s%v-#%v", epoch, data.Slot, data.Slot%32+1) - logger := logger.With(zap.String("epoch_slot_pos", buildStr)) + logger := s.logger.With(zap.String("epoch_slot_pos", buildStr)) if s.lastBlockEpoch != 0 { if epoch > s.lastBlockEpoch { // Change of epoch. @@ -361,7 +363,7 @@ func (s *Scheduler) HandleHeadEvent(logger *zap.Logger) func(event *eth2apiv1.Ev func (s *Scheduler) ExecuteDuties(ctx context.Context, logger *zap.Logger, duties []*spectypes.ValidatorDuty) { for _, duty := range duties { duty := duty - logger := s.loggerWithDutyContext(logger, duty) + logger := s.loggerWithDutyContext(duty) slotDelay := time.Since(s.network.Beacon.GetSlotStartTime(duty.Slot)) if slotDelay >= 100*time.Millisecond { logger.Debug("⚠️ late duty execution", zap.Int64("slot_delay", slotDelay.Milliseconds())) @@ -378,10 +380,10 @@ func (s *Scheduler) ExecuteDuties(ctx context.Context, logger *zap.Logger, dutie } // ExecuteCommitteeDuties tries to execute the given committee duties -func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logger, duties committeeDutiesMap) { +func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, duties committeeDutiesMap) { for _, committee := range duties { duty := committee.duty - logger := s.loggerWithCommitteeDutyContext(logger, committee) + logger := s.loggerWithCommitteeDutyContext(committee) // TODO: extract this in dutyExecutor (validator controller), don't pass logger to ExecuteCommitteeDuty dutyEpoch := s.network.Beacon.EstimatedEpochAtSlot(duty.Slot) logger.Debug("🔧 executing committee duty", fields.Duties(dutyEpoch, duty.ValidatorDuties)) @@ -399,8 +401,8 @@ func (s *Scheduler) ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logg } // loggerWithDutyContext returns an instance of logger with the given duty's information -func (s *Scheduler) loggerWithDutyContext(logger *zap.Logger, duty *spectypes.ValidatorDuty) *zap.Logger { - return logger. +func (s *Scheduler) loggerWithDutyContext(duty *spectypes.ValidatorDuty) *zap.Logger { + return s.logger. With(fields.BeaconRole(duty.Type)). With(zap.Uint64("committee_index", uint64(duty.CommitteeIndex))). With(fields.CurrentSlot(s.network.Beacon.EstimatedCurrentSlot())). @@ -411,12 +413,12 @@ func (s *Scheduler) loggerWithDutyContext(logger *zap.Logger, duty *spectypes.Va } // loggerWithCommitteeDutyContext returns an instance of logger with the given committee duty's information -func (s *Scheduler) loggerWithCommitteeDutyContext(logger *zap.Logger, committeeDuty *committeeDuty) *zap.Logger { +func (s *Scheduler) loggerWithCommitteeDutyContext(committeeDuty *committeeDuty) *zap.Logger { duty := committeeDuty.duty dutyEpoch := s.network.Beacon.EstimatedEpochAtSlot(duty.Slot) committeeDutyID := fields.FormatCommitteeDutyID(committeeDuty.operatorIDs, dutyEpoch, duty.Slot) - return logger. + return s.logger. With(fields.CommitteeID(committeeDuty.id)). With(fields.DutyID(committeeDutyID)). With(fields.Role(duty.RunnerRole())). 
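The scheduler changes above are the clearest illustration of the pattern applied throughout this patch: the constructor receives the *zap.Logger once, namespaces it with Named, and stores it on the struct, while per-call context (slot, duty, committee) is added by deriving child loggers with With instead of threading a logger argument through every method. A minimal standalone sketch of that shape, assuming nothing beyond the zap API; the component/newComponent names are illustrative and not part of this patch:

package example

import "go.uber.org/zap"

// component stands in for the types refactored here (Scheduler, topicsCtrl,
// connHandler, recipientController): the logger is a field, not a parameter.
type component struct {
	logger *zap.Logger
}

// newComponent injects the logger once and namespaces it so every log line
// emitted by this component carries its name.
func newComponent(logger *zap.Logger) *component {
	return &component{logger: logger.Named("example_component")}
}

// handle derives a child logger with per-call context instead of accepting
// a *zap.Logger argument, mirroring loggerWithDutyContext above.
func (c *component) handle(slot uint64) {
	logger := c.logger.With(zap.Uint64("slot", slot))
	logger.Debug("handling event")
}

Callers construct the component once with the application logger and from then on call its methods without a logger parameter, which is why the signatures in this diff keep shrinking.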
diff --git a/operator/duties/scheduler_mock.go b/operator/duties/scheduler_mock.go index 12a945bb3e..a63a62b493 100644 --- a/operator/duties/scheduler_mock.go +++ b/operator/duties/scheduler_mock.go @@ -28,6 +28,7 @@ import ( type MockDutiesExecutor struct { ctrl *gomock.Controller recorder *MockDutiesExecutorMockRecorder + isgomock struct{} } // MockDutiesExecutorMockRecorder is the mock recorder for MockDutiesExecutor. @@ -48,15 +49,15 @@ func (m *MockDutiesExecutor) EXPECT() *MockDutiesExecutorMockRecorder { } // ExecuteCommitteeDuties mocks base method. -func (m *MockDutiesExecutor) ExecuteCommitteeDuties(ctx context.Context, logger *zap.Logger, duties committeeDutiesMap) { +func (m *MockDutiesExecutor) ExecuteCommitteeDuties(ctx context.Context, duties committeeDutiesMap) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ExecuteCommitteeDuties", ctx, logger, duties) + m.ctrl.Call(m, "ExecuteCommitteeDuties", ctx, duties) } // ExecuteCommitteeDuties indicates an expected call of ExecuteCommitteeDuties. -func (mr *MockDutiesExecutorMockRecorder) ExecuteCommitteeDuties(ctx, logger, duties any) *gomock.Call { +func (mr *MockDutiesExecutorMockRecorder) ExecuteCommitteeDuties(ctx, duties any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecuteCommitteeDuties", reflect.TypeOf((*MockDutiesExecutor)(nil).ExecuteCommitteeDuties), ctx, logger, duties) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecuteCommitteeDuties", reflect.TypeOf((*MockDutiesExecutor)(nil).ExecuteCommitteeDuties), ctx, duties) } // ExecuteDuties mocks base method. @@ -75,6 +76,7 @@ func (mr *MockDutiesExecutorMockRecorder) ExecuteDuties(ctx, logger, duties any) type MockDutyExecutor struct { ctrl *gomock.Controller recorder *MockDutyExecutorMockRecorder + isgomock struct{} } // MockDutyExecutorMockRecorder is the mock recorder for MockDutyExecutor. @@ -122,6 +124,7 @@ func (mr *MockDutyExecutorMockRecorder) ExecuteDuty(ctx, logger, duty any) *gomo type MockBeaconNode struct { ctrl *gomock.Controller recorder *MockBeaconNodeMockRecorder + isgomock struct{} } // MockBeaconNodeMockRecorder is the mock recorder for MockBeaconNode. @@ -232,6 +235,7 @@ func (mr *MockBeaconNodeMockRecorder) SyncCommitteeDuties(ctx, epoch, indices an type MockExecutionClient struct { ctrl *gomock.Controller recorder *MockExecutionClientMockRecorder + isgomock struct{} } // MockExecutionClientMockRecorder is the mock recorder for MockExecutionClient. @@ -270,6 +274,7 @@ func (mr *MockExecutionClientMockRecorder) BlockByNumber(ctx, blockNumber any) * type MockValidatorProvider struct { ctrl *gomock.Controller recorder *MockValidatorProviderMockRecorder + isgomock struct{} } // MockValidatorProviderMockRecorder is the mock recorder for MockValidatorProvider. @@ -336,6 +341,7 @@ func (mr *MockValidatorProviderMockRecorder) Validator(pubKey any) *gomock.Call type MockValidatorController struct { ctrl *gomock.Controller recorder *MockValidatorControllerMockRecorder + isgomock struct{} } // MockValidatorControllerMockRecorder is the mock recorder for MockValidatorController. 
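The mock above was regenerated (the isgomock marker field is emitted by recent mockgen versions), and its recorded calls now omit the logger argument to match the slimmer interfaces. The test changes that follow show the counterpart on the caller side: the logger is handed to NewScheduler once and Start(ctx) no longer takes it. Continuing the illustrative package from the sketch above, a test then only needs a test-scoped logger at construction; zaptest is one standard way to obtain such a logger and is used here purely for illustration, not taken from this repository:

package example

import (
	"testing"

	"go.uber.org/zap/zaptest"
)

// TestComponentUsesInjectedLogger constructs the component with a test
// logger once and calls methods without passing a logger, mirroring how
// scheduler_test.go now uses NewScheduler(logger, opts) and s.Start(ctx).
func TestComponentUsesInjectedLogger(t *testing.T) {
	logger := zaptest.NewLogger(t) // test-scoped zap logger
	c := newComponent(logger)
	c.handle(42)
}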
diff --git a/operator/duties/scheduler_test.go b/operator/duties/scheduler_test.go index 685187f2e3..e02d99c1e4 100644 --- a/operator/duties/scheduler_test.go +++ b/operator/duties/scheduler_test.go @@ -105,7 +105,7 @@ func setupSchedulerAndMocks(t *testing.T, handlers []dutyHandler, currentSlot *S }, } - s := NewScheduler(opts) + s := NewScheduler(logger, opts) s.blockPropagateDelay = 1 * time.Millisecond s.indicesChg = make(chan struct{}) s.handlers = handlers @@ -144,7 +144,7 @@ func setupSchedulerAndMocks(t *testing.T, handlers []dutyHandler, currentSlot *S schedulerPool := pool.New().WithErrors().WithContext(ctx) startFunction := func() { - err := s.Start(ctx, logger) + err := s.Start(ctx) require.NoError(t, err) schedulerPool.Go(func(ctx context.Context) error { @@ -359,7 +359,7 @@ func TestScheduler_Run(t *testing.T) { }, } - s := NewScheduler(opts) + s := NewScheduler(logger, opts) // add multiple mock duty handlers s.handlers = []dutyHandler{mockDutyHandler1, mockDutyHandler2} @@ -377,7 +377,7 @@ func TestScheduler_Run(t *testing.T) { mockDutyHandler.(*MockdutyHandler).EXPECT().Name().Times(1) } - require.NoError(t, s.Start(ctx, logger)) + require.NoError(t, s.Start(ctx)) // Cancel the context and test that the scheduler stops. cancel() @@ -408,13 +408,13 @@ func TestScheduler_Regression_IndicesChangeStuck(t *testing.T) { IndicesChg: make(chan struct{}), } - s := NewScheduler(opts) + s := NewScheduler(logger, opts) // add multiple mock duty handlers s.handlers = []dutyHandler{NewValidatorRegistrationHandler()} mockBeaconNode.EXPECT().Events(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockTicker.EXPECT().Next().Return(nil).AnyTimes() - err := s.Start(ctx, logger) + err := s.Start(ctx) require.NoError(t, err) s.indicesChg <- struct{}{} // first time make fanout stuck diff --git a/operator/duties/sync_committee_test.go b/operator/duties/sync_committee_test.go index 230cf4ded4..343dc45e7f 100644 --- a/operator/duties/sync_committee_test.go +++ b/operator/duties/sync_committee_test.go @@ -472,7 +472,7 @@ func TestScheduler_SyncCommittee_Reorg_Current(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -493,7 +493,7 @@ func TestScheduler_SyncCommittee_Reorg_Current(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(2), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for sync committee duties to be fetched again for the current epoch @@ -573,7 +573,7 @@ func TestScheduler_SyncCommittee_Reorg_Current_Indices_Changed(t *testing.T) { CurrentDutyDependentRoot: phase0.Root{0x01}, }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action @@ -594,7 +594,7 @@ func TestScheduler_SyncCommittee_Reorg_Current_Indices_Changed(t *testing.T) { ValidatorIndex: phase0.ValidatorIndex(2), }, }) - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: trigger a change in active indices @@ -685,7 +685,7 @@ func TestScheduler_SyncCommittee_Early_Block(t *testing.T) { Slot: currentSlot.Get(), }, } - scheduler.HandleHeadEvent(logger)(e) + scheduler.HandleHeadEvent()(e) 
waitForDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) require.Greater(t, time.Since(startTime), scheduler.network.Beacon.SlotDurationSec()/3) diff --git a/operator/fee_recipient/controller.go b/operator/fee_recipient/controller.go index 7ebccd2408..6a26f8f638 100644 --- a/operator/fee_recipient/controller.go +++ b/operator/fee_recipient/controller.go @@ -7,20 +7,21 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" + "go.uber.org/zap" + "github.com/ssvlabs/ssv/networkconfig" operatordatastore "github.com/ssvlabs/ssv/operator/datastore" "github.com/ssvlabs/ssv/operator/slotticker" beaconprotocol "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" "github.com/ssvlabs/ssv/protocol/v2/types" "github.com/ssvlabs/ssv/registry/storage" - "go.uber.org/zap" ) //go:generate mockgen -package=mocks -destination=./mocks/controller.go -source=./controller.go // RecipientController submit proposal preparation to beacon node for all committee validators type RecipientController interface { - Start(logger *zap.Logger) + Start() } // ControllerOptions holds the needed dependencies @@ -36,6 +37,7 @@ type ControllerOptions struct { // recipientController implementation of RecipientController type recipientController struct { + logger *zap.Logger ctx context.Context beaconClient beaconprotocol.BeaconNode network networkconfig.NetworkConfig @@ -45,8 +47,9 @@ type recipientController struct { operatorDataStore operatordatastore.OperatorDataStore } -func NewController(opts *ControllerOptions) *recipientController { +func NewController(logger *zap.Logger, opts *ControllerOptions) *recipientController { return &recipientController{ + logger: logger, ctx: opts.Ctx, beaconClient: opts.BeaconClient, network: opts.Network, @@ -57,8 +60,8 @@ func NewController(opts *ControllerOptions) *recipientController { } } -func (rc *recipientController) Start(logger *zap.Logger) { - rc.listenToTicker(logger) +func (rc *recipientController) Start() { + rc.listenToTicker() } // listenToTicker loop over the given slot channel @@ -66,7 +69,7 @@ func (rc *recipientController) Start(logger *zap.Logger) { // in addition, submitting "same data" every slot is not efficient and can overload beacon node // instead we can subscribe to beacon node events and submit only when there is // a new fee recipient event (or new validator) was handled or when there is a syncing issue with beacon node -func (rc *recipientController) listenToTicker(logger *zap.Logger) { +func (rc *recipientController) listenToTicker() { firstTimeSubmitted := false ticker := rc.slotTickerProvider() for { @@ -78,14 +81,13 @@ func (rc *recipientController) listenToTicker(logger *zap.Logger) { } firstTimeSubmitted = true - err := rc.prepareAndSubmit(logger, slot) - if err != nil { - logger.Warn("could not submit proposal preparations", zap.Error(err)) + if err := rc.prepareAndSubmit(); err != nil { + rc.logger.Warn("could not submit proposal preparations", zap.Error(err)) } } } -func (rc *recipientController) prepareAndSubmit(logger *zap.Logger, slot phase0.Slot) error { +func (rc *recipientController) prepareAndSubmit() error { shares := rc.shareStorage.List( nil, storage.ByOperatorID(rc.operatorDataStore.GetOperatorID()), @@ -101,9 +103,9 @@ func (rc *recipientController) prepareAndSubmit(logger *zap.Logger, slot phase0. 
} batch := shares[start:end] - count, err := rc.submit(logger, batch) + count, err := rc.submit(batch) if err != nil { - logger.Warn("could not submit proposal preparation batch", + rc.logger.Warn("could not submit proposal preparation batch", zap.Int("start_index", start), zap.Error(err), ) @@ -112,14 +114,14 @@ func (rc *recipientController) prepareAndSubmit(logger *zap.Logger, slot phase0. submitted += count } - logger.Debug("✅ successfully submitted proposal preparations", + rc.logger.Debug("✅ successfully submitted proposal preparations", zap.Int("submitted", submitted), zap.Int("total", len(shares)), ) return nil } -func (rc *recipientController) submit(logger *zap.Logger, shares []*types.SSVShare) (int, error) { +func (rc *recipientController) submit(shares []*types.SSVShare) (int, error) { m, err := rc.toProposalPreparation(shares) if err != nil { return 0, errors.Wrap(err, "could not build proposal preparation batch") diff --git a/operator/fee_recipient/controller_test.go b/operator/fee_recipient/controller_test.go index 64e91aa01d..08aac9ea31 100644 --- a/operator/fee_recipient/controller_test.go +++ b/operator/fee_recipient/controller_test.go @@ -13,6 +13,10 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/common" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/networkconfig" operatordatastore "github.com/ssvlabs/ssv/operator/datastore" @@ -23,9 +27,6 @@ import ( registrystorage "github.com/ssvlabs/ssv/registry/storage" "github.com/ssvlabs/ssv/storage/basedb" "github.com/ssvlabs/ssv/storage/kv" - "github.com/stretchr/testify/require" - gomock "go.uber.org/mock/gomock" - "go.uber.org/zap" ) func TestSubmitProposal(t *testing.T) { @@ -44,7 +45,7 @@ func TestSubmitProposal(t *testing.T) { network := networkconfig.TestNetwork populateStorage(t, logger, shareStorage, operatorData) - frCtrl := NewController(&ControllerOptions{ + frCtrl := NewController(logger, &ControllerOptions{ Ctx: context.TODO(), Network: network, ShareStorage: shareStorage, @@ -76,7 +77,7 @@ func TestSubmitProposal(t *testing.T) { return ticker } - go frCtrl.Start(logger) + go frCtrl.Start() slots := []phase0.Slot{ 1, // first time @@ -116,7 +117,7 @@ func TestSubmitProposal(t *testing.T) { return ticker } - go frCtrl.Start(logger) + go frCtrl.Start() mockTimeChan <- time.Now() wg.Add(2) wg.Wait() diff --git a/operator/fee_recipient/mocks/controller.go b/operator/fee_recipient/mocks/controller.go index ef3945ee3c..daafe8cafb 100644 --- a/operator/fee_recipient/mocks/controller.go +++ b/operator/fee_recipient/mocks/controller.go @@ -13,13 +13,13 @@ import ( reflect "reflect" gomock "go.uber.org/mock/gomock" - zap "go.uber.org/zap" ) // MockRecipientController is a mock of RecipientController interface. type MockRecipientController struct { ctrl *gomock.Controller recorder *MockRecipientControllerMockRecorder + isgomock struct{} } // MockRecipientControllerMockRecorder is the mock recorder for MockRecipientController. @@ -40,13 +40,13 @@ func (m *MockRecipientController) EXPECT() *MockRecipientControllerMockRecorder } // Start mocks base method. -func (m *MockRecipientController) Start(logger *zap.Logger) { +func (m *MockRecipientController) Start() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Start", logger) + m.ctrl.Call(m, "Start") } // Start indicates an expected call of Start. 
-func (mr *MockRecipientControllerMockRecorder) Start(logger any) *gomock.Call { +func (mr *MockRecipientControllerMockRecorder) Start() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRecipientController)(nil).Start), logger) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRecipientController)(nil).Start)) } diff --git a/operator/node.go b/operator/node.go index 22b44a0f1f..c78b370a80 100644 --- a/operator/node.go +++ b/operator/node.go @@ -44,6 +44,7 @@ type Options struct { } type Node struct { + logger *zap.Logger network networkconfig.NetworkConfig context context.Context validatorsCtrl validator.Controller @@ -63,6 +64,7 @@ type Node struct { // New is the constructor of Node func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provider, qbftStorage *qbftstorage.ParticipantStores) *Node { node := &Node{ + logger: logger.Named(logging.NameOperator), context: opts.Context, validatorsCtrl: opts.ValidatorController, validatorOptions: opts.ValidatorOptions, @@ -72,7 +74,7 @@ func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provide net: opts.P2PNetwork, storage: opts.ValidatorOptions.RegistryStorage, qbftStorage: qbftStorage, - dutyScheduler: duties.NewScheduler(&duties.SchedulerOptions{ + dutyScheduler: duties.NewScheduler(logger, &duties.SchedulerOptions{ Ctx: opts.Context, BeaconNode: opts.BeaconNode, ExecutionClient: opts.ExecutionClient, @@ -86,7 +88,7 @@ func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provide SlotTickerProvider: slotTickerProvider, P2PNetwork: opts.P2PNetwork, }), - feeRecipientCtrl: fee_recipient.NewController(&fee_recipient.ControllerOptions{ + feeRecipientCtrl: fee_recipient.NewController(logger, &fee_recipient.ControllerOptions{ Ctx: opts.Context, BeaconClient: opts.BeaconNode, Network: opts.Network, @@ -104,13 +106,11 @@ func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provide } // Start starts to stream duties and run IBFT instances -func (n *Node) Start(logger *zap.Logger) error { - logger = logger.Named(logging.NameOperator) - - logger.Info("All required services are ready. OPERATOR SUCCESSFULLY CONFIGURED AND NOW RUNNING!") +func (n *Node) Start() error { + n.logger.Info("All required services are ready. OPERATOR SUCCESSFULLY CONFIGURED AND NOW RUNNING!") go func() { - err := n.startWSServer(logger) + err := n.startWSServer() if err != nil { // TODO: think if we need to panic return @@ -119,7 +119,7 @@ func (n *Node) Start(logger *zap.Logger) error { // Start the duty scheduler, and a background goroutine to crash the node // in case there were any errors. - if err := n.dutyScheduler.Start(n.context, logger); err != nil { + if err := n.dutyScheduler.Start(n.context); err != nil { return fmt.Errorf("failed to run duty scheduler: %w", err) } @@ -127,22 +127,22 @@ func (n *Node) Start(logger *zap.Logger) error { if n.validatorOptions.Exporter { // Subscribe to all subnets. 
- err := n.net.SubscribeAll(logger) + err := n.net.SubscribeAll() if err != nil { - logger.Error("failed to subscribe to all subnets", zap.Error(err)) + n.logger.Error("failed to subscribe to all subnets", zap.Error(err)) } } - go n.net.UpdateSubnets(logger) - go n.net.UpdateScoreParams(logger) + go n.net.UpdateSubnets() + go n.net.UpdateScoreParams() n.validatorsCtrl.StartValidators(n.context) - go n.reportOperators(logger) + go n.reportOperators() - go n.feeRecipientCtrl.Start(logger) + go n.feeRecipientCtrl.Start() go n.validatorsCtrl.HandleMetadataUpdates(n.context) go n.validatorsCtrl.ReportValidatorStatuses(n.context) if err := n.dutyScheduler.Wait(); err != nil { - logger.Fatal("duty scheduler exited with error", zap.Error(err)) + n.logger.Fatal("duty scheduler exited with error", zap.Error(err)) } return nil @@ -157,29 +157,32 @@ func (n *Node) HealthCheck() error { } // handleQueryRequests waits for incoming messages and -func (n *Node) handleQueryRequests(logger *zap.Logger, nm *api.NetworkMessage) { +func (n *Node) handleQueryRequests(nm *api.NetworkMessage) { if nm.Err != nil { nm.Msg = api.Message{Type: api.TypeError, Data: []string{"could not parse network message"}} } - logger.Debug("got incoming export request", + n.logger.Debug("got incoming export request", zap.String("type", string(nm.Msg.Type))) + + h := api.NewHandler(n.logger) + switch nm.Msg.Type { case api.TypeDecided: - api.HandleParticipantsQuery(logger, n.qbftStorage, nm, n.network.DomainType) + h.HandleParticipantsQuery(n.qbftStorage, nm, n.network.DomainType) case api.TypeError: - api.HandleErrorQuery(logger, nm) + h.HandleErrorQuery(nm) default: - api.HandleUnknownQuery(logger, nm) + h.HandleUnknownQuery(nm) } } -func (n *Node) startWSServer(logger *zap.Logger) error { +func (n *Node) startWSServer() error { if n.ws != nil { - logger.Info("starting WS server") + n.logger.Info("starting WS server") n.ws.UseQueryHandler(n.handleQueryRequests) - if err := n.ws.Start(logger, fmt.Sprintf(":%d", n.wsAPIPort)); err != nil { + if err := n.ws.Start(fmt.Sprintf(":%d", n.wsAPIPort)); err != nil { return err } } @@ -187,15 +190,15 @@ func (n *Node) startWSServer(logger *zap.Logger) error { return nil } -func (n *Node) reportOperators(logger *zap.Logger) { +func (n *Node) reportOperators() { operators, err := n.storage.ListOperators(nil, 0, 1000) // TODO more than 1000? 
if err != nil { - logger.Warn("failed to get all operators for reporting", zap.Error(err)) + n.logger.Warn("failed to get all operators for reporting", zap.Error(err)) return } - logger.Debug("reporting operators", zap.Int("count", len(operators))) + n.logger.Debug("reporting operators", zap.Int("count", len(operators))) for i := range operators { - logger.Debug("report operator public key", + n.logger.Debug("report operator public key", fields.OperatorID(operators[i].ID), fields.PubKey(operators[i].PublicKey)) } diff --git a/operator/validator/controller.go b/operator/validator/controller.go index 7b095987bf..238126b7cc 100644 --- a/operator/validator/controller.go +++ b/operator/validator/controller.go @@ -16,6 +16,8 @@ import ( "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" + "go.uber.org/zap" + "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/logging/fields" @@ -43,7 +45,6 @@ import ( ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types" registrystorage "github.com/ssvlabs/ssv/registry/storage" "github.com/ssvlabs/ssv/storage/basedb" - "go.uber.org/zap" ) //go:generate mockgen -package=mocks -destination=./mocks/controller.go -source=./controller.go @@ -136,7 +137,7 @@ type SharesStorage interface { type P2PNetwork interface { protocolp2p.Broadcaster UseMessageRouter(router network.MessageRouter) - SubscribeRandoms(logger *zap.Logger, numSubnets int) error + SubscribeRandoms(numSubnets int) error ActiveSubnets() records.Subnets FixedSubnets() records.Subnets } @@ -459,7 +460,7 @@ func (c *controller) StartValidators(ctx context.Context) { if len(inited) == 0 { // If no validators were started and therefore we're not subscribed to any subnets, // then subscribe to a random subnet to participate in the network. 
- if err := c.network.SubscribeRandoms(c.logger, 1); err != nil { + if err := c.network.SubscribeRandoms(1); err != nil { c.logger.Error("failed to subscribe to random subnets", zap.Error(err)) } } diff --git a/operator/validator/controller_test.go b/operator/validator/controller_test.go index 1a8856362f..ccff8e5c22 100644 --- a/operator/validator/controller_test.go +++ b/operator/validator/controller_test.go @@ -18,6 +18,10 @@ import ( "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + "github.com/ssvlabs/ssv/ekm" ibftstorage "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/logging" @@ -41,9 +45,6 @@ import ( registrystoragemocks "github.com/ssvlabs/ssv/registry/storage/mocks" "github.com/ssvlabs/ssv/storage/basedb" "github.com/ssvlabs/ssv/storage/kv" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "go.uber.org/zap" ) const ( @@ -198,7 +199,6 @@ func TestSetupValidatorsExporter(t *testing.T) { } } }).AnyTimes() - sharesStorage.EXPECT().UpdateValidatorsMetadata(gomock.Any()).Return(nil).AnyTimes() recipientStorage.EXPECT().GetRecipientData(gomock.Any(), gomock.Any()).Return(recipientData, true, nil).AnyTimes() } @@ -354,8 +354,6 @@ func TestSetupValidators(t *testing.T) { ownerAddressBytes := decodeHex(t, "67Ce5c69260bd819B4e0AD13f4b873074D479811", "Failed to decode owner address") feeRecipientBytes := decodeHex(t, "45E668aba4b7fc8761331EC3CE77584B7A99A51A", "Failed to decode second fee recipient address") testValidator := setupTestValidator(ownerAddressBytes, feeRecipientBytes) - storageMu := sync.Mutex{} - storageData := make(map[string]*beacon.ValidatorMetadata) opStorage, done := newOperatorStorageForTest(logger) defer done() @@ -505,14 +503,6 @@ func TestSetupValidators(t *testing.T) { sharesStorage.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(func(_ basedb.Reader, pubKey []byte) (*types.SSVShare, bool) { return shareWithMetaData, true }).AnyTimes() - sharesStorage.EXPECT().UpdateValidatorsMetadata(gomock.Any()).DoAndReturn(func(pk string, metadata *beacon.ValidatorMetadata) error { - storageMu.Lock() - defer storageMu.Unlock() - - storageData[pk] = metadata - - return nil - }).AnyTimes() testValidatorsMap := map[spectypes.ValidatorPK]*validator.Validator{ createPubKey(byte('0')): testValidator, diff --git a/operator/validator/mocks/controller.go b/operator/validator/mocks/controller.go index 23eaac61ab..7c9c229053 100644 --- a/operator/validator/mocks/controller.go +++ b/operator/validator/mocks/controller.go @@ -19,7 +19,6 @@ import ( network "github.com/ssvlabs/ssv/network" records "github.com/ssvlabs/ssv/network/records" duties "github.com/ssvlabs/ssv/operator/duties" - beacon "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" validator "github.com/ssvlabs/ssv/protocol/v2/ssv/validator" types0 "github.com/ssvlabs/ssv/protocol/v2/types" storage "github.com/ssvlabs/ssv/registry/storage" @@ -32,6 +31,7 @@ import ( type MockController struct { ctrl *gomock.Controller recorder *MockControllerMockRecorder + isgomock struct{} } // MockControllerMockRecorder is the mock recorder for MockController. 
@@ -103,20 +103,6 @@ func (mr *MockControllerMockRecorder) ExitValidator(pubKey, blockNumber, validat return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExitValidator", reflect.TypeOf((*MockController)(nil).ExitValidator), pubKey, blockNumber, validatorIndex, ownValidator) } -// GetOperatorShares mocks base method. -func (m *MockController) GetOperatorShares() []*types0.SSVShare { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOperatorShares") - ret0, _ := ret[0].([]*types0.SSVShare) - return ret0 -} - -// GetOperatorShares indicates an expected call of GetOperatorShares. -func (mr *MockControllerMockRecorder) GetOperatorShares() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOperatorShares", reflect.TypeOf((*MockController)(nil).GetOperatorShares)) -} - // GetValidator mocks base method. func (m *MockController) GetValidator(pubKey types.ValidatorPK) (*validator.Validator, bool) { m.ctrl.T.Helper() @@ -149,6 +135,18 @@ func (mr *MockControllerMockRecorder) GetValidatorStats() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValidatorStats", reflect.TypeOf((*MockController)(nil).GetValidatorStats)) } +// HandleMetadataUpdates mocks base method. +func (m *MockController) HandleMetadataUpdates(ctx context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "HandleMetadataUpdates", ctx) +} + +// HandleMetadataUpdates indicates an expected call of HandleMetadataUpdates. +func (mr *MockControllerMockRecorder) HandleMetadataUpdates(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleMetadataUpdates", reflect.TypeOf((*MockController)(nil).HandleMetadataUpdates), ctx) +} + // IndicesChangeChan mocks base method. func (m *MockController) IndicesChangeChan() chan struct{} { m.ctrl.T.Helper() @@ -216,15 +214,15 @@ func (mr *MockControllerMockRecorder) StartNetworkHandlers() *gomock.Call { } // StartValidators mocks base method. -func (m *MockController) StartValidators(context.Context) { +func (m *MockController) StartValidators(ctx context.Context) { m.ctrl.T.Helper() - m.ctrl.Call(m, "StartValidators") + m.ctrl.Call(m, "StartValidators", ctx) } // StartValidators indicates an expected call of StartValidators. -func (mr *MockControllerMockRecorder) StartValidators() *gomock.Call { +func (mr *MockControllerMockRecorder) StartValidators(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartValidators", reflect.TypeOf((*MockController)(nil).StartValidators)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartValidators", reflect.TypeOf((*MockController)(nil).StartValidators), ctx) } // StopValidator mocks base method. @@ -273,6 +271,7 @@ func (mr *MockControllerMockRecorder) ValidatorExitChan() *gomock.Call { type MockRecipients struct { ctrl *gomock.Controller recorder *MockRecipientsMockRecorder + isgomock struct{} } // MockRecipientsMockRecorder is the mock recorder for MockRecipients. @@ -312,6 +311,7 @@ func (mr *MockRecipientsMockRecorder) GetRecipientData(r, owner any) *gomock.Cal type MockSharesStorage struct { ctrl *gomock.Controller recorder *MockSharesStorageMockRecorder + isgomock struct{} } // MockSharesStorageMockRecorder is the mock recorder for MockSharesStorage. 
@@ -377,24 +377,11 @@ func (mr *MockSharesStorageMockRecorder) Range(txn, fn any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockSharesStorage)(nil).Range), txn, fn) } -// UpdateValidatorsMetadata mocks base method. -func (m *MockSharesStorage) UpdateValidatorsMetadata(arg0 map[types.ValidatorPK]*beacon.ValidatorMetadata) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateValidatorsMetadata", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// UpdateValidatorsMetadata indicates an expected call of UpdateValidatorsMetadata. -func (mr *MockSharesStorageMockRecorder) UpdateValidatorsMetadata(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateValidatorsMetadata", reflect.TypeOf((*MockSharesStorage)(nil).UpdateValidatorsMetadata), arg0) -} - // MockP2PNetwork is a mock of P2PNetwork interface. type MockP2PNetwork struct { ctrl *gomock.Controller recorder *MockP2PNetworkMockRecorder + isgomock struct{} } // MockP2PNetworkMockRecorder is the mock recorder for MockP2PNetwork. @@ -457,17 +444,17 @@ func (mr *MockP2PNetworkMockRecorder) FixedSubnets() *gomock.Call { } // SubscribeRandoms mocks base method. -func (m *MockP2PNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error { +func (m *MockP2PNetwork) SubscribeRandoms(numSubnets int) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SubscribeRandoms", logger, numSubnets) + ret := m.ctrl.Call(m, "SubscribeRandoms", numSubnets) ret0, _ := ret[0].(error) return ret0 } // SubscribeRandoms indicates an expected call of SubscribeRandoms. -func (mr *MockP2PNetworkMockRecorder) SubscribeRandoms(logger, numSubnets any) *gomock.Call { +func (mr *MockP2PNetworkMockRecorder) SubscribeRandoms(numSubnets any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeRandoms", reflect.TypeOf((*MockP2PNetwork)(nil).SubscribeRandoms), logger, numSubnets) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeRandoms", reflect.TypeOf((*MockP2PNetwork)(nil).SubscribeRandoms), numSubnets) } // UseMessageRouter mocks base method. 
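For reviewers, a minimal sketch of how a test could set expectations against the regenerated MockP2PNetwork above, now that SubscribeRandoms takes only the subnet count. This is illustrative only and not part of the patch: the test name and the NewMockP2PNetwork constructor (mockgen's standard generated constructor) are assumptions.

package validator_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"go.uber.org/mock/gomock"

	"github.com/ssvlabs/ssv/operator/validator/mocks"
)

// TestSubscribeRandomsWithoutLogger (hypothetical) exercises the logger-free
// SubscribeRandoms signature through the regenerated gomock mock.
func TestSubscribeRandomsWithoutLogger(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockNet := mocks.NewMockP2PNetwork(ctrl)
	// The recorder now matches on the subnet count alone, no *zap.Logger argument.
	mockNet.EXPECT().SubscribeRandoms(1).Return(nil)

	require.NoError(t, mockNet.SubscribeRandoms(1))
}
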
diff --git a/protocol/v2/p2p/network.go b/protocol/v2/p2p/network.go index 5024adbe79..588cd66b6d 100644 --- a/protocol/v2/p2p/network.go +++ b/protocol/v2/p2p/network.go @@ -8,7 +8,6 @@ import ( "github.com/ssvlabs/ssv-spec/p2p" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" - "go.uber.org/zap" ) var ( @@ -20,7 +19,7 @@ var ( type Subscriber interface { p2p.Subscriber // Unsubscribe unsubscribes from the validator subnet - Unsubscribe(logger *zap.Logger, pk spectypes.ValidatorPK) error + Unsubscribe(pk spectypes.ValidatorPK) error // Peers returns the peers that are connected to the given validator } @@ -95,7 +94,7 @@ const ( // ValidationReporting is the interface for reporting on message validation results type ValidationReporting interface { // ReportValidation reports the result for the given message - ReportValidation(logger *zap.Logger, message *spectypes.SSVMessage, res MsgValidationResult) + ReportValidation(message *spectypes.SSVMessage, res MsgValidationResult) } // Network holds the networking layer used to complement the underlying protocols diff --git a/protocol/v2/qbft/storage/participant_store.go b/protocol/v2/qbft/storage/participant_store.go index fe6ff67b44..ac9dd9b55c 100644 --- a/protocol/v2/qbft/storage/participant_store.go +++ b/protocol/v2/qbft/storage/participant_store.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/attestantio/go-eth2-client/spec/phase0" - "go.uber.org/zap" specqbft "github.com/ssvlabs/ssv-spec/qbft" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -59,8 +58,8 @@ type ParticipantStore interface { GetParticipants(pk spectypes.ValidatorPK, slot phase0.Slot) ([]spectypes.OperatorID, error) // InitialSlotGC performs an initial cleanup (blocking) of slots bellow the retained threshold - Prune(ctx context.Context, logger *zap.Logger, below phase0.Slot) + Prune(ctx context.Context, below phase0.Slot) // SlotGC continuously removes old slots - PruneContinously(ctx context.Context, logger *zap.Logger, slotTickerProvider slotticker.Provider, retain phase0.Slot) + PruneContinously(ctx context.Context, slotTickerProvider slotticker.Provider, retain phase0.Slot) } diff --git a/protocol/v2/qbft/testing/storage.go b/protocol/v2/qbft/testing/storage.go index f41f955c3a..11a11b7363 100644 --- a/protocol/v2/qbft/testing/storage.go +++ b/protocol/v2/qbft/testing/storage.go @@ -5,10 +5,11 @@ import ( "sync" spectypes "github.com/ssvlabs/ssv-spec/types" + "go.uber.org/zap" + qbftstorage "github.com/ssvlabs/ssv/ibft/storage" "github.com/ssvlabs/ssv/storage/basedb" "github.com/ssvlabs/ssv/storage/kv" - "go.uber.org/zap" ) var db basedb.Database @@ -38,5 +39,5 @@ var allRoles = []spectypes.BeaconRole{ } func TestingStores(logger *zap.Logger) *qbftstorage.ParticipantStores { - return qbftstorage.NewStoresFromRoles(getDB(logger), allRoles...) + return qbftstorage.NewStoresFromRoles(logger, getDB(logger), allRoles...) 
} diff --git a/utils/boot_node/node.go b/utils/boot_node/node.go index a0c10df823..011506e59e 100644 --- a/utils/boot_node/node.go +++ b/utils/boot_node/node.go @@ -38,11 +38,12 @@ type Options struct { // Node represents the behavior of boot node type Node interface { // Start starts the SSV node - Start(ctx context.Context, logger *zap.Logger) error + Start(ctx context.Context) error } // bootNode implements Node interface type bootNode struct { + logger *zap.Logger privateKey string discv5port uint16 forkVersion []byte @@ -53,8 +54,9 @@ type bootNode struct { } // New is the constructor of ssvNode -func New(networkConfig networkconfig.NetworkConfig, opts Options) (Node, error) { +func New(logger *zap.Logger, networkConfig networkconfig.NetworkConfig, opts Options) (Node, error) { return &bootNode{ + logger: logger.Named(logging.NameBootNode), privateKey: opts.PrivateKey, discv5port: opts.UDPPort, forkVersion: []byte{0x00, 0x00, 0x20, 0x09}, @@ -66,15 +68,16 @@ func New(networkConfig networkconfig.NetworkConfig, opts Options) (Node, error) } type handler struct { + logger *zap.Logger listener discovery.Listener } -func (h *handler) httpHandler(logger *zap.Logger) func(w http.ResponseWriter, _ *http.Request) { +func (h *handler) httpHandler() func(w http.ResponseWriter, _ *http.Request) { return func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) write := func(w io.Writer, b []byte) { if _, err := w.Write(b); err != nil { - logger.Error("Failed to write to http response", zap.Error(err)) + h.logger.Error("Failed to write to http response", zap.Error(err)) } } allNodes := h.listener.AllNodes() @@ -91,31 +94,33 @@ func (h *handler) httpHandler(logger *zap.Logger) func(w http.ResponseWriter, _ } // Start implements Node interface -func (n *bootNode) Start(ctx context.Context, logger *zap.Logger) error { - logger = logger.Named(logging.NameBootNode) - privKey, err := utils.ECDSAPrivateKey(logger, n.privateKey) +func (n *bootNode) Start(ctx context.Context) error { + privKey, err := utils.ECDSAPrivateKey(n.logger, n.privateKey) if err != nil { log.Fatal("Failed to get p2p privateKey", zap.Error(err)) } + ipAddr, err := network.ExternalIP() // ipAddr = "127.0.0.1" log.Print("TEST Ip addr----", ipAddr) if err != nil { - logger.Fatal("Failed to get ExternalIP", zap.Error(err)) + n.logger.Fatal("Failed to get ExternalIP", zap.Error(err)) } - listener := n.createListener(logger, ipAddr, n.discv5port, privKey) + + listener := n.createListener(ipAddr, n.discv5port, privKey) node := listener.LocalNode().Node() - logger.Info("Running", + n.logger.Info("Running", zap.String("node", node.String()), zap.String("network", n.network.Name), fields.ProtocolID(n.network.DiscoveryProtocolID), ) handler := &handler{ + logger: n.logger, listener: listener, } mux := http.NewServeMux() - mux.HandleFunc("/p2p", handler.httpHandler(logger)) + mux.HandleFunc("/p2p", handler.httpHandler()) const timeout = 3 * time.Second @@ -133,11 +138,11 @@ func (n *bootNode) Start(ctx context.Context, logger *zap.Logger) error { return nil } -func (n *bootNode) createListener(logger *zap.Logger, ipAddr string, port uint16, privateKey *ecdsa.PrivateKey) discovery.Listener { +func (n *bootNode) createListener(ipAddr string, port uint16, privateKey *ecdsa.PrivateKey) discovery.Listener { // Create the UDP listener and the LocalNode record. 
ip := net.ParseIP(ipAddr) if ip.To4() == nil { - logger.Fatal("IPV4 address not provided", fields.Address(ipAddr)) + n.logger.Fatal("IPV4 address not provided", fields.Address(ipAddr)) } var bindIP net.IP var networkVersion string @@ -149,7 +154,7 @@ func (n *bootNode) createListener(logger *zap.Logger, ipAddr string, port uint16 bindIP = net.IPv4zero networkVersion = "udp4" default: - logger.Fatal("Valid ip address not provided", fields.Address(ipAddr)) + n.logger.Fatal("Valid ip address not provided", fields.Address(ipAddr)) } udpAddr := &net.UDPAddr{ IP: bindIP, @@ -159,7 +164,7 @@ func (n *bootNode) createListener(logger *zap.Logger, ipAddr string, port uint16 if err != nil { log.Fatal(err) } - localNode, err := n.createLocalNode(logger, privateKey, ip, port) + localNode, err := n.createLocalNode(privateKey, ip, port) if err != nil { log.Fatal(err) } @@ -175,7 +180,7 @@ func (n *bootNode) createListener(logger *zap.Logger, ipAddr string, port uint16 return listener } -func (n *bootNode) createLocalNode(logger *zap.Logger, privKey *ecdsa.PrivateKey, ipAddr net.IP, port uint16) (*enode.LocalNode, error) { +func (n *bootNode) createLocalNode(privKey *ecdsa.PrivateKey, ipAddr net.IP, port uint16) (*enode.LocalNode, error) { db, err := enode.OpenDB(filepath.Join(n.dbPath, "enode")) if err != nil { return nil, errors.Wrap(err, "Could not open node's peer database") @@ -183,9 +188,9 @@ func (n *bootNode) createLocalNode(logger *zap.Logger, privKey *ecdsa.PrivateKey external := net.ParseIP(n.externalIP) if n.externalIP == "" { external = ipAddr - logger.Info("Running with IP", zap.String("ip", ipAddr.String())) + n.logger.Info("Running with IP", zap.String("ip", ipAddr.String())) } else { - logger.Info("Running with External IP", zap.String("external_ip", n.externalIP)) + n.logger.Info("Running with External IP", zap.String("external_ip", n.externalIP)) } localNode := enode.NewLocalNode(db, privKey) diff --git a/utils/keys.go b/utils/keys.go index b513b68725..87e24f9bc7 100644 --- a/utils/keys.go +++ b/utils/keys.go @@ -5,12 +5,12 @@ import ( "crypto/rand" "encoding/hex" - "github.com/ssvlabs/ssv/logging/fields" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/pkg/errors" - "github.com/ssvlabs/ssv/network/commons" "go.uber.org/zap" + + "github.com/ssvlabs/ssv/logging/fields" + "github.com/ssvlabs/ssv/network/commons" ) // ECDSAPrivateKey extracts the ecdsa.PrivateKey from the given string or generate a new key
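A sketch of how a caller might drive the logger-free ParticipantStore pruning API declared in protocol/v2/qbft/storage/participant_store.go above. The startPruning helper and its argument values are illustrative, and the qbftstorage import alias is an assumption; only the Prune and PruneContinously signatures come from the patch.

package example

import (
	"context"

	"github.com/attestantio/go-eth2-client/spec/phase0"

	"github.com/ssvlabs/ssv/operator/slotticker"
	qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
)

// startPruning is a hypothetical helper: it runs the blocking initial cleanup
// and then starts the continuous background cleanup, neither taking a logger.
func startPruning(ctx context.Context, store qbftstorage.ParticipantStore, tickers slotticker.Provider, threshold, retain phase0.Slot) {
	// One-off, blocking removal of everything below the retained threshold.
	store.Prune(ctx, threshold)

	// Background removal of old slots on every slot tick.
	go store.PruneContinously(ctx, tickers, retain)
}
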
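Taken together, the hunks above apply one pattern: the *zap.Logger is injected once at construction, usually scoped with Named, and read from the receiver, instead of being threaded through every method. A minimal sketch of that pattern, with hypothetical names (worker, newWorker, run) that do not appear in this patch:

package example

import "go.uber.org/zap"

// worker keeps a component-scoped logger instead of accepting one per call.
type worker struct {
	logger *zap.Logger
}

// newWorker names the logger once at construction time.
func newWorker(logger *zap.Logger) *worker {
	return &worker{logger: logger.Named("worker")}
}

// run uses the injected logger; callers no longer pass one in.
func (w *worker) run() {
	w.logger.Info("worker running")
}
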