Skip to content

Commit

Permalink
More ToChunker refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
macneale4 committed Feb 12, 2025
1 parent c3440f0 commit 72e7029
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 55 deletions.
2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.Health
return nbs.CompressedChunk{}, err
}

return ArchiveToChunker{h: h, dictionary: dict, chunkData: buf}, nil
return nbs.NewArchiveToChunker(h, dict, buf), nil
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions go/store/datas/pull/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
Expand Down Expand Up @@ -385,7 +384,7 @@ func (p *Puller) Pull(ctx context.Context) error {
if err != nil {
return err
}
} else if _, ok := cChk.(remotestorage.ArchiveToChunker); ok {
} else if _, ok := cChk.(nbs.ArchiveToChunker); ok {
// NM4 - Until we can write quickly to archives.....
cc := nbs.ChunkToCompressedChunk(chnk)

Expand Down
15 changes: 7 additions & 8 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,23 @@ func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, keeper keepe
return res, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
// single threaded first pass.
foundAll := true
for i, req := range reqs {
h := *req.a
data, err := acs.aRdr.get(h)

chunker, err := acs.aRdr.getAsToChunker(h)
if err != nil {
return true, gcBehavior_Continue, err
}
if data == nil {
if chunker == nil {
foundAll = false
} else {
if keeper != nil && keeper(h) {
return true, gcBehavior_Block, nil
}
chunk := chunks.NewChunk(data)
found(ctx, &chunk)
found(ctx, chunker)
reqs[i].found = true
}
}
Expand Down Expand Up @@ -201,9 +201,8 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getR
}

func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
// NM4 - UPDATE. this is def wrong.
found(ctx, ChunkToCompressedChunk(*chk))
return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk ToChunker) {
found(ctx, chk)
}, keeper, stats)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package remotestorage
package nbs

import (
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/gozstd"
)

Expand All @@ -27,7 +26,11 @@ type ArchiveToChunker struct {
chunkData []byte
}

var _ nbs.ToChunker = (*ArchiveToChunker)(nil)
var _ ToChunker = (*ArchiveToChunker)(nil)

// NewArchiveToChunker returns a ToChunker backed by an ArchiveToChunker holding the hash |h|, the zstd
// dictionary |dict|, and |chunkData|. The arguments are stored as given; nothing is copied or decompressed here.
// NOTE(review): |chunkData| is presumably the still-compressed archive bytes for the chunk - confirm with callers.
func NewArchiveToChunker(h hash.Hash, dict *gozstd.DDict, chunkData []byte) ToChunker {
	atc := ArchiveToChunker{
		h:          h,
		dictionary: dict,
		chunkData:  chunkData,
	}
	return atc
}

func (a ArchiveToChunker) Hash() hash.Hash {
return a.h
Expand Down
13 changes: 13 additions & 0 deletions go/store/nbs/archive_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ func (ar archiveReader) get(hash hash.Hash) ([]byte, error) {

var result []byte
if dict == nil {
// NM4 - This should never happen in practice. The transport protocol requires that there be a dictionary for
// every chunk in an archive. Determine if we can remove this.
result, err = gozstd.Decompress(nil, data)
} else {
result, err = gozstd.DecompressDict(nil, data, dict)
Expand All @@ -284,6 +286,17 @@ func (ar archiveReader) get(hash hash.Hash) ([]byte, error) {
return result, nil
}

// getAsToChunker looks up |h| with getRaw and returns the result wrapped in an ArchiveToChunker, without
// decompressing it. Similar to get, but with a different return type.
// If the hash is not found, nil is returned (no error).
func (ar archiveReader) getAsToChunker(h hash.Hash) (ToChunker, error) {
	dict, data, err := ar.getRaw(h)
	if err != nil {
		return nil, err
	}
	if data == nil {
		// No data for this hash: report "not found" as a nil ToChunker with a nil error.
		return nil, nil
	}

	return ArchiveToChunker{h: h, dictionary: dict, chunkData: data}, nil
}

func (ar archiveReader) count() uint32 {
return ar.footer.chunkCount
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func (tcs *testChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gc
panic("never used")
}

func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
panic("never used")
}

Expand Down
12 changes: 8 additions & 4 deletions go/store/nbs/cmp_chunk_table_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -144,9 +143,9 @@ func compareContentsOfTables(t *testing.T, ctx context.Context, hashes hash.Hash

func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader) (map[hash.Hash][]byte, error) {
reqs := toGetRecords(hashes)
found := make([]*chunks.Chunk, 0)
found := make([]ToChunker, 0)
eg, ctx := errgroup.WithContext(ctx)
_, _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, nil, &Stats{})
_, _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c ToChunker) { found = append(found, c) }, nil, &Stats{})
if err != nil {
return nil, err
}
Expand All @@ -156,7 +155,12 @@ func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader)
}

