Skip to content

Commit

Permalink
move to max size instead of real
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Nov 14, 2024
1 parent 8efcbdc commit 8c953dc
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 68 deletions.
4 changes: 0 additions & 4 deletions share/shwap/p2p/bitswap/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ type Block interface {
CID() cid.Cid
// Height reports the Height of the Shwap container behind the Block.
Height() uint64
// Size reports expected size of the Block(without serialization overhead).
// Must support getting size when the Block is not populated or empty and strive to
// be low overhead.
Size(context.Context, eds.Accessor) (int, error)

// Populate fills up the Block with the Shwap container getting it out of the EDS
// Accessor.
Expand Down
14 changes: 13 additions & 1 deletion share/shwap/p2p/bitswap/block_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@ func EmptyBlock(cid cid.Cid) (Block, error) {
return blk, nil
}

// maxBlockSize returns the maximum size of the Block type in the given CID.
func maxBlockSize(cid cid.Cid) (int, error) {
spec, ok := specRegistry[cid.Prefix().MhType]
if !ok {
return -1, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType)
}

return spec.maxSize, nil
}

// registerBlock registers the new Block type and multihash for it.
func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) {
func registerBlock(mhcode, codec uint64, maxSize, idSize int, bldrFn func(cid.Cid) (Block, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: idSize}
})
specRegistry[mhcode] = blockSpec{
idSize: idSize,
maxSize: maxSize,
codec: codec,
builder: bldrFn,
}
Expand All @@ -38,6 +49,7 @@ func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block
// blockSpec holds constant metadata about particular Block types.
type blockSpec struct {
idSize int
maxSize int
codec uint64
builder func(cid.Cid) (Block, error)
}
Expand Down
41 changes: 12 additions & 29 deletions share/shwap/p2p/bitswap/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,21 @@ type Blockstore struct {
Getter AccessorGetter
}

func (b *Blockstore) getBlockAndAccessor(ctx context.Context, cid cid.Cid) (Block, eds.AccessorStreamer, error) {
func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, err := EmptyBlock(cid)
if err != nil {
return nil, nil, err
return nil, err
}

acc, err := b.Getter.GetByHeight(ctx, blk.Height())
if errors.Is(err, store.ErrNotFound) {
log.Debugf("no EDS Accessor for height %v found", blk.Height())
return nil, nil, ipld.ErrNotFound{Cid: cid}
return nil, ipld.ErrNotFound{Cid: cid}
}
if err != nil {
return nil, nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err)
return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err)
}

return blk, acc, nil
}

func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, acc, err := b.getBlockAndAccessor(ctx, cid)
if err != nil {
return nil, err
}
defer func() {
if err := acc.Close(); err != nil {
log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err)
Expand All @@ -64,25 +56,16 @@ func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
return convertBitswap(blk)
}

