Skip to content

Commit

Permalink
Update dht.go
Browse files Browse the repository at this point in the history
  • Loading branch information
crStiv authored Jan 30, 2025
1 parent 2ade7a4 commit 832e635
Showing 1 changed file with 17 additions and 99 deletions.
116 changes: 17 additions & 99 deletions share/shwap/p2p/discovery/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -18,116 +17,35 @@ const (
defaultRoutingRefreshPeriod = time.Minute
)

// PeerRouting provides a constructor for PeerRouting over DHT.
// Essentially, this offers a way to discover peer addresses by respecting public keys.
// PeerRouting provides constructor for PeerRouting over DHT.
// Basically, this provides a way to discover peer addresses by respecting public keys.
func NewDHT(
ctx context.Context,
prefix string,
bootstrappers []peer.AddrInfo,
bootsrappers []peer.AddrInfo,
host host.Host,
dataStore datastore.Batching,
mode dht.ModeOpt,
) (*dht.IpfsDHT, error) {
// Create metrics registry with our labels
reg := prometheus.NewRegistry()
labels := prometheus.Labels{
"network": prefix,
"node_type": mode.String(),
}
wrappedReg := prometheus.WrapRegistererWith(labels, reg)

opts := []dht.Option{
dht.BootstrapPeers(bootstrappers...),
dht.BootstrapPeers(bootsrappers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))),
dht.Datastore(dataStore),
dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod),
dht.Mode(mode),
dht.Validator(dht.DefaultValidator{}),
// Enable built-in DHT metrics
dht.EnabledMetrics(wrappedReg),
}

d, err := dht.New(ctx, host, opts...)
if err != nil {
return nil, err
}

// Create a metrics wrapper
metricsDHT := NewMetricsDHT(d, prefix, mode)

// Add event handlers for metrics
metricsDHT.PeerConnected = func(id peer.ID) {
dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Inc()
if metricsDHT.Host().Network().Connectedness(id) == network.DirInbound {
dhtMetrics.InboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc()
} else {
dhtMetrics.OutboundConnectionsTotal.WithLabelValues(prefix, mode.String()).Inc()
}
}

metricsDHT.PeerDisconnected = func(id peer.ID) {
dhtMetrics.PeersTotal.WithLabelValues(prefix, mode.String()).Dec()
}

// Add metrics for the routing table
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
size := len(metricsDHT.RoutingTable().ListPeers())
dhtMetrics.RoutingTableSize.WithLabelValues(prefix, mode.String()).Set(float64(size))
}
}
}()

// Add metrics for storage operations
metricsDHT.Validator.RecordSize = func(key string, size int) {
dhtMetrics.StoredRecordsTotal.WithLabelValues(prefix, mode.String()).Set(float64(size))
}

return metricsDHT.IpfsDHT, nil
}

// Add wrapper functions for tracking metrics

func trackDHTRequest(ctx context.Context, prefix, mode, requestType string) (context.Context, func(error)) {
start := time.Now()
return ctx, func(err error) {
duration := time.Since(start)
status := "success"
if err != nil {
status = "error"
}

dhtMetrics.RequestsTotal.WithLabelValues(prefix, mode, requestType, status).Inc()
dhtMetrics.RequestDuration.WithLabelValues(prefix, mode, requestType).Observe(duration.Seconds())
}
}

func trackFindPeer(ctx context.Context, prefix, mode string) func(error) {
return func(err error) {
status := "success"
if err != nil {
status = "error"
}
dhtMetrics.FindPeerTotal.WithLabelValues(prefix, mode, status).Inc()
}
}

func trackFindProviders(ctx context.Context, prefix, mode string) func(error) {
return func(err error) {
status := "success"
if err != nil {
status = "error"
}
dhtMetrics.FindProvidersTotal.WithLabelValues(prefix, mode, status).Inc()
}
}

func trackStoreOperation(ctx context.Context, prefix, mode string) func(error) {
return func(err error) {
status := "success"
if err != nil {
status = "error"
}
dhtMetrics.StoreOperationsTotal.WithLabelValues(prefix, mode, status).Inc()
}
}

func trackRoutingTableRefresh(prefix, mode string) {
dhtMetrics.RoutingTableRefreshes.WithLabelValues(prefix, mode).Inc()
return dht.New(ctx, host, opts...)
}
я

0 comments on commit 832e635

Please sign in to comment.