Skip to content

Commit

Permalink
feat: shard (#10)
Browse files Browse the repository at this point in the history
* feat: shard

* feat: reorder task
  • Loading branch information
MiniFrenchBread authored Jun 7, 2024
1 parent fcf1e62 commit 98d74b7
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 82 deletions.
8 changes: 4 additions & 4 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var (
node []string

force bool
disperse bool
taskSize uint
}

Expand All @@ -51,7 +50,6 @@ func init() {
uploadCmd.MarkFlagRequired("node")

uploadCmd.Flags().BoolVar(&uploadArgs.force, "force", false, "Force to upload file even already exists")
uploadCmd.Flags().BoolVar(&uploadArgs.disperse, "disperse", false, "Disperse file amoung nodes")
uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

rootCmd.AddCommand(uploadCmd)
Expand All @@ -70,11 +68,13 @@ func upload(*cobra.Command, []string) {
defer client.Close()
}

uploader := transfer.NewUploader(flow, clients)
uploader, err := transfer.NewUploader(flow, clients)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
opt := transfer.UploadOption{
Tags: hexutil.MustDecode(uploadArgs.tags),
Force: uploadArgs.force,
Disperse: uploadArgs.disperse,
TaskSize: uploadArgs.taskSize,
}

Expand Down
5 changes: 4 additions & 1 deletion gateway/local_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) {
return nil, ErrValidation.WithData("node index out of bound")
}

uploader := transfer.NewUploaderLight([]*node.Client{allClients[input.Node]})
uploader, err := transfer.NewUploaderLight([]*node.Client{allClients[input.Node]})
if err != nil {
return nil, ErrValidation.WithData(err)
}

filename := getFilePath(input.Path, false)

Expand Down
5 changes: 4 additions & 1 deletion kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ func (b *Batcher) Exec() error {
}

// upload file
uploader := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node})
uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node})
if err != nil {
return err
}
opt := transfer.UploadOption{
Tags: b.BuildTags(),
Force: true,
Expand Down
5 changes: 5 additions & 0 deletions node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (c *ZeroGStorageClient) DownloadSegmentWithProof(root common.Hash, index ui
return
}

// GetShardConfig fetches the shard configuration of the connected storage
// node through the "zgs_getShardConfig" JSON-RPC method.
func (c *ZeroGStorageClient) GetShardConfig() (*ShardConfig, error) {
	var config *ShardConfig
	err := c.provider.CallContext(context.Background(), &config, "zgs_getShardConfig")
	return config, err
}

// Admin RPCs
type AdminClient struct {
provider *providers.MiddlewarableProvider
Expand Down
5 changes: 5 additions & 0 deletions node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ type KeyValue struct {
Data []byte `json:"data"` // value data
Size uint64 `json:"size"` // value total size
}

// ShardConfig describes which subset of segments a storage node is
// responsible for: segments whose index modulo NumShard equals ShardId
// (see the shard alignment in transfer.Uploader). NumShard == 0 is
// rejected as invalid when building an uploader.
type ShardConfig struct {
	ShardId  uint64 `json:"shardId"`  // shard index assigned to this node
	NumShard uint64 `json:"numShard"` // total number of shards; must be non-zero
}
105 changes: 75 additions & 30 deletions transfer/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transfer
import (
"math/big"
"runtime"
"sort"
"time"

"github.com/0glabs/0g-storage-client/common/parallel"
Expand Down Expand Up @@ -31,32 +32,49 @@ func isDuplicateError(msg string) bool {
type UploadOption struct {
Tags []byte // for kv operations
Force bool // for kv to upload same file
Disperse bool // disperse files to different nodes
TaskSize uint // number of segment to upload in single rpc request
}

type Uploader struct {
flow *contract.FlowContract
clients []*node.ZeroGStorageClient
flow *contract.FlowContract
clients []*node.ZeroGStorageClient
shardConfigs []*node.ShardConfig
}

func NewUploader(flow *contract.FlowContract, clients []*node.Client) *Uploader {
uploader := NewUploaderLight(clients)
func NewUploader(flow *contract.FlowContract, clients []*node.Client) (*Uploader, error) {
uploader, err := NewUploaderLight(clients)
if err != nil {
return nil, err
}
uploader.flow = flow
return uploader
return uploader, nil
}

func NewUploaderLight(clients []*node.Client) *Uploader {
func NewUploaderLight(clients []*node.Client) (*Uploader, error) {
if len(clients) == 0 {
panic("storage node not specified")
}
zgClients := make([]*node.ZeroGStorageClient, 0)
shardConfigs := make([]*node.ShardConfig, 0)
for _, client := range clients {
zgClients = append(zgClients, client.ZeroGStorage())
shardConfig, err := client.ZeroGStorage().GetShardConfig()
if err != nil {
return nil, err
}
if shardConfig.NumShard == 0 {
return nil, errors.New("NumShard is zero")
}
shardConfigs = append(shardConfigs, shardConfig)
}
return &Uploader{
clients: zgClients,
}
clients: zgClients,
shardConfigs: shardConfigs,
}, nil
}

// needFinality reports whether uploads should wait for log-entry finality
// on the storage node. It is true only in the non-sharded case, i.e. when
// the first node's shard config has a single shard (NumShard == 1).
// Precondition: shardConfigs is non-empty (guaranteed by the constructor).
func (uploader *Uploader) needFinality() bool {
	first := uploader.shardConfigs[0]
	return first.NumShard == 1
}

// upload data(batchly in 1 blockchain transaction if there are more than one files)
Expand Down Expand Up @@ -127,13 +145,13 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry

for i := 0; i < n; i++ {
// Upload file to storage node
if err := uploader.UploadFile(datas[i], trees[i], 0, opts[i].Disperse, opts[i].TaskSize); err != nil {
if err := uploader.UploadFile(datas[i], trees[i], 0, opts[i].TaskSize); err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to upload file")
}

if waitForLogEntry {
// Wait for transaction finality
if err := uploader.waitForLogEntry(trees[i].Root(), !opts[i].Disperse, receipt); err != nil {
if err := uploader.waitForLogEntry(trees[i].Root(), uploader.needFinality(), receipt); err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
}
}
Expand Down Expand Up @@ -220,12 +238,12 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
}

// Upload file to storage node
if err = uploader.UploadFile(data, tree, segNum, opt.Disperse, opt.TaskSize); err != nil {
if err = uploader.UploadFile(data, tree, segNum, opt.TaskSize); err != nil {
return errors.WithMessage(err, "Failed to upload file")
}

// Wait for transaction finality
if err = uploader.waitForLogEntry(tree.Root(), !opt.Disperse, nil); err != nil {
if err = uploader.waitForLogEntry(tree.Root(), uploader.needFinality(), nil); err != nil {
return errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
}

Expand Down Expand Up @@ -323,8 +341,48 @@ func (uploader *Uploader) waitForLogEntry(root common.Hash, finalityRequired boo
return nil
}

// NewSegmentUploader builds a SegmentUploader whose task list assigns each
// segment of data, starting at startSegIndex, to the client responsible for
// it under that client's shard config. Per client, tasks are generated with
// stride NumShard*taskSize; the per-client lists are then interleaved
// round-robin (longest list first) so parallel workers spread requests
// across nodes instead of queueing on one.
func (uploader *Uploader) NewSegmentUploader(data core.IterableData, tree *merkle.Tree, startSegIndex uint64, taskSize uint) *SegmentUploader {
	numSegments := data.NumSegments()
	clientTasks := make([][]*UploadTask, 0)
	for clientIndex, shardConfig := range uploader.shardConfigs {
		// Align to the smallest index >= startSegIndex that belongs to this
		// client's shard, i.e. segIndex % NumShard == ShardId.
		var segIndex uint64
		r := startSegIndex % shardConfig.NumShard
		if r <= shardConfig.ShardId {
			segIndex = startSegIndex + shardConfig.ShardId - r
		} else {
			segIndex = startSegIndex - r + shardConfig.ShardId + shardConfig.NumShard
		}
		// One task per batch of taskSize segments owned by this client.
		tasks := make([]*UploadTask, 0)
		for ; segIndex < numSegments; segIndex += shardConfig.NumShard * uint64(taskSize) {
			tasks = append(tasks, &UploadTask{
				clientIndex: clientIndex,
				segIndex:    segIndex,
				numShard:    shardConfig.NumShard,
			})
		}
		clientTasks = append(clientTasks, tasks)
	}
	// Longest task list first so the interleave loop below, which is bounded
	// by len(clientTasks[0]), never misses tasks from a longer list.
	sort.SliceStable(clientTasks, func(i, j int) bool {
		return len(clientTasks[i]) > len(clientTasks[j])
	})
	// Round-robin merge: take the k-th task of every client before the
	// (k+1)-th of any, skipping clients whose lists are already exhausted.
	tasks := make([]*UploadTask, 0)
	for taskIndex := 0; taskIndex < len(clientTasks[0]); taskIndex += 1 {
		for i := 0; i < len(clientTasks) && taskIndex < len(clientTasks[i]); i += 1 {
			tasks = append(tasks, clientTasks[i][taskIndex])
		}
	}

	return &SegmentUploader{
		data:     data,
		tree:     tree,
		clients:  uploader.clients,
		tasks:    tasks,
		taskSize: taskSize,
	}
}

// TODO error tolerance
func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree, segIndex uint64, disperse bool, taskSize uint) error {
func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree, segIndex uint64, taskSize uint) error {
stageTimer := time.Now()

if taskSize == 0 {
Expand All @@ -333,32 +391,19 @@ func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree,

logrus.WithFields(logrus.Fields{
"segIndex": segIndex,
"disperse": disperse,
"nodeNum": len(uploader.clients),
}).Info("Begin to upload file")

offset := int64(segIndex * core.DefaultSegmentSize)

numSegments := (data.Size()-offset-1)/core.DefaultSegmentSize + 1
numTasks := int(numSegments-1)/int(taskSize) + 1
segmentUploader := &SegmentUploader{
data: data,
tree: tree,
clients: uploader.clients,
offset: offset,
disperse: disperse,
taskSize: taskSize,
numTasks: numTasks,
}
segmentUploader := uploader.NewSegmentUploader(data, tree, segIndex, taskSize)

err := parallel.Serial(segmentUploader, numTasks, min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
err := parallel.Serial(segmentUploader, len(segmentUploader.tasks), min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
if err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"duration": time.Since(stageTimer),
"segNum": numSegments,
"segNum": data.NumSegments() - segIndex,
}).Info("Completed to upload file")

return nil
Expand Down
60 changes: 14 additions & 46 deletions transfer/uploader_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import (
"github.com/sirupsen/logrus"
)

// UploadTask describes one batch of segments destined for a single
// storage node.
type UploadTask struct {
	clientIndex int    // index into the uploader's client list
	segIndex    uint64 // first segment index handled by this task
	numShard    uint64 // stride between consecutive segments owned by the node
}

type SegmentUploader struct {
data core.IterableData
tree *merkle.Tree
clients []*node.ZeroGStorageClient
offset int64
disperse bool
tasks []*UploadTask
taskSize uint
numTasks int
}

var _ parallel.Interface = (*SegmentUploader)(nil)
Expand All @@ -28,10 +32,10 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error

// ParallelDo implements parallel.Interface.
func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) {
offset := uploader.offset + int64(task)*core.DefaultSegmentSize
numChunks := uploader.data.NumChunks()
numSegments := uploader.data.NumSegments()
segIndex := uint64(offset / core.DefaultSegmentSize)
uploadTask := uploader.tasks[task]
segIndex := uploadTask.segIndex
startSegIndex := segIndex
segments := make([]node.SegmentWithProof, 0)
for i := 0; i < int(uploader.taskSize); i++ {
Expand All @@ -43,7 +47,7 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
break
}
// get segment
segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, offset, uploader.data.PaddedSize())
segment, err := core.ReadAt(uploader.data, core.DefaultSegmentSize, int64(segIndex*core.DefaultSegmentSize), uploader.data.PaddedSize())
if err != nil {
return nil, err
}
Expand All @@ -66,54 +70,18 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
if allDataUploaded {
break
}
segIndex += uint64(uploader.numTasks)
offset += core.DefaultSegmentSize * int64(uploader.numTasks)
segIndex += uploadTask.numShard
}
// upload
if !uploader.disperse {
if _, err := uploader.clients[0].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
return nil, errors.WithMessage(err, "Failed to upload segment")
}
} else {
clientIndex := task % (len(uploader.clients))
ok := false
// retry
for i := 0; i < len(uploader.clients); i++ {
logrus.WithFields(logrus.Fields{
"total": numSegments,
"from_seg_index": startSegIndex,
"to_seg_index": segIndex,
"step": uploader.numTasks,
"clientIndex": clientIndex,
}).Debug("Uploading segment to node..")
if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
logrus.WithFields(logrus.Fields{
"total": numSegments,
"from_seg_index": startSegIndex,
"to_seg_index": segIndex,
"step": uploader.numTasks,
"clientIndex": clientIndex,
"error": err,
}).Warn("Failed to upload segment to node, try next node..")
clientIndex = (clientIndex + 1) % (len(uploader.clients))
} else {
ok = true
break
}
}
if !ok {
if _, err := uploader.clients[clientIndex].UploadSegments(segments); err != nil {
return nil, errors.WithMessage(err, "Failed to upload segment")
}
}
if _, err := uploader.clients[uploadTask.clientIndex].UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
return nil, errors.WithMessage(err, "Failed to upload segment")
}

if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithFields(logrus.Fields{
"total": numSegments,
"from_seg_index": startSegIndex,
"to_seg_index": segIndex,
"step": uploader.numTasks,
"step": uploadTask.numShard,
"root": core.SegmentRoot(segments[0].Data),
}).Debug("Segments uploaded")
}
Expand Down

0 comments on commit 98d74b7

Please sign in to comment.