Skip to content

Commit

Permalink
feat: select using segment tree (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread authored Jul 1, 2024
1 parent 31bccd4 commit 07307f1
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 69 deletions.
2 changes: 1 addition & 1 deletion cmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
func download(*cobra.Command, []string) {
nodes := node.MustNewClients(downloadArgs.nodes)

downloader, err := transfer.NewDownloader(nodes...)
downloader, err := transfer.NewDownloader(nodes)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize downloader")
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/local_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func downloadFileLocal(c *gin.Context) (interface{}, error) {
return nil, ErrValidation.WithData("node index out of bound")
}

downloader, err := transfer.NewDownloader(allClients[input.Node])
downloader, err := transfer.NewDownloader([]*node.Client{allClients[input.Node]})
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions indexer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ func (api *IndexerApi) GetNodes() ([]ShardedNode, error) {
if err != nil {
return nil, errors.WithMessage(err, "Failed to query shard config from storage node")
}
if config.IsValid() {
result = append(result, ShardedNode{
URL: v.URL(),
Config: config,
})
}

result = append(result, ShardedNode{
URL: v.URL(),
Config: config,
})
}

return result, nil
Expand Down
80 changes: 63 additions & 17 deletions indexer/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package indexer

import (
"math/rand"
"sort"

"github.com/0glabs/0g-storage-client/node"
)
Expand All @@ -15,29 +15,75 @@ type Interface interface {
GetNodes() ([]ShardedNode, error)
}

func Select(nodes []ShardedNode, segmentIndex uint64, replica int) []ShardedNode {
var matched []ShardedNode
type shardSegmentTreeNode struct {
childs []*shardSegmentTreeNode
numShard uint
lazyTags uint
replica uint
}

for _, v := range nodes {
if v.Config.HasSegment(segmentIndex) {
matched = append(matched, v)
func (node *shardSegmentTreeNode) pushdown() {
if node.childs == nil {
node.childs = make([]*shardSegmentTreeNode, 2)
for i := 0; i < 2; i += 1 {
node.childs[i] = &shardSegmentTreeNode{
numShard: node.numShard << 1,
replica: 0,
lazyTags: 0,
}
}
}

numMatched := len(matched)
if numMatched == 0 {
return nil
for i := 0; i < 2; i += 1 {
node.childs[i].replica += node.lazyTags
node.childs[i].lazyTags += node.lazyTags
}
node.lazyTags = 0
}

perm := rand.Perm(numMatched)
result := make([]ShardedNode, numMatched)
for i := 0; i < numMatched; i++ {
result[i] = matched[perm[i]]
// insert a shard if it contributes to the replica
func (node *shardSegmentTreeNode) insert(numShard uint, shardId uint, expectedReplica uint) bool {
if node.replica >= expectedReplica {
return false
}
if node.numShard == numShard {
node.replica += 1
node.lazyTags += 1
return true
}
node.pushdown()
inserted := node.childs[shardId%2].insert(numShard, shardId>>1, expectedReplica)
node.replica = min(node.childs[0].replica, node.childs[1].replica)
return inserted
}

if replica < numMatched {
result = result[:replica]
// select a set of given sharded node and make the data is replicated at least expctedReplica times
// return the selected nodes and if selection is successful
func Select(nodes []ShardedNode, expectedReplica uint) ([]ShardedNode, bool) {
selected := make([]ShardedNode, 0)
if expectedReplica == 0 {
return selected, true
}
// sort by shard size from large to small
sort.Slice(nodes, func(i, j int) bool {
if nodes[i].Config.NumShard == nodes[j].Config.NumShard {
return nodes[i].Config.ShardId < nodes[j].Config.ShardId
}
return nodes[i].Config.NumShard < nodes[j].Config.NumShard
})
// build segment tree to select proper nodes
root := shardSegmentTreeNode{
numShard: 1,
replica: 0,
lazyTags: 0,
}

return result
for _, node := range nodes {
if root.insert(uint(node.Config.NumShard), uint(node.Config.ShardId), expectedReplica) {
selected = append(selected, node)
}
if root.replica >= expectedReplica {
return selected, true
}
}
return make([]ShardedNode, 0), false
}
73 changes: 73 additions & 0 deletions indexer/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package indexer

import (
"fmt"
"testing"

"github.com/0glabs/0g-storage-client/node"
"gotest.tools/assert"
)

func makeShardNode(numShard uint, shardId uint) ShardedNode {
return ShardedNode{Config: node.ShardConfig{
NumShard: uint64(numShard),
ShardId: uint64(shardId),
}}
}

func TestSelect(t *testing.T) {
shardedNodes := []ShardedNode{
makeShardNode(4, 0),
makeShardNode(4, 2),
makeShardNode(4, 3),
makeShardNode(1, 0),
makeShardNode(2, 0),
makeShardNode(8, 1),
makeShardNode(8, 5),
makeShardNode(16, 0),
makeShardNode(16, 1),
makeShardNode(16, 2),
makeShardNode(16, 3),
makeShardNode(16, 4),
makeShardNode(16, 5),
makeShardNode(16, 6),
makeShardNode(16, 7),
makeShardNode(16, 8),
makeShardNode(16, 9),
makeShardNode(16, 10),
makeShardNode(16, 11),
makeShardNode(16, 12),
makeShardNode(16, 13),
makeShardNode(16, 14),
makeShardNode(16, 15),
}
selected, found := Select(shardedNodes, 2)
assert.Equal(t, found, true)
fmt.Println(selected)
assert.Equal(t, len(selected), 5)
assert.DeepEqual(t, selected[0], makeShardNode(1, 0))
assert.DeepEqual(t, selected[1], makeShardNode(2, 0))
assert.DeepEqual(t, selected[2], makeShardNode(4, 3))
assert.DeepEqual(t, selected[3], makeShardNode(8, 1))
assert.DeepEqual(t, selected[4], makeShardNode(8, 5))
selected, found = Select(shardedNodes, 3)
assert.Equal(t, found, true)
assert.Equal(t, len(selected), 15)
assert.DeepEqual(t, selected[0], makeShardNode(1, 0))
assert.DeepEqual(t, selected[1], makeShardNode(2, 0))
assert.DeepEqual(t, selected[2], makeShardNode(4, 0))
assert.DeepEqual(t, selected[3], makeShardNode(4, 2))
assert.DeepEqual(t, selected[4], makeShardNode(4, 3))
assert.DeepEqual(t, selected[5], makeShardNode(8, 1))
assert.DeepEqual(t, selected[6], makeShardNode(8, 5))
assert.DeepEqual(t, selected[7], makeShardNode(16, 1))
assert.DeepEqual(t, selected[8], makeShardNode(16, 3))
assert.DeepEqual(t, selected[9], makeShardNode(16, 5))
assert.DeepEqual(t, selected[10], makeShardNode(16, 7))
assert.DeepEqual(t, selected[11], makeShardNode(16, 9))
assert.DeepEqual(t, selected[12], makeShardNode(16, 11))
assert.DeepEqual(t, selected[13], makeShardNode(16, 13))
assert.DeepEqual(t, selected[14], makeShardNode(16, 15))
_, found = Select(shardedNodes, 4)
assert.Equal(t, found, false)
}
5 changes: 5 additions & 0 deletions node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ type ShardConfig struct {
func (config *ShardConfig) HasSegment(segmentIndex uint64) bool {
return config.NumShard < 2 || segmentIndex%config.NumShard == config.ShardId
}

func (config *ShardConfig) IsValid() bool {
// NumShard should be larger than zero and be power of 2
return config.NumShard > 0 && (config.NumShard&(config.NumShard-1) == 0) && config.ShardId < config.NumShard
}
26 changes: 12 additions & 14 deletions transfer/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,17 @@ import (
)

type Downloader struct {
clients []*node.Client
shardConfigs []*node.ShardConfig
clients []*node.Client
}

func NewDownloader(clients ...*node.Client) (*Downloader, error) {
func NewDownloader(clients []*node.Client) (*Downloader, error) {
if len(clients) == 0 {
panic("storage node not specified")
return nil, errors.New("storage node not specified")
}

shardConfigs, err := getShardConfigs(clients)
if err != nil {
return nil, err
downloader := &Downloader{
clients: clients,
}

return &Downloader{
clients: clients,
shardConfigs: shardConfigs,
}, nil
return downloader, nil
}

func (downloader *Downloader) Download(root, filename string, withProof bool) error {
Expand Down Expand Up @@ -111,7 +104,12 @@ func (downloader *Downloader) downloadFile(filename string, root common.Hash, si

logrus.WithField("clients", len(downloader.clients)).Info("Begin to download file from storage node")

sd, err := NewSegmentDownloader(downloader.clients, downloader.shardConfigs, file, withProof)
shardConfigs, err := getShardConfigs(downloader.clients)
if err != nil {
return err
}

sd, err := NewSegmentDownloader(downloader.clients, shardConfigs, file, withProof)
if err != nil {
return errors.WithMessage(err, "Failed to create segment downloader")
}
Expand Down
Loading

0 comments on commit 07307f1

Please sign in to comment.