Skip to content

Commit

Permalink
avoid passing logger code-wide
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jan 21, 2025
1 parent b7fc230 commit 3181077
Show file tree
Hide file tree
Showing 56 changed files with 502 additions and 477 deletions.
14 changes: 7 additions & 7 deletions cli/bootnode/boot_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ import (
"fmt"
"log"

"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/utils/commons"

"github.com/ssvlabs/ssv/logging"

"github.com/ilyakaznacheev/cleanenv"
"github.com/spf13/cobra"
"go.uber.org/zap"

global_config "github.com/ssvlabs/ssv/cli/config"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/networkconfig"
bootnode "github.com/ssvlabs/ssv/utils/boot_node"
"github.com/ssvlabs/ssv/utils/commons"
)

type config struct {
Expand Down Expand Up @@ -60,11 +58,13 @@ var StartBootNodeCmd = &cobra.Command{
if err != nil {
logger.Fatal("failed to get network config", zap.Error(err))
}
bootNode, err := bootnode.New(networkConfig, cfg.Options)

bootNode, err := bootnode.New(logger, networkConfig, cfg.Options)
if err != nil {
logger.Fatal("failed to set up boot node", zap.Error(err))
}
if err := bootNode.Start(cmd.Context(), logger); err != nil {

if err := bootNode.Start(cmd.Context()); err != nil {
logger.Fatal("failed to start boot node", zap.Error(err))
}
},
Expand Down
16 changes: 8 additions & 8 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"github.com/ilyakaznacheev/cleanenv"
"github.com/pkg/errors"
"github.com/spf13/cobra"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/ssvlabs/ssv/api/handlers"
apiserver "github.com/ssvlabs/ssv/api/server"
"github.com/ssvlabs/ssv/beacon/goclient"
Expand Down Expand Up @@ -265,7 +265,7 @@ var StartNodeCmd = &cobra.Command{
cfg.SSVOptions.ValidatorOptions.RecipientsStorage = nodeStorage

if cfg.WsAPIPort != 0 {
ws := exporterapi.NewWsServer(cmd.Context(), nil, http.NewServeMux(), cfg.WithPing)
ws := exporterapi.NewWsServer(cmd.Context(), zap.NewNop(), nil, http.NewServeMux(), cfg.WithPing)
cfg.SSVOptions.WS = ws
cfg.SSVOptions.WsAPIPort = cfg.WsAPIPort
cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(networkConfig, logger, ws)
Expand All @@ -286,7 +286,7 @@ var StartNodeCmd = &cobra.Command{
storageMap := ibftstorage.NewStores()

for _, storageRole := range storageRoles {
s := ibftstorage.New(cfg.SSVOptions.ValidatorOptions.DB, storageRole)
s := ibftstorage.New(logger, cfg.SSVOptions.ValidatorOptions.DB, storageRole)
storageMap.Add(storageRole, s)
}

Expand Down Expand Up @@ -428,7 +428,7 @@ var StartNodeCmd = &cobra.Command{
}
}()
}
if err := operatorNode.Start(logger); err != nil {
if err := operatorNode.Start(); err != nil {
logger.Fatal("failed to start SSV node", zap.Error(err))
}
},
Expand Down Expand Up @@ -800,9 +800,9 @@ func syncContractEvents(
func startMetricsHandler(logger *zap.Logger, db basedb.Database, port int, enableProf bool, opNode *operator.Node) {
logger = logger.Named(logging.NameMetricsHandler)
// init and start HTTP handler
metricsHandler := metrics.NewHandler(db, enableProf, opNode)
metricsHandler := metrics.NewHandler(logger, db, enableProf, opNode)
addr := fmt.Sprintf(":%d", port)
if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil {
if err := metricsHandler.Start(http.NewServeMux(), addr); err != nil {
logger.Panic("failed to serve metrics", zap.Error(err))
}
}
Expand All @@ -817,7 +817,7 @@ func initSlotPruning(ctx context.Context, logger *zap.Logger, stores *ibftstorag
wg.Add(1)
go func() {
defer wg.Done()
store.Prune(ctx, logger, threshold)
store.Prune(ctx, threshold)
}()
return nil
})
Expand All @@ -826,7 +826,7 @@ func initSlotPruning(ctx context.Context, logger *zap.Logger, stores *ibftstorag

// start background job for removing old slots on every tick
_ = stores.Each(func(_ spectypes.BeaconRole, store qbftstorage.ParticipantStore) error {
go store.PruneContinously(ctx, logger, slotTickerProvider, phase0.Slot(retain))
go store.PruneContinously(ctx, slotTickerProvider, phase0.Slot(retain))
return nil
})
}
14 changes: 8 additions & 6 deletions exporter/api/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// Broadcaster is an interface broadcasting stream message across all available connections
type Broadcaster interface {
FromFeed(logger *zap.Logger, feed *event.Feed) error
FromFeed(feed *event.Feed) error
Broadcast(msg Message) error
Register(conn broadcasted) bool
Deregister(conn broadcasted) bool
Expand All @@ -23,34 +23,36 @@ type broadcasted interface {
}

type broadcaster struct {
logger *zap.Logger
mut sync.Mutex
connections map[string]broadcasted
}

func newBroadcaster() Broadcaster {
func newBroadcaster(logger *zap.Logger) Broadcaster {
return &broadcaster{
logger: logger,
mut: sync.Mutex{},
connections: map[string]broadcasted{},
}
}

// FromFeed subscribes to the given feed and broadcasts incoming messages
func (b *broadcaster) FromFeed(logger *zap.Logger, msgFeed *event.Feed) error {
func (b *broadcaster) FromFeed(msgFeed *event.Feed) error {
cn := make(chan Message, 512)
sub := msgFeed.Subscribe(cn)
defer sub.Unsubscribe()
defer logger.Debug("done reading from feed")
defer b.logger.Debug("done reading from feed")

for {
select {
case msg := <-cn:
go func(msg Message) {
if err := b.Broadcast(msg); err != nil {
logger.Error("could not broadcast message", zap.Error(err))
b.logger.Error("could not broadcast message", zap.Error(err))
}
}(msg)
case err := <-sub.Err():
logger.Warn("could not read messages from msgFeed", zap.Error(err))
b.logger.Warn("could not read messages from msgFeed", zap.Error(err))
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/api/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ func TestConn_Send_FullQueue(t *testing.T) {

func TestBroadcaster(t *testing.T) {
logger := zaptest.NewLogger(t)
b := newBroadcaster()
b := newBroadcaster(logger)

feed := new(event.Feed)
go func() {
require.NoError(t, b.FromFeed(logger, feed))
require.NoError(t, b.FromFeed(feed))
}()
bm1 := newBroadcastedMock("1")
bm2 := newBroadcastedMock("2")
Expand Down
4 changes: 1 addition & 3 deletions exporter/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"net"
"time"

"go.uber.org/zap"
)

// Connection is an interface to abstract the actual websocket connection implementation
Expand All @@ -22,7 +20,7 @@ type NetworkMessage struct {
}

// QueryMessageHandler handles the given message
type QueryMessageHandler func(logger *zap.Logger, nm *NetworkMessage)
type QueryMessageHandler func(nm *NetworkMessage)

// ConnectionID calculates the id of the given Connection
func ConnectionID(conn Connection) string {
Expand Down
35 changes: 23 additions & 12 deletions exporter/api/query_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

"github.com/attestantio/go-eth2-client/spec/phase0"
spectypes "github.com/ssvlabs/ssv-spec/types"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
"go.uber.org/zap"

qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"

"github.com/ssvlabs/ssv/ibft/storage"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/protocol/v2/message"
Expand All @@ -18,9 +19,19 @@ const (
unknownError = "unknown error"
)

type Handler struct {
logger *zap.Logger
}

func NewHandler(logger *zap.Logger) *Handler {
return &Handler{
logger: logger,
}
}

// HandleErrorQuery handles TypeError queries.
func HandleErrorQuery(logger *zap.Logger, nm *NetworkMessage) {
logger.Warn("handles error message")
func (h *Handler) HandleErrorQuery(nm *NetworkMessage) {
h.logger.Warn("handles error message")
if _, ok := nm.Msg.Data.([]string); !ok {
nm.Msg.Data = []string{}
}
Expand All @@ -38,17 +49,17 @@ func HandleErrorQuery(logger *zap.Logger, nm *NetworkMessage) {
}

// HandleUnknownQuery handles unknown queries.
func HandleUnknownQuery(logger *zap.Logger, nm *NetworkMessage) {
logger.Warn("unknown message type", zap.String("messageType", string(nm.Msg.Type)))
func (h *Handler) HandleUnknownQuery(nm *NetworkMessage) {
h.logger.Warn("unknown message type", zap.String("messageType", string(nm.Msg.Type)))
nm.Msg = Message{
Type: TypeError,
Data: []string{fmt.Sprintf("bad request - unknown message type '%s'", nm.Msg.Type)},
}
}

// HandleParticipantsQuery handles TypeParticipants queries.
func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStores, nm *NetworkMessage, domain spectypes.DomainType) {
logger.Debug("handles query request",
func (h *Handler) HandleParticipantsQuery(store *storage.ParticipantStores, nm *NetworkMessage, domain spectypes.DomainType) {
h.logger.Debug("handles query request",
zap.Uint64("from", nm.Msg.Filter.From),
zap.Uint64("to", nm.Msg.Filter.To),
zap.String("publicKey", nm.Msg.Filter.PublicKey),
Expand All @@ -59,28 +70,28 @@ func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStore
}
pkRaw, err := hex.DecodeString(nm.Msg.Filter.PublicKey)
if err != nil {
logger.Warn("failed to decode validator public key", zap.Error(err))
h.logger.Warn("failed to decode validator public key", zap.Error(err))
res.Data = []string{"internal error - could not read validator key"}
nm.Msg = res
return
}
if len(pkRaw) != pubKeySize {
logger.Warn("bad size for the provided public key", zap.Int("length", len(pkRaw)))
h.logger.Warn("bad size for the provided public key", zap.Int("length", len(pkRaw)))
res.Data = []string{"bad size for the provided public key"}
nm.Msg = res
return
}

role, err := message.BeaconRoleFromString(nm.Msg.Filter.Role)
if err != nil {
logger.Warn("failed to parse role", zap.Error(err))
h.logger.Warn("failed to parse role", zap.Error(err))
res.Data = []string{"role doesn't exist"}
nm.Msg = res
return
}
roleStorage := store.Get(role)
if roleStorage == nil {
logger.Warn("role storage doesn't exist", fields.ExporterRole(role))
h.logger.Warn("role storage doesn't exist", fields.ExporterRole(role))
res.Data = []string{"internal error - role storage doesn't exist", role.String()}
nm.Msg = res
return
Expand All @@ -90,7 +101,7 @@ func HandleParticipantsQuery(logger *zap.Logger, store *storage.ParticipantStore
to := phase0.Slot(nm.Msg.Filter.To)
participantsList, err := roleStorage.GetParticipantsInRange(spectypes.ValidatorPK(pkRaw), from, to)
if err != nil {
logger.Warn("failed to get participants", zap.Error(err))
h.logger.Warn("failed to get participants", zap.Error(err))
res.Data = []string{"internal error - could not get participants messages"}
} else {
participations := toParticipations(role, spectypes.ValidatorPK(pkRaw), participantsList)
Expand Down
24 changes: 16 additions & 8 deletions exporter/api/query_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"

qbftstorage "github.com/ssvlabs/ssv/ibft/storage"
"github.com/ssvlabs/ssv/logging"
"github.com/ssvlabs/ssv/networkconfig"
Expand All @@ -35,7 +36,8 @@ func TestHandleUnknownQuery(t *testing.T) {
Conn: nil,
}

HandleUnknownQuery(logger, &nm)
h := NewHandler(logger)
h.HandleUnknownQuery(&nm)
errs, ok := nm.Msg.Data.([]string)
require.True(t, ok)
require.Equal(t, "bad request - unknown message type 'unknown_type'", errs[0])
Expand Down Expand Up @@ -72,7 +74,8 @@ func TestHandleErrorQuery(t *testing.T) {
Err: test.netErr,
Conn: nil,
}
HandleErrorQuery(logger, &nm)
h := NewHandler(logger)
h.HandleErrorQuery(&nm)
errs, ok := nm.Msg.Data.([]string)
require.True(t, ok)
require.Equal(t, test.expectedErr, errs[0])
Expand Down Expand Up @@ -129,7 +132,8 @@ func TestHandleDecidedQuery(t *testing.T) {

t.Run("valid range", func(t *testing.T) {
nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleAttester, 0, 250)
HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType)
h := NewHandler(l)
h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType)
require.NotNil(t, nm.Msg.Data)
msgs, ok := nm.Msg.Data.([]*ParticipantsAPI)

Expand All @@ -139,7 +143,8 @@ func TestHandleDecidedQuery(t *testing.T) {

t.Run("invalid range", func(t *testing.T) {
nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleAttester, 400, 404)
HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType)
h := NewHandler(l)
h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType)
require.NotNil(t, nm.Msg.Data)
data, ok := nm.Msg.Data.([]string)
require.True(t, ok)
Expand All @@ -148,7 +153,8 @@ func TestHandleDecidedQuery(t *testing.T) {

t.Run("non-existing validator", func(t *testing.T) {
nm := newParticipantsAPIMsg("xxx", spectypes.BNRoleAttester, 400, 404)
HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType)
h := NewHandler(l)
h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType)
require.NotNil(t, nm.Msg.Data)
errs, ok := nm.Msg.Data.([]string)
require.True(t, ok)
Expand All @@ -157,7 +163,8 @@ func TestHandleDecidedQuery(t *testing.T) {

t.Run("non-existing role", func(t *testing.T) {
nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), math.MaxUint64, 0, 250)
HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType)
h := NewHandler(l)
h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType)
require.NotNil(t, nm.Msg.Data)
errs, ok := nm.Msg.Data.([]string)
require.True(t, ok)
Expand All @@ -166,7 +173,8 @@ func TestHandleDecidedQuery(t *testing.T) {

t.Run("non-existing storage", func(t *testing.T) {
nm := newParticipantsAPIMsg(pk.SerializeToHexStr(), spectypes.BNRoleSyncCommitteeContribution, 0, 250)
HandleParticipantsQuery(l, ibftStorage, nm, networkConfig.DomainType)
h := NewHandler(l)
h.HandleParticipantsQuery(ibftStorage, nm, networkConfig.DomainType)
require.NotNil(t, nm.Msg.Data)
errs, ok := nm.Msg.Data.([]string)
require.True(t, ok)
Expand Down Expand Up @@ -209,7 +217,7 @@ func newStorageForTest(db basedb.Database, logger *zap.Logger, roles ...spectype

storageMap := qbftstorage.NewStores()
for _, role := range roles {
storageMap.Add(role, qbftstorage.New(db, role))
storageMap.Add(role, qbftstorage.New(logger, db, role))
}

return sExporter, storageMap
Expand Down
Loading

0 comments on commit 3181077

Please sign in to comment.