Skip to content

Commit

Permalink
feat(share/eds): Rework accessor cache (#2612)
Browse files Browse the repository at this point in the history
**Motivation** 
Previously, all accessors went into the cache. They were closed only when
they were evicted from it, and there was no way to close one manually.
Now that we need to cache accessors in some cases but not in others,
this approach no longer works. Giving callers control over closing
accessors required a major cache rework.

New cache features:
- The old cache didn't allow closing and releasing an accessor. Now,
instead of returning just a reader/blockstore, the cache returns an object
composed of a builder and an `io.Closer` (the Accessor interface). This allows
the caller to close and release the accessor once it is done
reading from it.
- Items stored in the cache also provide `io.Closer`, but it is a no-op.
The lifecycle of cached items is controlled by the cache itself. For now, objects
are released right after they are evicted from the cache, but this could be
changed to a smarter release based on the current reader count. That will be
implemented as a separate issue.
- Lazy blockstore builder. The old cache created a blockstore every time
an accessor was acquired, even when only the reader was used and the blockstore was
not needed. Creating a blockstore is a costly operation that involves
building an index, so this was reworked to create the blockstore only when it
is requested and to store it for subsequent requests.
- adds cache unit tests.
- adds metrics for cache methods to track cache hits/misses.
- general cleanup of unused methods/logic.

It will be easier for reviewers to start from `store.go` and
`blockstore.go` and then move on to the new `cache` pkg.
  • Loading branch information
walldiss authored Sep 18, 2023
1 parent 7c1f8f3 commit 4d98585
Show file tree
Hide file tree
Showing 17 changed files with 876 additions and 333 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/golang-lru v1.0.2
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/imdario/mergo v0.3.16
github.com/ipfs/boxo v0.12.0
github.com/ipfs/go-block-format v0.2.0
Expand Down Expand Up @@ -185,8 +185,8 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect
github.com/holiman/uint256 v1.2.3 // indirect
Expand Down
1 change: 1 addition & 0 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func TestStoreRestart(t *testing.T) {
require.NoError(t, err)
_, err = eds.ReadEDS(ctx, odsReader, h)
require.NoError(t, err)
require.NoError(t, edsReader.Close())
}
}

Expand Down
178 changes: 0 additions & 178 deletions share/eds/accessor_cache.go

This file was deleted.

63 changes: 39 additions & 24 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
bstore "github.com/ipfs/boxo/blockstore"
dshelp "github.com/ipfs/boxo/datastore/dshelp"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ipld "github.com/ipfs/go-ipld-format"

"github.com/celestiaorg/celestia-node/share/eds/cache"
)

var _ bstore.Blockstore = (*blockstore)(nil)
Expand All @@ -32,18 +32,9 @@ var (
// implementation to allow for the blockstore operations to be routed to the underlying stores.
type blockstore struct {
store *Store
cache *blockstoreCache
ds datastore.Batching
}

func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
cache: cache,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
Expand All @@ -63,6 +54,11 @@ func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {

func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.Get(ctx, cid)
}

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
blockData, err := bs.ds.Get(ctx, k)
Expand All @@ -72,15 +68,18 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error
// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
return nil, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}
return blockstr.Get(ctx, cid)

log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
return nil, err
}

func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if err == nil {
defer closeAndLog("blockstore", blockstr)
return blockstr.GetSize(ctx, cid)
}

if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
size, err := bs.ds.GetSize(ctx, k)
Expand All @@ -90,10 +89,9 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
return 0, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
return 0, err
}
return blockstr.GetSize(ctx, cid)

log.Debugf("failed to get size for cid %s: %s", cid, err)
return 0, err
}

func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
Expand Down Expand Up @@ -139,7 +137,7 @@ func (bs *blockstore) HashOnRead(bool) {
}

// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*BlockstoreCloser, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
return nil, ErrNotFound
Expand All @@ -148,11 +146,28 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (d
return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
}

// a share can exist in multiple EDSes, so just take the first one.
// check if cache contains any of accessors
shardKey := keys[0]
accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
if accessor, err := bs.store.cache.Get(shardKey); err == nil {
return blockstoreCloser(accessor)
}

// load accessor to the cache and use it as blockstoreCloser
accessor, err := bs.store.cache.GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
return accessor.bs, nil
return blockstoreCloser(accessor)
}

// blockstoreCloser builds a BlockstoreCloser on top of the given cache.Accessor.
//
// It asks the accessor for its blockstore and, on success, returns a
// BlockstoreCloser whose ReadBlockstore is that blockstore and whose Closer is
// the accessor itself. If the accessor fails to provide a blockstore, the
// error is wrapped and returned with a nil result.
func blockstoreCloser(ac cache.Accessor) (*BlockstoreCloser, error) {
	readBS, err := ac.Blockstore()
	if err != nil {
		return nil, fmt.Errorf("eds/store: failed to get blockstore: %w", err)
	}

	// Pair the blockstore with the accessor it came from, so the accessor
	// is available as the Closer of the returned value.
	wrapped := new(BlockstoreCloser)
	wrapped.ReadBlockstore = readBS
	wrapped.Closer = ac
	return wrapped, nil
}
3 changes: 3 additions & 0 deletions share/eds/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestBlockstore_Operations(t *testing.T) {
topLevelBS := edsStore.Blockstore()
carBS, err := edsStore.CARBlockstore(ctx, dah.Hash())
require.NoError(t, err)
defer func() {
require.NoError(t, carBS.Close())
}()

root, err := edsStore.GetDAH(ctx, dah.Hash())
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 4d98585

Please sign in to comment.