From 4bb6c2b4de1f05e7764150800108e212483f951d Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 2 Oct 2024 18:00:44 +0200 Subject: [PATCH 1/2] add blockstore metrics --- nodebuilder/settings.go | 1 + nodebuilder/share/bitswap.go | 12 +- nodebuilder/share/module.go | 21 +- nodebuilder/share/opts.go | 5 + share/shwap/p2p/bitswap/blockstore_metrics.go | 212 ++++++++++++++++++ 5 files changed, 242 insertions(+), 9 deletions(-) create mode 100644 share/shwap/p2p/bitswap/blockstore_metrics.go diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index ea1aac2d37..a5e0653cb7 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -95,6 +95,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]), fx.Invoke(node.WithMetrics), fx.Invoke(share.WithDiscoveryMetrics), + fx.Invoke(share.WithBlockStoreMetrics), ) samplingMetrics := fx.Options( diff --git a/nodebuilder/share/bitswap.go b/nodebuilder/share/bitswap.go index 9316075dfe..3183ba599c 100644 --- a/nodebuilder/share/bitswap.go +++ b/nodebuilder/share/bitswap.go @@ -60,21 +60,23 @@ func dataExchange(tp node.Type, params bitSwapParams) exchange.SessionExchange { } } -func blockstoreFromDatastore(ds datastore.Batching) (blockstore.Blockstore, error) { - return blockstore.NewBlockstore(ds), nil +func blockstoreFromDatastore(ds datastore.Batching) (*bitswap.BlockstoreWithMetrics, error) { + bs := blockstore.NewBlockstore(ds) + return bitswap.NewBlockstoreWithMetrics(bs) } -func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (blockstore.Blockstore, error) { +func blockstoreFromEDSStore(store *store.Store, blockStoreCacheSize int) (*bitswap.BlockstoreWithMetrics, error) { if blockStoreCacheSize == 0 { // no cache, return plain blockstore - return &bitswap.Blockstore{Getter: store}, nil + bs := &bitswap.Blockstore{Getter: store} + return bitswap.NewBlockstoreWithMetrics(bs) } withCache, err := store.WithCache("blockstore", blockStoreCacheSize) if err != nil { return nil, fmt.Errorf("create cached store for blockstore:%w", err) } bs := &bitswap.Blockstore{Getter: withCache} - return bs, nil + return bitswap.NewBlockstoreWithMetrics(bs) } type bitSwapParams struct { diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index f02b4632b5..231ebf1525 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -16,6 +16,7 @@ import ( "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/shwap" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds" @@ -69,14 +70,26 @@ func bitswapComponents(tp node.Type, cfg *Config) fx.Option { case node.Light: return fx.Options( opts, - fx.Provide(blockstoreFromDatastore), + fx.Provide( + fx.Annotate( + blockstoreFromDatastore, + fx.As(fx.Self()), + fx.As(new(blockstore.Blockstore)), + ), + ), ) case node.Full, node.Bridge: return fx.Options( opts, - fx.Provide(func(store *store.Store) (blockstore.Blockstore, error) { - return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize)) - }), + fx.Provide( + fx.Annotate( + func(store *store.Store) (*bitswap.BlockstoreWithMetrics, error) { + return blockstoreFromEDSStore(store, int(cfg.BlockStoreCacheSize)) + }, + fx.As(fx.Self()), + fx.As(new(blockstore.Blockstore)), + ), + ), ) default: panic("invalid node type") diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go index cfea26dbb4..6564099d23 100644 --- a/nodebuilder/share/opts.go +++ b/nodebuilder/share/opts.go @@ -3,6 +3,7 @@ package share import ( "errors" + "github.com/celestiaorg/celestia-node/share/shwap/p2p/bitswap" "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers" "github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter" @@ -56,3 +57,7 @@ func WithShrexGetterMetrics(sg *shrex_getter.Getter) error { func WithStoreMetrics(s *store.Store) error { return s.WithMetrics() } + +func WithBlockStoreMetrics(bs *bitswap.BlockstoreWithMetrics) error { + return bs.WithMetrics() +} diff --git a/share/shwap/p2p/bitswap/blockstore_metrics.go b/share/shwap/p2p/bitswap/blockstore_metrics.go new file mode 100644 index 0000000000..4f5a58224f --- /dev/null +++ b/share/shwap/p2p/bitswap/blockstore_metrics.go @@ -0,0 +1,212 @@ +package bitswap + +import ( + "context" + "errors" + "fmt" + + bstore "github.com/ipfs/boxo/blockstore" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + notFoundKey = "not_found" + failedKey = "failed" +) + +var ( + meter = otel.Meter("bitswap_blockstore") + _ bstore.Blockstore = (*BlockstoreWithMetrics)(nil) +) + +// BlockstoreWithMetrics is a blockstore that collects metrics on blockstore operations. +type BlockstoreWithMetrics struct { + bs bstore.Blockstore + metrics *metrics +} + +type metrics struct { + delete metric.Int64Counter + has metric.Int64Counter + get metric.Int64Counter + getSize metric.Int64Counter + put metric.Int64Counter + putMany metric.Int64Counter + allKeysChan metric.Int64Counter +} + +// NewBlockstoreWithMetrics creates a new BlockstoreWithMetrics. +func NewBlockstoreWithMetrics(bs bstore.Blockstore) (*BlockstoreWithMetrics, error) { + return &BlockstoreWithMetrics{ + bs: bs, + }, nil +} + +// WithMetrics enables metrics collection on the blockstore. +func (w *BlockstoreWithMetrics) WithMetrics() error { + del, err := meter.Int64Counter( + "blockstore_delete_block", + metric.WithDescription("Blockstore delete block operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore delete block counter: %w", err) + } + + has, err := meter.Int64Counter( + "blockstore_has", + metric.WithDescription("Blockstore has operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore has counter: %w", err) + } + + get, err := meter.Int64Counter( + "blockstore_get", + metric.WithDescription("Blockstore get operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore get counter: %w", err) + } + + getSize, err := meter.Int64Counter( + "blockstore_get_size", + metric.WithDescription("Blockstore get size operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore get size counter: %w", err) + } + + put, err := meter.Int64Counter( + "blockstore_put", + metric.WithDescription("Blockstore put operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore put counter: %w", err) + } + + putMany, err := meter.Int64Counter( + "blockstore_put_many", + metric.WithDescription("Blockstore put many operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore put many counter: %w", err) + } + + allKeysChan, err := meter.Int64Counter( + "blockstore_all_keys_chan", + metric.WithDescription("Blockstore all keys chan operation"), + ) + if err != nil { + return fmt.Errorf("failed to create blockstore all keys chan counter: %w", err) + } + + if err != nil { + return fmt.Errorf("failed to create blockstore hash on read counter: %w", err) + } + + w.metrics = &metrics{ + delete: del, + has: has, + get: get, + getSize: getSize, + put: put, + putMany: putMany, + allKeysChan: allKeysChan, + } + return nil +} + +func (w *BlockstoreWithMetrics) DeleteBlock(ctx context.Context, cid cid.Cid) error { + if w.metrics == nil { + return w.bs.DeleteBlock(ctx, cid) + } + err := w.bs.DeleteBlock(ctx, cid) + w.metrics.delete.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, err != nil), + )) + return err +} + +func (w *BlockstoreWithMetrics) Has(ctx context.Context, cid cid.Cid) (bool, error) { + if w.metrics == nil { + return w.bs.Has(ctx, cid) + } + has, err := w.bs.Has(ctx, cid) + notFound := errors.Is(err, ipld.ErrNotFound{}) + failed := err != nil && !notFound + w.metrics.has.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, failed), + attribute.Bool(notFoundKey, notFound), + )) + return has, err +} + +func (w *BlockstoreWithMetrics) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { + if w.metrics == nil { + return w.bs.Get(ctx, cid) + } + get, err := w.bs.Get(ctx, cid) + notFound := errors.Is(err, ipld.ErrNotFound{}) + failed := err != nil && !notFound + w.metrics.get.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, failed), + attribute.Bool(notFoundKey, notFound), + )) + return get, err +} + +func (w *BlockstoreWithMetrics) GetSize(ctx context.Context, cid cid.Cid) (int, error) { + if w.metrics == nil { + return w.bs.GetSize(ctx, cid) + } + size, err := w.bs.GetSize(ctx, cid) + notFound := errors.Is(err, ipld.ErrNotFound{}) + failed := err != nil && !notFound + w.metrics.getSize.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, failed), + attribute.Bool(notFoundKey, notFound), + )) + return size, err +} + +func (w *BlockstoreWithMetrics) Put(ctx context.Context, block blocks.Block) error { + if w.metrics == nil { + return w.bs.Put(ctx, block) + } + err := w.bs.Put(ctx, block) + w.metrics.put.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, err != nil), + )) + return err +} + +func (w *BlockstoreWithMetrics) PutMany(ctx context.Context, blocks []blocks.Block) error { + if w.metrics == nil { + return w.bs.PutMany(ctx, blocks) + } + err := w.bs.PutMany(ctx, blocks) + w.metrics.putMany.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, err != nil), + )) + return err +} + +func (w *BlockstoreWithMetrics) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if w.metrics == nil { + return w.bs.AllKeysChan(ctx) + } + ch, err := w.bs.AllKeysChan(ctx) + w.metrics.allKeysChan.Add(ctx, 1, metric.WithAttributes( + attribute.Bool(failedKey, err != nil), + )) + return ch, err +} + +func (w *BlockstoreWithMetrics) HashOnRead(enabled bool) { + w.bs.HashOnRead(enabled) +} From abd89f41481b0d384bc191bee9e74fb2a3e8699f Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Wed, 20 Nov 2024 18:12:16 +0700 Subject: [PATCH 2/2] remove extra err check --- share/shwap/p2p/bitswap/blockstore_metrics.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/share/shwap/p2p/bitswap/blockstore_metrics.go b/share/shwap/p2p/bitswap/blockstore_metrics.go index 4f5a58224f..00633caf85 100644 --- a/share/shwap/p2p/bitswap/blockstore_metrics.go +++ b/share/shwap/p2p/bitswap/blockstore_metrics.go @@ -105,10 +105,6 @@ func (w *BlockstoreWithMetrics) WithMetrics() error { return fmt.Errorf("failed to create blockstore all keys chan counter: %w", err) } - if err != nil { - return fmt.Errorf("failed to create blockstore hash on read counter: %w", err) - } - w.metrics = &metrics{ delete: del, has: has,