Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap/bitswap): add blockstore metrics #3862

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
fx.Invoke(share.WithBlockStoreMetrics),
)

samplingMetrics := fx.Options(
Expand Down
12 changes: 7 additions & 5 deletions nodebuilder/share/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,23 @@ func dataExchange(tp node.Type, params bitSwapParams) exchange.SessionExchange {
}
}

func blockstoreFromDatastore(ds datastore.Batching) (blockstore.Blockstore, error) {
return blockstore.NewBlockstore(ds), nil
func blockstoreFromDatastore(ds datastore.Batching) (*bitswap.BlockstoreWithMetrics, error) {
bs := blockstore.NewBlockstore(ds)
return bitswap.NewBlockstoreWithMetrics(bs)
}

func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (blockstore.Blockstore, error) {
func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (*bitswap.BlockstoreWithMetrics, error) {
if blockStoreCacheSize == 0 {
// no cache, return plain blockstore
return &bitswap.Blockstore{Getter: store}, nil
bs := &bitswap.Blockstore{Getter: store}
return bitswap.NewBlockstoreWithMetrics(bs)
}
withCache, err := store.WithCache("blockstore", blockStoreCacheSize)
if err != nil {
return nil, fmt.Errorf("create cached store for blockstore:%w", err)
}
bs := &bitswap.Blockstore{Getter: withCache}
return bs, nil
return bitswap.NewBlockstoreWithMetrics(bs)
}

type bitSwapParams struct {
Expand Down
21 changes: 17 additions & 4 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
Expand Down Expand Up @@ -69,14 +70,26 @@ func bitswapComponents(tp node.Type, cfg *Config) fx.Option {
case node.Light:
return fx.Options(
opts,
fx.Provide(blockstoreFromDatastore),
fx.Provide(
fx.Annotate(
blockstoreFromDatastore,
fx.As(fx.Self()),
fx.As(new(blockstore.Blockstore)),
),
),
)
case node.Full, node.Bridge:
return fx.Options(
opts,
fx.Provide(func(store *store.Store) (blockstore.Blockstore, error) {
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
}),
fx.Provide(
fx.Annotate(
func(store *store.Store) (*bitswap.BlockstoreWithMetrics, error) {
return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize))
},
fx.As(fx.Self()),
fx.As(new(blockstore.Blockstore)),
),
),
)
default:
panic("invalid node type")
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package share
import (
"errors"

"github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
Expand Down Expand Up @@ -56,3 +57,7 @@ func WithShrexGetterMetrics(sg *shrex_getter.Getter) error {
func WithStoreMetrics(s *store.Store) error {
return s.WithMetrics()
}

func WithBlockStoreMetrics(bs *bitswap.BlockstoreWithMetrics) error {
return bs.WithMetrics()
}
208 changes: 208 additions & 0 deletions share/shwap/p2p/bitswap/blockstore_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package bitswap

import (
"context"
"errors"
"fmt"

bstore "github.com/ipfs/boxo/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
notFoundKey = "not_found"
failedKey = "failed"
)

var (
meter = otel.Meter("bitswap_blockstore")
_ bstore.Blockstore = (*BlockstoreWithMetrics)(nil)
)

// BlockstoreWithMetrics is a blockstore that collects metrics on blockstore operations.
type BlockstoreWithMetrics struct {
bs bstore.Blockstore
metrics *metrics
}

type metrics struct {
delete metric.Int64Counter
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
has metric.Int64Counter
get metric.Int64Counter
getSize metric.Int64Counter
put metric.Int64Counter
putMany metric.Int64Counter
allKeysChan metric.Int64Counter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont need this one realistically. We dont implement it and never call it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The blockstore metrics are design as general purpose wrapper. Metrics wrapper is also used for Light node blockstore, which implements those operations.

}

// NewBlockstoreWithMetrics creates a new BlockstoreWithMetrics.
func NewBlockstoreWithMetrics(bs bstore.Blockstore) (*BlockstoreWithMetrics, error) {
return &BlockstoreWithMetrics{
bs: bs,
}, nil
}

// WithMetrics enables metrics collection on the blockstore.
func (w *BlockstoreWithMetrics) WithMetrics() error {
del, err := meter.Int64Counter(
"blockstore_delete_block",
metric.WithDescription("Blockstore delete block operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore delete block counter: %w", err)
}

has, err := meter.Int64Counter(
"blockstore_has",
metric.WithDescription("Blockstore has operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore has counter: %w", err)
}

get, err := meter.Int64Counter(
"blockstore_get",
metric.WithDescription("Blockstore get operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore get counter: %w", err)
}

getSize, err := meter.Int64Counter(
"blockstore_get_size",
metric.WithDescription("Blockstore get size operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore get size counter: %w", err)
}

put, err := meter.Int64Counter(
"blockstore_put",
metric.WithDescription("Blockstore put operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore put counter: %w", err)
}

putMany, err := meter.Int64Counter(
"blockstore_put_many",
metric.WithDescription("Blockstore put many operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore put many counter: %w", err)
}

allKeysChan, err := meter.Int64Counter(
"blockstore_all_keys_chan",
metric.WithDescription("Blockstore all keys chan operation"),
)
if err != nil {
return fmt.Errorf("failed to create blockstore all keys chan counter: %w", err)
}

w.metrics = &metrics{
delete: del,
has: has,
get: get,
getSize: getSize,
put: put,
putMany: putMany,
allKeysChan: allKeysChan,
}
return nil
}

func (w *BlockstoreWithMetrics) DeleteBlock(ctx context.Context, cid cid.Cid) error {
if w.metrics == nil {
return w.bs.DeleteBlock(ctx, cid)
}
err := w.bs.DeleteBlock(ctx, cid)
w.metrics.delete.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if w.metrics == nil {
return w.bs.Has(ctx, cid)
}
has, err := w.bs.Has(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.has.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return has, err
}

func (w *BlockstoreWithMetrics) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if w.metrics == nil {
return w.bs.Get(ctx, cid)
}
get, err := w.bs.Get(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.get.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return get, err
}

func (w *BlockstoreWithMetrics) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
if w.metrics == nil {
return w.bs.GetSize(ctx, cid)
}
size, err := w.bs.GetSize(ctx, cid)
notFound := errors.Is(err, ipld.ErrNotFound{})
failed := err != nil && !notFound
w.metrics.getSize.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, failed),
attribute.Bool(notFoundKey, notFound),
))
return size, err
}

func (w *BlockstoreWithMetrics) Put(ctx context.Context, block blocks.Block) error {
if w.metrics == nil {
return w.bs.Put(ctx, block)
}
err := w.bs.Put(ctx, block)
w.metrics.put.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) PutMany(ctx context.Context, blocks []blocks.Block) error {
if w.metrics == nil {
return w.bs.PutMany(ctx, blocks)
}
err := w.bs.PutMany(ctx, blocks)
w.metrics.putMany.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return err
}

func (w *BlockstoreWithMetrics) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if w.metrics == nil {
return w.bs.AllKeysChan(ctx)
}
ch, err := w.bs.AllKeysChan(ctx)
w.metrics.allKeysChan.Add(ctx, 1, metric.WithAttributes(
attribute.Bool(failedKey, err != nil),
))
return ch, err
}

func (w *BlockstoreWithMetrics) HashOnRead(enabled bool) {
w.bs.HashOnRead(enabled)
}
Loading