feat: replicate persistent cache task when task needs persistent replicas #3784

Merged: 1 commit, merged on Jan 26, 2025
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.12
d7y.io/api/v2 v2.1.16
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.12 h1:jFo4TA6sRVSbcjPlFrig8S+7P37pww4bFbeTxcGhd54=
d7y.io/api/v2 v2.1.12/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.16 h1:ql4PaC17eG0NSteu+4cijrQ7vA/P/Xki4we66Q7FbQw=
d7y.io/api/v2 v2.1.16/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
198 changes: 133 additions & 65 deletions scheduler/resource/persistentcache/host_manager.go
@@ -20,12 +20,14 @@ package persistentcache

import (
"context"
"math/rand"
"strconv"
"time"

redis "github.com/redis/go-redis/v9"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/set"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
@@ -51,6 +53,9 @@ type HostManager interface {
// LoadAll returns all hosts.
LoadAll(context.Context) ([]*Host, error)

// LoadRandom loads up to n hosts at random from the Redis host set, skipping hosts in the blocklist.
LoadRandom(context.Context, int, set.SafeSet[string]) ([]*Host, error)

// RunGC runs garbage collection.
RunGC() error
}
@@ -446,74 +451,102 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {

// Store sets host.
func (h *hostManager) Store(ctx context.Context, host *Host) error {
_, err := h.rdb.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
"hostname", host.Hostname,
"ip", host.IP,
"port", host.Port,
"download_port", host.DownloadPort,
"disable_shared", host.DisableShared,
"os", host.OS,
"platform", host.Platform,
"platform_family", host.PlatformFamily,
"platform_version", host.PlatformVersion,
"kernel_version", host.KernelVersion,
"cpu_logical_count", host.CPU.LogicalCount,
"cpu_physical_count", host.CPU.PhysicalCount,
"cpu_percent", host.CPU.Percent,
"cpu_processe_percent", host.CPU.ProcessPercent,
"cpu_times_user", host.CPU.Times.User,
"cpu_times_system", host.CPU.Times.System,
"cpu_times_idle", host.CPU.Times.Idle,
"cpu_times_nice", host.CPU.Times.Nice,
"cpu_times_iowait", host.CPU.Times.Iowait,
"cpu_times_irq", host.CPU.Times.Irq,
"cpu_times_softirq", host.CPU.Times.Softirq,
"cpu_times_steal", host.CPU.Times.Steal,
"cpu_times_guest", host.CPU.Times.Guest,
"cpu_times_guest_nice", host.CPU.Times.GuestNice,
"memory_total", host.Memory.Total,
"memory_available", host.Memory.Available,
"memory_used", host.Memory.Used,
"memory_used_percent", host.Memory.UsedPercent,
"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
"memory_free", host.Memory.Free,
"network_tcp_connection_count", host.Network.TCPConnectionCount,
"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
"network_location", host.Network.Location,
"network_idc", host.Network.IDC,
"network_download_rate", host.Network.DownloadRate,
"network_download_rate_limit", host.Network.DownloadRateLimit,
"network_upload_rate", host.Network.UploadRate,
"network_upload_rate_limit", host.Network.UploadRateLimit,
"disk_total", host.Disk.Total,
"disk_free", host.Disk.Free,
"disk_used", host.Disk.Used,
"disk_used_percent", host.Disk.UsedPercent,
"disk_inodes_total", host.Disk.InodesTotal,
"disk_inodes_used", host.Disk.InodesUsed,
"disk_inodes_free", host.Disk.InodesFree,
"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
"disk_write_bandwidth", host.Disk.WriteBandwidth,
"disk_read_bandwidth", host.Disk.ReadBandwidth,
"build_git_version", host.Build.GitVersion,
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()

return err
if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if _, err := pipe.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
"hostname", host.Hostname,
"ip", host.IP,
"port", host.Port,
"download_port", host.DownloadPort,
"disable_shared", host.DisableShared,
"os", host.OS,
"platform", host.Platform,
"platform_family", host.PlatformFamily,
"platform_version", host.PlatformVersion,
"kernel_version", host.KernelVersion,
"cpu_logical_count", host.CPU.LogicalCount,
"cpu_physical_count", host.CPU.PhysicalCount,
"cpu_percent", host.CPU.Percent,
"cpu_processe_percent", host.CPU.ProcessPercent,
"cpu_times_user", host.CPU.Times.User,
"cpu_times_system", host.CPU.Times.System,
"cpu_times_idle", host.CPU.Times.Idle,
"cpu_times_nice", host.CPU.Times.Nice,
"cpu_times_iowait", host.CPU.Times.Iowait,
"cpu_times_irq", host.CPU.Times.Irq,
"cpu_times_softirq", host.CPU.Times.Softirq,
"cpu_times_steal", host.CPU.Times.Steal,
"cpu_times_guest", host.CPU.Times.Guest,
"cpu_times_guest_nice", host.CPU.Times.GuestNice,
"memory_total", host.Memory.Total,
"memory_available", host.Memory.Available,
"memory_used", host.Memory.Used,
"memory_used_percent", host.Memory.UsedPercent,
"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
"memory_free", host.Memory.Free,
"network_tcp_connection_count", host.Network.TCPConnectionCount,
"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
"network_location", host.Network.Location,
"network_idc", host.Network.IDC,
"network_download_rate", host.Network.DownloadRate,
"network_download_rate_limit", host.Network.DownloadRateLimit,
"network_upload_rate", host.Network.UploadRate,
"network_upload_rate_limit", host.Network.UploadRateLimit,
"disk_total", host.Disk.Total,
"disk_free", host.Disk.Free,
"disk_used", host.Disk.Used,
"disk_used_percent", host.Disk.UsedPercent,
"disk_inodes_total", host.Disk.InodesTotal,
"disk_inodes_used", host.Disk.InodesUsed,
"disk_inodes_free", host.Disk.InodesFree,
"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
"disk_write_bandwidth", host.Disk.WriteBandwidth,
"disk_read_bandwidth", host.Disk.ReadBandwidth,
"build_git_version", host.Build.GitVersion,
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
return err
}

if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil {
return err
}

return nil
}); err != nil {
host.Log.Errorf("store host failed: %v", err)
return err
}

return nil
}
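For readers unfamiliar with the go-redis pattern introduced above: TxPipelined queues the commands issued inside the closure and sends them as a single MULTI/EXEC transaction, which is what keeps the per-host hash and the cluster host-ID set from drifting apart. A minimal sketch of the pattern, with illustrative key names rather than the scheduler's real key helpers:

package example

import (
	"context"

	redis "github.com/redis/go-redis/v9"
)

// storeHost queues both writes on a transactional pipeline; nothing is sent
// until the closure returns nil, and then both commands run inside MULTI/EXEC.
func storeHost(ctx context.Context, rdb *redis.Client, hostID, ip string) error {
	_, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
		pipe.HSet(ctx, "scheduler:persistent-cache-host:"+hostID, "id", hostID, "ip", ip)
		pipe.SAdd(ctx, "scheduler:persistent-cache-hosts", hostID)
		return nil // a non-nil error here discards the queued commands before EXEC
	})
	return err
}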

// Delete deletes host by a key.
func (h *hostManager) Delete(ctx context.Context, hostID string) error {
_, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
return err
log := logger.WithHostID(hostID)
if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil {
return err
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil {
return err
}

return nil
}); err != nil {
log.Errorf("store host failed: %v", err)
return err
}

return nil
}

// LoadAll returns all hosts.
@@ -529,7 +562,7 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
err error
)

hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result()
hostKeys, cursor, err = h.rdb.SScan(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), cursor, "*", 10).Result()
if err != nil {
logger.Error("scan hosts failed")
return nil, err
@@ -553,6 +586,41 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
return hosts, nil
}
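The switch from Scan to SScan above means LoadAll now iterates the members of the host-ID set instead of scanning the whole keyspace. A minimal sketch of the cursor loop, assuming a plain *redis.Client and an illustrative key name:

// allMembers iterates a Redis set in batches of 10 until the cursor wraps to 0.
func allMembers(ctx context.Context, rdb *redis.Client, key string) ([]string, error) {
	var (
		members []string
		cursor  uint64
	)
	for {
		batch, next, err := rdb.SScan(ctx, key, cursor, "*", 10).Result()
		if err != nil {
			return nil, err
		}
		members = append(members, batch...)

		cursor = next
		if cursor == 0 {
			return members, nil
		}
	}
}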

// LoadRandom loads up to n hosts at random from the Redis host set, skipping hosts in the blocklist.
func (h *hostManager) LoadRandom(ctx context.Context, n int, blocklist set.SafeSet[string]) ([]*Host, error) {
hostKeys, err := h.rdb.SMembers(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID)).Result()
if err != nil {
logger.Error("smembers hosts failed")
return nil, err
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(hostKeys), func(i, j int) {
hostKeys[i], hostKeys[j] = hostKeys[j], hostKeys[i]
})

hosts := make([]*Host, 0, n)
for _, hostKey := range hostKeys {
if len(hosts) >= n {
break
}

if blocklist.Contains(hostKey) {
continue
}

host, loaded := h.Load(ctx, hostKey)
if !loaded {
logger.WithHostID(hostKey).Error("load host failed")
continue
}

hosts = append(hosts, host)
}

return hosts, nil
}
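A minimal usage sketch for the new LoadRandom, assuming the caller already knows which hosts hold the task; the helper name pickReplicaCandidates and the set.NewSafeSet constructor usage are illustrative assumptions, not code from this PR:

// pickReplicaCandidates selects up to `needed` hosts that do not already hold
// a replica, by passing the current holders as LoadRandom's blocklist.
func pickReplicaCandidates(ctx context.Context, hostManager HostManager, needed int, current []*Host) ([]*Host, error) {
	blocklist := set.NewSafeSet[string]()
	for _, host := range current {
		blocklist.Add(host.ID)
	}

	return hostManager.LoadRandom(ctx, needed, blocklist)
}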

// RunGC runs garbage collection.
func (h *hostManager) RunGC() error {
hosts, err := h.LoadAll(context.Background())
16 changes: 16 additions & 0 deletions scheduler/resource/persistentcache/host_manager_mock.go

Some generated files are not rendered by default.

28 changes: 27 additions & 1 deletion scheduler/resource/persistentcache/peer_manager.go
@@ -51,6 +51,9 @@ type PeerManager interface {
// LoadAllByTaskID returns all peers by task id.
LoadAllByTaskID(context.Context, string) ([]*Peer, error)

// LoadPersistentAllByTaskID returns all persistent peers by task id.
LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error)

// DeleteAllByTaskID deletes all peers by task id.
DeleteAllByTaskID(context.Context, string) error

@@ -242,7 +245,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
func (p *peerManager) Delete(ctx context.Context, peerID string) error {
log := logger.WithPeerID(peerID)
if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
if err != nil {
return errors.New("getting peer failed from redis")
}
@@ -343,6 +346,29 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
return peers, nil
}

// LoadPersistentAllByTaskID returns all persistent cache peers by task id.
func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
return nil, err
}

peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
continue
}

peers = append(peers, peer)
}

return peers, nil
}
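A hedged sketch of the replication decision this PR's title describes: count the peers that currently hold a persistent replica and compare against the task's desired replica count. The PersistentReplicaCount field and the helper name are assumptions about the surrounding scheduler code, not part of this diff:

// missingPersistentReplicas returns how many additional persistent replicas a
// persistent cache task still needs, based on the peers that already hold one.
func missingPersistentReplicas(ctx context.Context, peerManager PeerManager, task *Task) (int, error) {
	peers, err := peerManager.LoadPersistentAllByTaskID(ctx, task.ID)
	if err != nil {
		return 0, err
	}

	missing := int(task.PersistentReplicaCount) - len(peers)
	if missing < 0 {
		missing = 0
	}

	return missing, nil
}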

// DeleteAllByTaskID deletes all persistent cache peers by task id.
func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
log := logger.WithTaskID(taskID)
15 changes: 15 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions scheduler/resource/persistentcache/task.go
@@ -77,13 +77,13 @@ type Task struct {
Application string

// Persistent cache task piece length.
PieceLength int32
PieceLength uint64

// ContentLength is persistent cache task total content length.
ContentLength int64
ContentLength uint64

// TotalPieceCount is total piece count.
TotalPieceCount int32
TotalPieceCount uint32

// Persistent cache task state machine.
FSM *fsm.FSM
@@ -102,8 +102,8 @@ }
}

// New persistent cache task instance.
func NewTask(id, tag, application, state string, persistentReplicaCount uint64, pieceLength int32,
contentLength int64, totalPieceCount int32, ttl time.Duration, createdAt, updatedAt time.Time,
func NewTask(id, tag, application, state string, persistentReplicaCount, pieceLength, contentLength uint64,
totalPieceCount uint32, ttl time.Duration, createdAt, updatedAt time.Time,
log *logger.SugaredLoggerOnWith) *Task {
t := &Task{
ID: id,
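With the signature change above, persistentReplicaCount, pieceLength, and contentLength are now all uint64 and totalPieceCount is uint32. A minimal illustrative call, assuming the persistentcache package context; the state string and the concrete values are assumptions, not taken from this PR:

task := NewTask(
	"task-id", "tag", "application", "Pending", // id, tag, application, state (state value is an assumption)
	2,             // persistentReplicaCount (uint64)
	4*1024*1024,   // pieceLength (uint64): 4 MiB pieces
	128*1024*1024, // contentLength (uint64): 128 MiB total
	32,            // totalPieceCount (uint32): 128 MiB / 4 MiB
	24*time.Hour,  // ttl
	time.Now(), time.Now(), // createdAt, updatedAt
	logger.WithTaskID("task-id"),
)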