Skip to content

Commit

Permalink
refactor(misc): pretty print namespace.iD and peer.ID (#2255)
Browse files Browse the repository at this point in the history
## Overview

Log `namespace.ID` and `peer.ID` in pretty format.
  • Loading branch information
walldiss authored May 29, 2023
1 parent 9359b65 commit 6e0f3b9
Show file tree
Hide file tree
Showing 14 changed files with 39 additions and 29 deletions.
2 changes: 1 addition & 1 deletion das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (d *DASer) Stop(ctx context.Context) error {

// save updated checkpoint after sampler and all workers are shut down
if err = d.store.store(ctx, newCheckpoint(d.sampler.state.unsafeStats())); err != nil {
log.Errorw("storing checkpoint to disk", "Err", err)
log.Errorw("storing checkpoint to disk", "err", err)
}

if err = d.store.wait(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion share/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func init() {
minDAH := da.MinDataAvailabilityHeader()
if !bytes.Equal(minDAH.Hash(), dah.Hash()) {
panic(fmt.Sprintf("mismatch in calculated minimum DAH and minimum DAH from celestia-app, "+
"expected %X, got %X", minDAH.Hash(), dah.Hash()))
"expected %s, got %s", minDAH.String(), dah.String()))
}
emptyRoot = &dah

Expand Down
2 changes: 1 addition & 1 deletion share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (ns NamespacedShares) Verify(root *Root, nID namespace.ID) error {
for i, row := range ns {
// verify row data against row hash from original root
if !row.verify(originalRoots[i], nID) {
return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.Hash())
return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.String())
}
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"

"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (cg *CascadeGetter) GetSharesByNamespace(
) (share.NamespacedShares, error) {
ctx, span := tracer.Start(ctx, "cascade/get-shares-by-namespace", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.String("nid", id.String()),
attribute.String("nid", hex.EncodeToString(id)),
))
defer span.End()

Expand Down
3 changes: 2 additions & 1 deletion share/getters/ipld.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -93,7 +94,7 @@ func (ig *IPLDGetter) GetSharesByNamespace(
) (shares share.NamespacedShares, err error) {
ctx, span := tracer.Start(ctx, "ipld/get-shares-by-namespace", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.String("nID", nID.String()),
attribute.String("nid", hex.EncodeToString(nID)),
))
defer func() {
utils.SetStatusAndEnd(span, err)
Expand Down
3 changes: 3 additions & 0 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -204,6 +205,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
if getErr != nil {
log.Debugw("nd: couldn't find peer",
"hash", root.String(),
"nid", hex.EncodeToString(id),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordNDAttempt(ctx, attempt, false)
Expand Down Expand Up @@ -245,6 +247,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
}
log.Debugw("nd: request failed",
"hash", root.String(),
"nid", hex.EncodeToString(id),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
Expand Down
3 changes: 2 additions & 1 deletion share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"
"fmt"

Expand Down Expand Up @@ -98,7 +99,7 @@ func (sg *StoreGetter) GetSharesByNamespace(
) (shares share.NamespacedShares, err error) {
ctx, span := tracer.Start(ctx, "store/get-shares-by-namespace", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.String("nID", nID.String()),
attribute.String("nid", hex.EncodeToString(nID)),
))
defer func() {
utils.SetStatusAndEnd(span, err)
Expand Down
3 changes: 2 additions & 1 deletion share/getters/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"
"fmt"

Expand Down Expand Up @@ -75,7 +76,7 @@ func (tg *TeeGetter) GetSharesByNamespace(
) (shares share.NamespacedShares, err error) {
ctx, span := tracer.Start(ctx, "tee/get-shares-by-namespace", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.String("nID", id.String()),
attribute.String("nid", hex.EncodeToString(id)),
))
defer func() {
utils.SetStatusAndEnd(span, err)
Expand Down
3 changes: 2 additions & 1 deletion share/getters/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package getters

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -51,7 +52,7 @@ func collectSharesByNamespace(
) (shares share.NamespacedShares, err error) {
ctx, span := tracer.Start(ctx, "collect-shares-by-namespace", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.String("nid", nID.String()),
attribute.String("nid", hex.EncodeToString(nID)),
))
defer func() {
utils.SetStatusAndEnd(span, err)
Expand Down
6 changes: 3 additions & 3 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
d.connector.Backoff(id)
d.set.Remove(id)
d.onUpdatedPeers(id, false)
log.Debugw("removed peer from the peer set", "peer", id)
log.Debugw("removed peer from the peer set", "peer", id.String())

if d.set.Size() < d.set.Limit() {
// trigger discovery
Expand Down Expand Up @@ -307,7 +307,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
}

size := d.set.Size()
log.Debugw("found peer", "peer", peer.ID, "found_amount", size)
log.Debugw("found peer", "peer", peer.ID.String(), "found_amount", size)
if size < d.set.Limit() {
return nil
}
Expand All @@ -323,7 +323,7 @@ func (d *Discovery) discover(ctx context.Context) bool {
// handleDiscoveredPeer adds peer to the internal if can connect or is connected.
// Report whether it succeeded.
func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool {
logger := log.With("peer", peer.ID)
logger := log.With("peer", peer.ID.String())
switch {
case peer.ID == d.host.ID():
d.metrics.observeHandlePeer(ctx, handlePeerSkipSelf)
Expand Down
23 changes: 12 additions & 11 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ func NewManager(
func(peerID peer.ID, isAdded bool) {
if isAdded {
if s.isBlacklistedPeer(peerID) {
log.Debugw("got blacklisted peer from discovery", "peer", peerID)
log.Debugw("got blacklisted peer from discovery", "peer", peerID.String())
return
}
s.fullNodes.add(peerID)
log.Debugw("added to full nodes", "peer", peerID)
return
}

log.Debugw("removing peer from discovered full nodes", "peer", peerID)
log.Debugw("removing peer from discovered full nodes", "peer", peerID.String())
s.fullNodes.remove(peerID)
})

Expand Down Expand Up @@ -256,7 +256,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
return func(result result) {
log.Debugw("set peer result",
"hash", datahash.String(),
"peer", peerID,
"peer", peerID.String(),
"source", source,
"result", result)
m.metrics.observeDoneResult(source, result)
Expand Down Expand Up @@ -318,7 +318,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs
if connStatus.Connectedness == network.NotConnected {
peer := connStatus.Peer
if m.fullNodes.has(peer) {
log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
log.Debugw("peer disconnected, removing from full nodes", "peer", peer.String())
m.fullNodes.remove(peer)
}
}
Expand All @@ -328,7 +328,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
logger := log.With("peer", peerID, "hash", msg.DataHash.String())
logger := log.With("peer", peerID.String(), "hash", msg.DataHash.String())

// messages broadcast from self should bypass the validation with Accept
if peerID == m.host.ID() {
Expand Down Expand Up @@ -390,15 +390,16 @@ func (m *Manager) getOrCreatePool(datahash string) *syncPool {
}

func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) {
log.Debugw("blacklisting peers",
"peers", peerIDs,
"reason", reason)
m.metrics.observeBlacklistPeers(reason, len(peerIDs))

if !m.params.EnableBlackListing {
return
}
for _, peerID := range peerIDs {
// blacklisted peers will be logged regardless of EnableBlackListing whether option being is
// enabled, until blacklisting is not properly tested and enabled by default.
log.Debugw("blacklisting peer", "peer", peerID.String(), "reason", reason)
if !m.params.EnableBlackListing {
continue
}

m.fullNodes.remove(peerID)
// add peer to the blacklist, so we can't connect to it in the future.
err := m.connGater.BlockPeer(peerID)
Expand Down
6 changes: 3 additions & 3 deletions share/p2p/shrexeds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Client) RequestEDS(
if err == nil {
return eds, nil
}
log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String(), "error", err)
log.Debugw("client: eds request to peer failed", "peer", peer.String(), "hash", dataHash.String(), "error", err)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
return nil, err
Expand All @@ -70,7 +70,7 @@ func (c *Client) RequestEDS(
}
if err != p2p.ErrNotFound {
log.Warnw("client: eds request to peer failed",
"peer", peer,
"peer", peer.String(),
"hash", dataHash.String(),
"err", err)
}
Expand All @@ -95,7 +95,7 @@ func (c *Client) doRequest(
req := &pb.EDSRequest{Hash: dataHash}

// request ODS
log.Debugf("client: requesting ods %s from peer %s", dataHash.String(), to)
log.Debugw("client: requesting ods", "hash", dataHash.String(), "peer", to.String())
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexeds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Server) observeRateLimitedRequests() {
}

func (s *Server) handleStream(stream network.Stream) {
logger := log.With("peer", stream.Conn().RemotePeer())
logger := log.With("peer", stream.Conn().RemotePeer().String())
logger.Debug("server: handling eds request")

s.observeRateLimitedRequests()
Expand All @@ -91,7 +91,7 @@ func (s *Server) handleStream(stream network.Stream) {
stream.Reset() //nolint:errcheck
return
}
logger = logger.With("hash", hash)
logger = logger.With("hash", hash.String())

ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout)
defer cancel()
Expand Down
5 changes: 3 additions & 2 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package shrexnd

import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (srv *Server) observeRateLimitedRequests() {
}

func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) {
logger := log.With("peer", stream.Conn().RemotePeer())
logger := log.With("peer", stream.Conn().RemotePeer().String())
logger.Debug("server: handling nd request")

srv.observeRateLimitedRequests()
Expand All @@ -99,7 +100,7 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre
stream.Reset() //nolint:errcheck
return
}
logger = logger.With("namespaceId", string(req.NamespaceId), "hash", string(req.RootHash))
logger = logger.With("namespaceId", hex.EncodeToString(req.NamespaceId), "hash", share.DataHash(req.RootHash).String())
logger.Debugw("server: new request")

err = stream.CloseRead()
Expand Down

0 comments on commit 6e0f3b9

Please sign in to comment.