feat: split large files (#67)
* feat: splitable upload

* feat: routines config

* feat: cli

* test: splitable upload

* cli: upload dir
MiniFrenchBread authored Nov 12, 2024
1 parent 9d367b1 commit a5e4e5c
Showing 20 changed files with 400 additions and 216 deletions.
22 changes: 19 additions & 3 deletions cmd/download.go
@@ -2,6 +2,7 @@ package cmd

import (
"context"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common"
@@ -20,8 +21,11 @@ type downloadArgument struct {
nodes []string

root string
roots []string
proof bool

routines int

timeout time.Duration
}

@@ -34,9 +38,14 @@ func bindDownloadFlags(cmd *cobra.Command, args *downloadArgument) {
cmd.MarkFlagsOneRequired("indexer", "node")

cmd.Flags().StringVar(&args.root, "root", "", "Merkle root to download file")
cmd.MarkFlagRequired("root")
cmd.Flags().StringSliceVar(&args.roots, "roots", []string{}, "Merkle roots to download fragments")
cmd.MarkFlagsOneRequired("root", "roots")
cmd.MarkFlagsMutuallyExclusive("root", "roots")

cmd.Flags().BoolVar(&args.proof, "proof", false, "Whether to download with Merkle proof for validation")

cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of goroutines for downloading simultaneously")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

@@ -70,8 +79,14 @@ func download(*cobra.Command, []string) {
}
defer closer()

if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
if downloadArgs.root != "" {
if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
} else {
if err := downloader.DownloadFragments(ctx, downloadArgs.roots, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
}
}

@@ -100,6 +115,7 @@ func newDownloader(args downloadArgument) (transfer.IDownloader, func(), error)
closer()
return nil, nil, err
}
downloader.WithRoutines(downloadArgs.routines)

return downloader, closer, nil
}
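For orientation, here is a minimal sketch (not part of the commit) of how the two download paths above can be driven programmatically. The helper name is hypothetical; only calls visible in this diff are used:

```go
package main

import (
	"context"

	"github.com/0glabs/0g-storage-client/transfer"
)

// downloadAny mirrors the CLI branch above: a single Merkle root fetches a
// whole file, while several roots reassemble a file uploaded in fragments.
func downloadAny(ctx context.Context, d transfer.IDownloader, roots []string, out string, proof bool) error {
	if len(roots) == 1 {
		return d.Download(ctx, roots[0], out, proof)
	}
	return d.DownloadFragments(ctx, roots, out, proof)
}
```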
22 changes: 21 additions & 1 deletion cmd/upload.go
@@ -3,6 +3,8 @@ package cmd
import (
"context"
"math/big"
"runtime"
"strings"
"time"

zg_common "github.com/0glabs/0g-storage-client/common"
@@ -51,6 +53,9 @@ type uploadArgument struct {
skipTx bool
finalityRequired bool
taskSize uint
routines int

fragmentSize int64

timeout time.Duration
}
@@ -71,6 +76,10 @@ func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) {
cmd.Flags().BoolVar(&args.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
cmd.Flags().UintVar(&args.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

cmd.Flags().Int64Var(&args.fragmentSize, "fragment-size", 1024*1024*1024*4, "the fragment size to split a large file into")

cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of goroutines for uploading simultaneously")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

@@ -136,10 +145,21 @@ func upload(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()
uploader.WithRoutines(uploadArgs.routines)

if _, err := uploader.Upload(ctx, file, opt); err != nil {
_, roots, err := uploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt)
if err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
if len(roots) == 1 {
logrus.Infof("file uploaded, root = %v", roots[0])
} else {
s := make([]string, len(roots))
for i, root := range roots {
s[i] = root.String()
}
logrus.Infof("file uploaded in %v fragments, roots = %v", len(roots), strings.Join(s, ","))
}
}

func newUploader(ctx context.Context, segNum uint64, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) {
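A hedged sketch (not in the commit) of the new upload flow end to end, assuming an already-constructed *transfer.Uploader and transfer.UploadOption as in the CLI above; the Close call assumes core.File exposes one:

```go
package main

import (
	"context"
	"fmt"

	"github.com/0glabs/0g-storage-client/core"
	"github.com/0glabs/0g-storage-client/transfer"
)

// uploadLargeFile is a hypothetical wrapper: files larger than fragmentSize
// are uploaded as independent fragments, each reported with its own root.
func uploadLargeFile(ctx context.Context, up *transfer.Uploader, name string, opt transfer.UploadOption) error {
	file, err := core.Open(name)
	if err != nil {
		return err
	}
	defer file.Close() // assumption: core.File has a Close method

	const fragmentSize int64 = 4 << 30 // 4 GiB, matching the --fragment-size default
	_, roots, err := up.SplitableUpload(ctx, file, fragmentSize, opt)
	if err != nil {
		return err
	}
	for i, root := range roots {
		fmt.Printf("fragment %d root: %s\n", i, root.String())
	}
	return nil
}
```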
1 change: 1 addition & 0 deletions cmd/upload_dir.go
@@ -58,6 +58,7 @@ func uploadDir(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()
uploader.WithRoutines(uploadArgs.routines)

txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt)
if err != nil {
10 changes: 2 additions & 8 deletions common/shard/types.go
@@ -2,9 +2,8 @@ package shard

import (
"sort"
"time"

"golang.org/x/exp/rand"
"github.com/0glabs/0g-storage-client/common/util"
)

type ShardConfig struct {
@@ -130,12 +129,7 @@ func CheckReplica(shardConfigs []*ShardConfig, expectedReplica uint) bool {
// Helper function to pre-process (sort or shuffle) the nodes before selection
func prepareSelectionNodes(nodes []*ShardedNode, random bool) []*ShardedNode {
if random {
// Shuffle the nodes randomly if needed
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
for i := range nodes {
j := rng.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
util.Shuffle(nodes)
} else {
// Sort nodes based on NumShard and ShardId
sort.Slice(nodes, func(i, j int) bool {
15 changes: 15 additions & 0 deletions common/util/shuffle.go
@@ -0,0 +1,15 @@
package util

import (
"time"

"golang.org/x/exp/rand"
)

func Shuffle[T any](items []T) {
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
for i := range items {
j := rng.Intn(i + 1)
items[i], items[j] = items[j], items[i]
}
}
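This is a forward Fisher–Yates pass (j drawn uniformly from [0, i]), which produces an unbiased permutation in place. A small hypothetical usage example:

```go
package main

import (
	"fmt"

	"github.com/0glabs/0g-storage-client/common/util"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}
	util.Shuffle(nodes) // shuffles any slice type in place
	fmt.Println(nodes)  // e.g. [node-c node-a node-d node-b]
}
```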
3 changes: 2 additions & 1 deletion core/dataflow.go
@@ -30,10 +30,11 @@ var (
type IterableData interface {
NumChunks() uint64
NumSegments() uint64
Offset() int64
Size() int64
PaddedSize() uint64
Iterate(offset int64, batch int64, flowPadding bool) Iterator
Read(buf []byte, offset int64) (int, error)
Split(fragmentSize int64) []IterableData
}

// MerkleTree create merkle tree of the data.
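A sketch of what the widened interface enables: one Merkle root per fragment of any IterableData. Note the MerkleTree signature is not shown in this diff and is assumed here to be func MerkleTree(data IterableData) (*merkle.Tree, error):

```go
package main

import "github.com/0glabs/0g-storage-client/core"

// fragmentRoots is a hypothetical helper mirroring what SplitableUpload
// reports: split the data, then derive a Merkle root for each fragment.
func fragmentRoots(data core.IterableData, fragmentSize int64) ([]string, error) {
	var roots []string
	for _, frag := range data.Split(fragmentSize) {
		tree, err := core.MerkleTree(frag) // assumed signature, see note above
		if err != nil {
			return nil, err
		}
		roots = append(roots, tree.Root().String())
	}
	return roots, nil
}
```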
102 changes: 23 additions & 79 deletions core/file.go
@@ -21,12 +21,14 @@ type File struct {
os.FileInfo
underlying *os.File
paddedSize uint64
offset int64
size int64
}

var _ IterableData = (*File)(nil)

func (file *File) Read(buf []byte, offset int64) (int, error) {
n, err := file.underlying.ReadAt(buf, offset)
n, err := file.underlying.ReadAt(buf, file.offset+offset)
// unexpected IO error
if err != nil && !errors.Is(err, io.EOF) {
return 0, err
@@ -72,6 +74,8 @@ func Open(name string) (*File, error) {
return &File{
FileInfo: info,
underlying: file,
offset: 0,
size: info.Size(),
paddedSize: IteratorPaddedSize(info.Size(), true),
}, nil
}
@@ -110,86 +114,26 @@ func (file *File) PaddedSize() uint64 {
return file.paddedSize
}

func (file *File) Iterate(offset int64, batch int64, flowPadding bool) Iterator {
if batch%DefaultChunkSize > 0 {
panic("batch size should align with chunk size")
}
dataSize := file.Size()
return &FileIterator{
file: file.underlying,
buf: make([]byte, batch),
offset: offset,
fileSize: dataSize,
paddedSize: IteratorPaddedSize(dataSize, flowPadding),
}
}

type FileIterator struct {
file *os.File
buf []byte // buffer to read data from file
bufSize int // actual data size in buffer
fileSize int64
paddedSize uint64
offset int64 // offset to read data
}

var _ Iterator = (*FileIterator)(nil)

func (it *FileIterator) Next() (bool, error) {
// Reject invalid offset
if it.offset < 0 || uint64(it.offset) >= it.paddedSize {
return false, nil
}

var expectedBufSize int
maxAvailableLength := it.paddedSize - uint64(it.offset)
if maxAvailableLength >= uint64(len(it.buf)) {
expectedBufSize = len(it.buf)
} else {
expectedBufSize = int(maxAvailableLength)
}

it.clearBuffer()

if it.offset >= it.fileSize {
it.paddingZeros(expectedBufSize)
return true, nil
}

n, err := it.file.ReadAt(it.buf, it.offset)
it.bufSize = n
it.offset += int64(n)

// not reach EOF
if n == expectedBufSize {
return true, nil
}

// unexpected IO error
if !errors.Is(err, io.EOF) {
return false, err
}

if n > expectedBufSize {
// should never happen
panic("load more data from file than expected")
}

it.paddingZeros(expectedBufSize - n)

return true, nil
}

func (it *FileIterator) clearBuffer() {
it.bufSize = 0
func (file *File) Size() int64 {
return file.size
}

func (it *FileIterator) paddingZeros(length int) {
paddingZeros(it.buf, it.bufSize, length)
it.bufSize += length
it.offset += int64(length)
func (file *File) Offset() int64 {
return file.offset
}

func (it *FileIterator) Current() []byte {
return it.buf[:it.bufSize]
func (file *File) Split(fragmentSize int64) []IterableData {
fragments := make([]IterableData, 0)
for offset := file.offset; offset < file.offset+file.size; offset += fragmentSize {
size := min(file.offset+file.size-offset, fragmentSize) // remaining bytes from the absolute offset, so re-splitting a fragment stays correct
fragment := &File{
FileInfo: file.FileInfo,
underlying: file.underlying,
offset: offset,
size: size,
paddedSize: IteratorPaddedSize(size, true),
}
fragments = append(fragments, fragment)
}
return fragments
}
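An illustrative note on the zero-copy design above: fragments share the parent's *os.File and differ only in offset, size, and padded size, so splitting never copies file data. A hypothetical example:

```go
package main

import (
	"fmt"

	"github.com/0glabs/0g-storage-client/core"
)

func main() {
	data, err := core.Open("big.bin") // hypothetical input path
	if err != nil {
		panic(err)
	}
	for i, frag := range data.Split(4 << 30) { // 4 GiB, the CLI default
		fmt.Printf("fragment %d: offset=%d size=%d padded=%d\n",
			i, frag.Offset(), frag.Size(), frag.PaddedSize())
	}
}
```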
4 changes: 2 additions & 2 deletions core/flow.go
@@ -43,7 +43,7 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
return &submission, nil
}

func nextPow2(input uint64) uint64 {
func NextPow2(input uint64) uint64 {
x := input
x -= 1
x |= x >> 32
@@ -57,7 +57,7 @@ func nextPow2(input uint64) uint64 {
}

func ComputePaddedSize(chunks uint64) (uint64, uint64) {
chunksNextPow2 := nextPow2(chunks)
chunksNextPow2 := NextPow2(chunks)
if chunksNextPow2 == chunks {
return chunksNextPow2, chunksNextPow2
}
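The hidden remainder of NextPow2 presumably continues the shift-or cascade down to x >> 1 and returns x+1: smearing the top bit of input-1 downward rounds up to the next power of two, which ComputePaddedSize relies on. A quick worked example against the now-exported helper:

```go
package main

import (
	"fmt"

	"github.com/0glabs/0g-storage-client/core"
)

func main() {
	// NextPow2 rounds up to the nearest power of two (identity on exact powers).
	fmt.Println(core.NextPow2(1))  // 1
	fmt.Println(core.NextPow2(5))  // 8
	fmt.Println(core.NextPow2(64)) // 64
	fmt.Println(core.NextPow2(65)) // 128
}
```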