func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// NOTE: Bitswap prioritizes peers based on their active/pending work and the priority that peers set for the work
// themselves, where the work is the Get operation. The prioritization happens only on the Get operation and not
// GetSize, while GetSize is expected to be as lightweight as possible.
//
// Here is the best case we only open the Accessor and getting its size, avoiding expensive compute to get the size.
blk, acc, err := b.getBlockAndAccessor(ctx, cid)
if err != nil {
return 0, err
}
defer func() {
if err := acc.Close(); err != nil {
log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err)
}
}()
func (b *Blockstore) GetSize(_ context.Context, cid cid.Cid) (int, error) {
// NOTE: Size is used as a weight for the incoming Bitswap requests. Bitswap uses fair scheduling for the requests
// and prioritizes peers with less *active* work. Active work of a peer is a cumulative weight of all the in-progress
// requests.

size, err := blk.Size(ctx, acc)
// Constant max block size is used instead of factual size. This avoids disk IO but equalizes the weights of the
// requests of the same type. E.g. row of 2MB EDS and row of 8MB EDS will have the same weight.
size, err := maxBlockSize(cid)
if err != nil {
return 0, fmt.Errorf("getting block size: %w", err)
return 0, err
}

return size, nil
Expand Down
8 changes: 3 additions & 5 deletions share/shwap/p2p/bitswap/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
const (
testCodec = 0x9999
testMultihashCode = 0x9999
testBlockSize = 256
testIDSize = 2
)

func init() {
registerBlock(
testMultihashCode,
testCodec,
testBlockSize,
testIDSize,
func(cid cid.Cid) (Block, error) {
return newEmptyTestBlock(cid)
Expand Down Expand Up @@ -67,7 +69,7 @@ type testBlock struct {
}

func newTestBlock(id int) *testBlock {
bytes := make([]byte, 256)
bytes := make([]byte, testBlockSize)
_, _ = crand.Read(bytes)
return &testBlock{id: testID(id), data: bytes}
}
Expand Down Expand Up @@ -95,10 +97,6 @@ func (t *testBlock) Height() uint64 {
return 1
}

func (t *testBlock) Size(_ context.Context, _ eds.Accessor) (int, error) {
return len(t.data), nil
}

func (t *testBlock) Populate(context.Context, eds.Accessor) error {
return nil // noop
}
Expand Down
11 changes: 5 additions & 6 deletions share/shwap/p2p/bitswap/row_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ const (
rowMultihashCode = 0x7801
)

// maxRowSize is the maximum size of the RowBlock.
// It is calculated as half of the square size multiplied by the share size.
var maxRowSize = share.MaxSquareSize / 2 * libshare.ShareSize

func init() {
registerBlock(
rowMultihashCode,
rowCodec,
maxRowSize,
shwap.RowIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowBlockFromCID(cid)
Expand Down Expand Up @@ -73,12 +78,6 @@ func (rb *RowBlock) Height() uint64 {
return rb.ID.Height
}

func (rb *RowBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) {
squareSize := acc.Size(ctx)
rowSize := libshare.ShareSize * squareSize / 2
return rowSize, nil
}

func (rb *RowBlock) Marshal() ([]byte, error) {
if rb.Container.IsEmpty() {
return nil, fmt.Errorf("cannot marshal empty RowBlock")
Expand Down
21 changes: 4 additions & 17 deletions share/shwap/p2p/bitswap/row_namespace_data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ const (
rowNamespaceDataMultihashCode = 0x7821
)

// maxRNDSize is the maximum size of the RowNamespaceDataBlock.
var maxRNDSize = maxRowSize

func init() {
registerBlock(
rowNamespaceDataMultihashCode,
rowNamespaceDataCodec,
maxRNDSize,
shwap.RowNamespaceDataIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowNamespaceDataBlockFromCID(cid)
Expand Down Expand Up @@ -78,23 +82,6 @@ func (rndb *RowNamespaceDataBlock) Height() uint64 {
return rndb.ID.Height
}

func (rndb *RowNamespaceDataBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) {
// no way to statically learn the size of requested data, so read it out and compute
// TODO(@Wondertan): Consider adding optimized RowNamespaceDataSize method to the Accessor
err := rndb.Populate(ctx, acc)
if err != nil {
return 0, err
}

// TODO(@Wondertan): Avoid converting in favor of getting size just by looking at container
blk, err := convertBitswap(rndb)
if err != nil {
return 0, err
}

return len(blk.RawData()), nil
}

func (rndb *RowNamespaceDataBlock) Marshal() ([]byte, error) {
if rndb.Container.IsEmpty() {
return nil, fmt.Errorf("cannot marshal empty RowNamespaceDataBlock")
Expand Down
11 changes: 5 additions & 6 deletions share/shwap/p2p/bitswap/sample_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ const (
sampleMultihashCode = 0x7811
)

// maxSampleSize is the maximum size of the SampleBlock.
// It is calculated as the size of the share plus the size of the proof.
var maxSampleSize = libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(share.MaxSquareSize)))

func init() {
registerBlock(
sampleMultihashCode,
sampleCodec,
maxSampleSize,
shwap.SampleIDSize,
func(cid cid.Cid) (Block, error) {
return EmptySampleBlockFromCID(cid)
Expand Down Expand Up @@ -74,12 +79,6 @@ func (sb *SampleBlock) Height() uint64 {
return sb.ID.Height
}

func (sb *SampleBlock) Size(ctx context.Context, acc eds.Accessor) (int, error) {
squareSize := acc.Size(ctx)
sampleSize := libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(squareSize)))
return sampleSize, nil
}

func (sb *SampleBlock) Marshal() ([]byte, error) {
if sb.Container.IsEmpty() {
return nil, fmt.Errorf("cannot marshal empty SampleBlock")
Expand Down

0 comments on commit 8c953dc

Please sign in to comment.