hashToData := make(map[hash.Hash][]byte)
for _, c := range found {
for _, tc := range found {
c, err := tc.ToChunk()
if err != nil {
return nil, err
}

hashToData[c.Hash()] = c.Data()
}

Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/empty_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (ecs emptyChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF
return nil, gcBehavior_Continue, nil
}

func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return true, gcBehavior_Continue, nil
}

Expand Down
12 changes: 2 additions & 10 deletions go/store/nbs/journal_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,9 @@ type journalRecord struct {
idx int
}

func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (s journalChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return s.getManyCompressed(ctx, eg, reqs, func(ctx context.Context, cc ToChunker) {
ch, err := cc.ToChunk()
if err != nil {
eg.Go(func() error {
return err
})
return
}
chWHash := chunks.NewChunkWithHash(cc.Hash(), ch.Data())
found(ctx, &chWHash)
found(ctx, cc)
}, keeper, stats)
}

Expand Down
34 changes: 31 additions & 3 deletions go/store/nbs/mem_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,16 @@ func (mt *memTable) get(ctx context.Context, h hash.Hash, keeper keeperF, stats
return c, gcBehavior_Continue, nil
}

func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (mt *memTable) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
var remaining bool
for i, r := range reqs {
data := mt.chunks[*r.a]
if data != nil {
if keeper != nil && keeper(*r.a) {
return true, gcBehavior_Block, nil
}
c := chunks.NewChunkWithHash(hash.Hash(*r.a), data)
reqs[i].found = true
found(ctx, &c)
found(ctx, memToChunker{h: *r.a, rawChunk: data})
} else {
remaining = true
}
Expand Down Expand Up @@ -270,3 +269,32 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name
func (mt *memTable) close() error {
return nil
}

// memToChunker is a ToChunker that wraps raw chunk bytes which haven't been compressed for any storage format.
type memToChunker struct {
	// h is the hash the chunk is addressed by.
	h hash.Hash
	// rawChunk is the uncompressed chunk payload.
	rawChunk []byte
}

var _ ToChunker = (*memToChunker)(nil)

// Hash returns the hash this chunk is addressed by.
func (mtc memToChunker) Hash() hash.Hash {
	return mtc.h
}

// ToChunk builds a chunks.Chunk from the raw bytes. The already-known hash is passed along with the data, so
// the resulting chunk always reports the same hash as Hash() and the hash is not derived from the bytes again.
// The returned error is always nil.
func (mtc memToChunker) ToChunk() (chunks.Chunk, error) {
	return chunks.NewChunkWithHash(mtc.h, mtc.rawChunk), nil
}

// FullCompressedChunkLen is not implemented for memToChunker; calling it panics.
func (mtc memToChunker) FullCompressedChunkLen() uint32 {
	// NM4 - This interface is a lie. The wrapped bytes were never compressed, so there is no compressed
	// length to report.
	panic("implement me")
}

// IsEmpty reports whether the wrapped chunk has no bytes.
func (mtc memToChunker) IsEmpty() bool {
	return len(mtc.rawChunk) == 0
}

// IsGhost always returns false for memToChunker.
func (mtc memToChunker) IsGhost() bool {
	return false
}
2 changes: 1 addition & 1 deletion go/store/nbs/mem_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (crg chunkReaderGroup) hasMany(addrs []hasRecord, keeper keeperF) (bool, gc
return true, gcBehavior_Continue, nil
}

func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (crg chunkReaderGroup) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
for _, haver := range crg {
remaining, gcb, err := haver.getMany(ctx, eg, reqs, found, keeper, stats)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ func (nbs *NomsBlockStore) getChunkLocations(ctx context.Context, hashes hash.Ha

}

// string or hash.Hash?!?!?
func (nbs *NomsBlockStore) GetChunkLocations(ctx context.Context, hashes hash.HashSet) (map[string]map[hash.Hash]Range, error) {
sourcesToRanges, err := nbs.getChunkLocations(ctx, hashes)
if err != nil {
Expand Down Expand Up @@ -926,7 +925,15 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
defer span.End()
return nbs.getManyWithFunc(ctx, hashes, gcDependencyMode_TakeDependency,
func(ctx context.Context, cr chunkReader, eg *errgroup.Group, reqs []getRecord, keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return cr.getMany(ctx, eg, reqs, found, keeper, nbs.stats)
wrappedFound := func(ctx context.Context, c ToChunker) {
chk, err := c.ToChunk()
if err != nil {
// Uh oh. NM4.
panic("unexpected error converting chunk to chunk")
}
found(ctx, &chk)
}
return cr.getMany(ctx, eg, reqs, wrappedFound, keeper, nbs.stats)
},
)
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ type chunkReader interface {

// getMany sets getRecord.found to true, and calls |found| for each present getRecord query.
// It returns true if any getRecord query was not found in this chunkReader.
getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error)
getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error)

// getManyCompressed sets getRecord.found to true, and calls |found| for each present getRecord query.
// It returns true if any getRecord query was not found in this chunkReader.
Expand Down
16 changes: 5 additions & 11 deletions go/store/nbs/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,11 @@ func (tr tableReader) readCompressedAtOffsets(
func (tr tableReader) readAtOffsets(
ctx context.Context,
rb readBatch,
found func(context.Context, *chunks.Chunk),
found func(context.Context, ToChunker),
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(ctx context.Context, cmp ToChunker) error {
chk, err := cmp.ToChunk()

if err != nil {
return err
}

found(ctx, &chk)
return tr.readAtOffsetsWithCB(ctx, rb, stats, func(ctx context.Context, chk ToChunker) error {
found(ctx, chk)
return nil
})
}
Expand Down Expand Up @@ -403,7 +397,7 @@ func (tr tableReader) getMany(
ctx context.Context,
eg *errgroup.Group,
reqs []getRecord,
found func(context.Context, *chunks.Chunk),
found func(context.Context, ToChunker),
keeper keeperF,
stats *Stats) (bool, gcBehavior, error) {

Expand Down Expand Up @@ -446,7 +440,7 @@ func (tr tableReader) getManyAtOffsets(
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(context.Context, *chunks.Chunk),
found func(context.Context, ToChunker),
stats *Stats,
) error {
return tr.getManyAtOffsetsWithReadFunc(ctx, eg, offsetRecords, stats, func(
Expand Down
3 changes: 1 addition & 2 deletions go/store/nbs/table_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -197,7 +196,7 @@ func (ts tableSet) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *
return f(ts.upstream)
}

func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
f := func(css chunkSourceSet) (bool, gcBehavior, error) {
for _, haver := range css {
remaining, gcb, err := haver.getMany(ctx, eg, reqs, found, keeper, stats)
Expand Down
9 changes: 4 additions & 5 deletions go/store/nbs/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -289,8 +288,8 @@ func TestGetMany(t *testing.T) {

eg, ctx := errgroup.WithContext(context.Background())

got := make([]*chunks.Chunk, 0)
_, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, nil, &Stats{})
got := make([]ToChunker, 0)
_, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c ToChunker) { got = append(got, c) }, nil, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())

Expand Down Expand Up @@ -460,8 +459,8 @@ func doTestNGetMany(t *testing.T, count int) {

eg, ctx := errgroup.WithContext(context.Background())

got := make([]*chunks.Chunk, 0)
_, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c *chunks.Chunk) { got = append(got, c) }, nil, &Stats{})
got := make([]ToChunker, 0)
_, _, err = tr.getMany(ctx, eg, getBatch, func(ctx context.Context, c ToChunker) { got = append(got, c) }, nil, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())

Expand Down

0 comments on commit 72e7029

Please sign in to comment.