From 832bd529f6983b2de01915ee8baa494002dca045 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 18 Feb 2025 22:39:17 +0800 Subject: [PATCH] feat: replace Pipeliner with lua (#3846) Signed-off-by: Gaius --- .../resource/persistentcache/host_manager.go | 325 +++++++--- .../persistentcache/host_manager_test.go | 372 ----------- .../resource/persistentcache/peer_manager.go | 231 ++++--- .../persistentcache/peer_manager_test.go | 581 ------------------ .../resource/persistentcache/task_manager.go | 92 ++- .../persistentcache/task_manager_test.go | 149 ----- 6 files changed, 458 insertions(+), 1292 deletions(-) diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index 9ddcd66fb8d..acd91eaa49c 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -119,7 +119,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { } // Set boolean fields from raw host. - diableShared, err := strconv.ParseBool(rawHost["disable_shared"]) + disableShared, err := strconv.ParseBool(rawHost["disable_shared"]) if err != nil { log.Errorf("parsing disable shared failed: %v", err) return nil, false @@ -435,7 +435,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { int32(port), int32(downloadPort), uint64(schedulerClusterID), - diableShared, + disableShared, pkgtypes.ParseHostType(rawHost["type"]), cpu, memory, @@ -451,76 +451,212 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Store sets host. func (h *hostManager) Store(ctx context.Context, host *Host) error { - if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - if _, err := pipe.HSet(ctx, - pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID), - "id", host.ID, - "type", host.Type.Name(), - "hostname", host.Hostname, - "ip", host.IP, - "port", host.Port, - "download_port", host.DownloadPort, - "disable_shared", host.DisableShared, - "os", host.OS, - "platform", host.Platform, - "platform_family", host.PlatformFamily, - "platform_version", host.PlatformVersion, - "kernel_version", host.KernelVersion, - "cpu_logical_count", host.CPU.LogicalCount, - "cpu_physical_count", host.CPU.PhysicalCount, - "cpu_percent", host.CPU.Percent, - "cpu_processe_percent", host.CPU.ProcessPercent, - "cpu_times_user", host.CPU.Times.User, - "cpu_times_system", host.CPU.Times.System, - "cpu_times_idle", host.CPU.Times.Idle, - "cpu_times_nice", host.CPU.Times.Nice, - "cpu_times_iowait", host.CPU.Times.Iowait, - "cpu_times_irq", host.CPU.Times.Irq, - "cpu_times_softirq", host.CPU.Times.Softirq, - "cpu_times_steal", host.CPU.Times.Steal, - "cpu_times_guest", host.CPU.Times.Guest, - "cpu_times_guest_nice", host.CPU.Times.GuestNice, - "memory_total", host.Memory.Total, - "memory_available", host.Memory.Available, - "memory_used", host.Memory.Used, - "memory_used_percent", host.Memory.UsedPercent, - "memory_processe_used_percent", host.Memory.ProcessUsedPercent, - "memory_free", host.Memory.Free, - "network_tcp_connection_count", host.Network.TCPConnectionCount, - "network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount, - "network_location", host.Network.Location, - "network_idc", host.Network.IDC, - "network_download_rate", host.Network.DownloadRate, - "network_download_rate_limit", host.Network.DownloadRateLimit, - "network_upload_rate", host.Network.UploadRate, - "network_upload_rate_limit", 
host.Network.UploadRateLimit, - "disk_total", host.Disk.Total, - "disk_free", host.Disk.Free, - "disk_used", host.Disk.Used, - "disk_used_percent", host.Disk.UsedPercent, - "disk_inodes_total", host.Disk.InodesTotal, - "disk_inodes_used", host.Disk.InodesUsed, - "disk_inodes_free", host.Disk.InodesFree, - "disk_inodes_used_percent", host.Disk.InodesUsedPercent, - "disk_write_bandwidth", host.Disk.WriteBandwidth, - "disk_read_bandwidth", host.Disk.ReadBandwidth, - "build_git_version", host.Build.GitVersion, - "build_git_commit", host.Build.GitCommit, - "build_go_version", host.Build.GoVersion, - "build_platform", host.Build.Platform, - "scheduler_cluster_id", host.SchedulerClusterID, - "announce_interval", host.AnnounceInterval.Nanoseconds(), - "created_at", host.CreatedAt.Format(time.RFC3339), - "updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil { - return err - } - - if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil { - return err - } - - return nil - }); err != nil { + // Define the Lua script as a string. + const storeHostScript = ` +-- Extract keys and arguments +local host_key = KEYS[1] -- Key for the host hash +local hosts_set_key = KEYS[2] -- Key for the set of hosts + +-- Extract host fields from arguments +local host_id = ARGV[1] +local host_type = ARGV[2] +local hostname = ARGV[3] +local ip = ARGV[4] +local port = ARGV[5] +local download_port = ARGV[6] +local disable_shared = tonumber(ARGV[7]) +local os = ARGV[8] +local platform = ARGV[9] +local platform_family = ARGV[10] +local platform_version = ARGV[11] +local kernel_version = ARGV[12] +local cpu_logical_count = ARGV[13] +local cpu_physical_count = ARGV[14] +local cpu_percent = ARGV[15] +local cpu_process_percent = ARGV[16] +local cpu_times_user = ARGV[17] +local cpu_times_system = ARGV[18] +local cpu_times_idle = ARGV[19] +local cpu_times_nice = ARGV[20] +local cpu_times_iowait = ARGV[21] +local cpu_times_irq = ARGV[22] +local cpu_times_softirq = ARGV[23] +local cpu_times_steal = ARGV[24] +local cpu_times_guest = ARGV[25] +local cpu_times_guest_nice = ARGV[26] +local memory_total = ARGV[27] +local memory_available = ARGV[28] +local memory_used = ARGV[29] +local memory_used_percent = ARGV[30] +local memory_process_used_percent = ARGV[31] +local memory_free = ARGV[32] +local network_tcp_connection_count = ARGV[33] +local network_upload_tcp_connection_count = ARGV[34] +local network_location = ARGV[35] +local network_idc = ARGV[36] +local network_download_rate = ARGV[37] +local network_download_rate_limit = ARGV[38] +local network_upload_rate = ARGV[39] +local network_upload_rate_limit = ARGV[40] +local disk_total = ARGV[41] +local disk_free = ARGV[42] +local disk_used = ARGV[43] +local disk_used_percent = ARGV[44] +local disk_inodes_total = ARGV[45] +local disk_inodes_used = ARGV[46] +local disk_inodes_free = ARGV[47] +local disk_inodes_used_percent = ARGV[48] +local disk_write_bandwidth = ARGV[49] +local disk_read_bandwidth = ARGV[50] +local build_git_version = ARGV[51] +local build_git_commit = ARGV[52] +local build_go_version = ARGV[53] +local build_platform = ARGV[54] +local scheduler_cluster_id = ARGV[55] +local announce_interval = ARGV[56] +local created_at = ARGV[57] +local updated_at = ARGV[58] + +-- Perform HSET operation +redis.call("HSET", host_key, + "id", host_id, + "type", host_type, + "hostname", hostname, + "ip", ip, + "port", port, + "download_port", download_port, + "disable_shared", disable_shared, 
+ "os", os, + "platform", platform, + "platform_family", platform_family, + "platform_version", platform_version, + "kernel_version", kernel_version, + "cpu_logical_count", cpu_logical_count, + "cpu_physical_count", cpu_physical_count, + "cpu_percent", cpu_percent, + "cpu_processe_percent", cpu_process_percent, + "cpu_times_user", cpu_times_user, + "cpu_times_system", cpu_times_system, + "cpu_times_idle", cpu_times_idle, + "cpu_times_nice", cpu_times_nice, + "cpu_times_iowait", cpu_times_iowait, + "cpu_times_irq", cpu_times_irq, + "cpu_times_softirq", cpu_times_softirq, + "cpu_times_steal", cpu_times_steal, + "cpu_times_guest", cpu_times_guest, + "cpu_times_guest_nice", cpu_times_guest_nice, + "memory_total", memory_total, + "memory_available", memory_available, + "memory_used", memory_used, + "memory_used_percent", memory_used_percent, + "memory_processe_used_percent", memory_process_used_percent, + "memory_free", memory_free, + "network_tcp_connection_count", network_tcp_connection_count, + "network_upload_tcp_connection_count", network_upload_tcp_connection_count, + "network_location", network_location, + "network_idc", network_idc, + "network_download_rate", network_download_rate, + "network_download_rate_limit", network_download_rate_limit, + "network_upload_rate", network_upload_rate, + "network_upload_rate_limit", network_upload_rate_limit, + "disk_total", disk_total, + "disk_free", disk_free, + "disk_used", disk_used, + "disk_used_percent", disk_used_percent, + "disk_inodes_total", disk_inodes_total, + "disk_inodes_used", disk_inodes_used, + "disk_inodes_free", disk_inodes_free, + "disk_inodes_used_percent", disk_inodes_used_percent, + "disk_write_bandwidth", disk_write_bandwidth, + "disk_read_bandwidth", disk_read_bandwidth, + "build_git_version", build_git_version, + "build_git_commit", build_git_commit, + "build_go_version", build_go_version, + "build_platform", build_platform, + "scheduler_cluster_id", scheduler_cluster_id, + "announce_interval", announce_interval, + "created_at", created_at, + "updated_at", updated_at) + +-- Perform SADD operation +redis.call("SADD", hosts_set_key, host_id) + +return true +` + + // Create a new Redis script. + script := redis.NewScript(storeHostScript) + + // Prepare keys. + keys := []string{ + pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID), + pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), + } + + // Prepare arguments. 
+ args := []interface{}{ + host.ID, + host.Type.Name(), + host.Hostname, + host.IP, + host.Port, + host.DownloadPort, + host.DisableShared, + host.OS, + host.Platform, + host.PlatformFamily, + host.PlatformVersion, + host.KernelVersion, + host.CPU.LogicalCount, + host.CPU.PhysicalCount, + host.CPU.Percent, + host.CPU.ProcessPercent, + host.CPU.Times.User, + host.CPU.Times.System, + host.CPU.Times.Idle, + host.CPU.Times.Nice, + host.CPU.Times.Iowait, + host.CPU.Times.Irq, + host.CPU.Times.Softirq, + host.CPU.Times.Steal, + host.CPU.Times.Guest, + host.CPU.Times.GuestNice, + host.Memory.Total, + host.Memory.Available, + host.Memory.Used, + host.Memory.UsedPercent, + host.Memory.ProcessUsedPercent, + host.Memory.Free, + host.Network.TCPConnectionCount, + host.Network.UploadTCPConnectionCount, + host.Network.Location, + host.Network.IDC, + host.Network.DownloadRate, + host.Network.DownloadRateLimit, + host.Network.UploadRate, + host.Network.UploadRateLimit, + host.Disk.Total, + host.Disk.Free, + host.Disk.Used, + host.Disk.UsedPercent, + host.Disk.InodesTotal, + host.Disk.InodesUsed, + host.Disk.InodesFree, + host.Disk.InodesUsedPercent, + host.Disk.WriteBandwidth, + host.Disk.ReadBandwidth, + host.Build.GitVersion, + host.Build.GitCommit, + host.Build.GoVersion, + host.Build.Platform, + host.SchedulerClusterID, + host.AnnounceInterval.Nanoseconds(), + host.CreatedAt.Format(time.RFC3339), + host.UpdatedAt.Format(time.RFC3339), + } + + // Execute the script. + if err := script.Run(ctx, h.rdb, keys, args...).Err(); err != nil { host.Log.Errorf("store host failed: %v", err) return err } @@ -530,19 +666,44 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error { // Delete deletes host by a key. func (h *hostManager) Delete(ctx context.Context, hostID string) error { + // Define the Lua script as a string. + const deleteHostScript = ` +-- Extract keys +local host_key = KEYS[1] -- Key for the host hash +local hosts_set_key = KEYS[2] -- Key for the set of hosts + +-- Extract arguments +local host_id = ARGV[1] + +-- Perform DEL operation to delete the host hash +redis.call("DEL", host_key) + +-- Perform SREM operation to remove the host ID from the set +redis.call("SREM", hosts_set_key, host_id) + +return true +` + log := logger.WithHostID(hostID) - if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil { - return err - } - if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil { - return err - } + // Create a new Redis script. + script := redis.NewScript(deleteHostScript) - return nil - }); err != nil { - log.Errorf("store host failed: %v", err) + // Prepare keys. + keys := []string{ + pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID), + pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), + } + + // Prepare arguments. + args := []interface{}{ + hostID, + } + + // Execute the script. 
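+	// Run sends EVALSHA and falls back to EVAL if the script is not yet cached on the server,
+	// so the DEL and SREM execute atomically in a single round trip.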
+ err := script.Run(ctx, h.rdb, keys, args...).Err() + if err != nil { + log.Errorf("delete host failed: %v", err) return err } diff --git a/scheduler/resource/persistentcache/host_manager_test.go b/scheduler/resource/persistentcache/host_manager_test.go index c242dc9cad0..c6970188d97 100644 --- a/scheduler/resource/persistentcache/host_manager_test.go +++ b/scheduler/resource/persistentcache/host_manager_test.go @@ -368,304 +368,6 @@ func TestHostManager_Load(t *testing.T) { } } -func TestHostManager_Store(t *testing.T) { - tests := []struct { - name string - host *Host - mockRedis func(mock redismock.ClientMock) - expectedError bool - errorMsg string - }{ - { - name: "store host successfully", - host: &Host{ - ID: "host1", - Type: pkgtypes.HostTypeNormal, - Hostname: "hostname1", - IP: "127.0.0.1", - Port: 8080, - DownloadPort: 8081, - DisableShared: false, - OS: "linux", - Platform: "x86_64", - PlatformFamily: "debian", - PlatformVersion: "11", - KernelVersion: "5.10", - CPU: CPU{LogicalCount: 4, PhysicalCount: 2, Percent: 50.0, ProcessPercent: 25.0, Times: CPUTimes{User: 10.0, System: 5.0, Idle: 100.0, Nice: 0.0, Iowait: 1.0, Irq: 0.5, Softirq: 0.2, Steal: 0.1, Guest: 0.0, GuestNice: 0.0}}, - Memory: Memory{Total: 8000000000, Available: 4000000000, Used: 4000000000, UsedPercent: 50.0, ProcessUsedPercent: 25.0, Free: 2000000000}, - Network: Network{TCPConnectionCount: 100, UploadTCPConnectionCount: 50, Location: "location1", IDC: "idc1", DownloadRate: 1000000, DownloadRateLimit: 2000000, UploadRate: 500000, UploadRateLimit: 1000000}, - Disk: Disk{Total: 100000000000, Free: 50000000000, Used: 50000000000, UsedPercent: 50.0, InodesTotal: 100000, InodesUsed: 50000, InodesFree: 50000, InodesUsedPercent: 50.0, WriteBandwidth: 10000000, ReadBandwidth: 20000000}, - Build: Build{GitVersion: "v1.0.0", GitCommit: "commit1", GoVersion: "1.16", Platform: "linux/amd64"}, - SchedulerClusterID: 1, - AnnounceInterval: time.Duration(300), - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - Log: logger.WithHost("host1", "hostname1", "127.0.0.1"), - }, - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectTxPipeline() - mock.ExpectHSet(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host1"), - "id", "host1", - "type", "normal", - "hostname", "hostname1", - "ip", "127.0.0.1", - "port", int32(8080), - "download_port", int32(8081), - "disable_shared", false, - "os", "linux", - "platform", "x86_64", - "platform_family", "debian", - "platform_version", "11", - "kernel_version", "5.10", - "cpu_logical_count", uint32(4), - "cpu_physical_count", uint32(2), - "cpu_percent", float64(50), - "cpu_processe_percent", float64(25), - "cpu_times_user", float64(10), - "cpu_times_system", float64(5), - "cpu_times_idle", float64(100), - "cpu_times_nice", float64(0), - "cpu_times_iowait", float64(1), - "cpu_times_irq", float64(0.5), - "cpu_times_softirq", float64(0.2), - "cpu_times_steal", float64(0.1), - "cpu_times_guest", float64(0), - "cpu_times_guest_nice", float64(0), - "memory_total", uint64(8000000000), - "memory_available", uint64(4000000000), - "memory_used", uint64(4000000000), - "memory_used_percent", float64(50), - "memory_processe_used_percent", float64(25), - "memory_free", uint64(2000000000), - "network_tcp_connection_count", uint32(100), - "network_upload_tcp_connection_count", uint32(50), - "network_location", "location1", - "network_idc", "idc1", - "network_download_rate", uint64(1000000), - "network_download_rate_limit", uint64(2000000), - "network_upload_rate", uint64(500000), - 
"network_upload_rate_limit", uint64(1000000), - "disk_total", uint64(100000000000), - "disk_free", uint64(50000000000), - "disk_used", uint64(50000000000), - "disk_used_percent", float64(50), - "disk_inodes_total", uint64(100000), - "disk_inodes_used", uint64(50000), - "disk_inodes_free", uint64(50000), - "disk_inodes_used_percent", float64(50), - "disk_write_bandwidth", uint64(10000000), - "disk_read_bandwidth", uint64(20000000), - "build_git_version", "v1.0.0", - "build_git_commit", "commit1", - "build_go_version", "1.16", - "build_platform", "linux/amd64", - "scheduler_cluster_id", uint64(1), - "announce_interval", int64(300), - "created_at", time.Now().Format(time.RFC3339), - "updated_at", time.Now().Format(time.RFC3339)).SetVal(1) - mock.ExpectSAdd(pkgredis.MakePersistentCacheHostsInScheduler(1), "host1").SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectedError: false, - }, - { - name: "store host fails", - host: &Host{ - ID: "host2", - Type: pkgtypes.HostTypeNormal, - Hostname: "hostname2", - IP: "127.0.0.2", - Port: 8080, - DownloadPort: 8081, - DisableShared: false, - OS: "linux", - Platform: "x86_64", - PlatformFamily: "debian", - PlatformVersion: "11", - KernelVersion: "5.10", - CPU: CPU{LogicalCount: 4, PhysicalCount: 2, Percent: 50.0, ProcessPercent: 25.0, Times: CPUTimes{User: 10.0, System: 5.0, Idle: 100.0, Nice: 0.0, Iowait: 1.0, Irq: 0.5, Softirq: 0.2, Steal: 0.1, Guest: 0.0, GuestNice: 0.0}}, - Memory: Memory{Total: 8000000000, Available: 4000000000, Used: 4000000000, UsedPercent: 50.0, ProcessUsedPercent: 25.0, Free: 2000000000}, - Network: Network{TCPConnectionCount: 100, UploadTCPConnectionCount: 50, Location: "location2", IDC: "idc2", DownloadRate: 1000000, DownloadRateLimit: 2000000, UploadRate: 500000, UploadRateLimit: 1000000}, - Disk: Disk{Total: 100000000000, Free: 50000000000, Used: 50000000000, UsedPercent: 50.0, InodesTotal: 100000, InodesUsed: 50000, InodesFree: 50000, InodesUsedPercent: 50.0, WriteBandwidth: 10000000, ReadBandwidth: 20000000}, - Build: Build{GitVersion: "v1.0.0", GitCommit: "commit2", GoVersion: "1.16", Platform: "linux/amd64"}, - SchedulerClusterID: 1, - AnnounceInterval: time.Duration(300), - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - Log: logger.WithHost("host2", "hostname2", "127.0.0.2"), - }, - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectTxPipeline() - mock.ExpectHSet(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host2"), - "id", "host2", - "type", "normal", - "hostname", "hostname2", - "ip", "127.0.0.2", - "port", int32(8080), - "download_port", int32(8081), - "disable_shared", false, - "os", "linux", - "platform", "x86_64", - "platform_family", "debian", - "platform_version", "11", - "kernel_version", "5.10", - "cpu_logical_count", uint32(4), - "cpu_physical_count", uint32(2), - "cpu_percent", float64(50), - "cpu_processe_percent", float64(25), - "cpu_times_user", float64(10), - "cpu_times_system", float64(5), - "cpu_times_idle", float64(100), - "cpu_times_nice", float64(0), - "cpu_times_iowait", float64(1), - "cpu_times_irq", float64(0.5), - "cpu_times_softirq", float64(0.2), - "cpu_times_steal", float64(0.1), - "cpu_times_guest", float64(0), - "cpu_times_guest_nice", float64(0), - "memory_total", uint64(8000000000), - "memory_available", uint64(4000000000), - "memory_used", uint64(4000000000), - "memory_used_percent", float64(50), - "memory_processe_used_percent", float64(25), - "memory_free", uint64(2000000000), - "network_tcp_connection_count", uint32(100), - "network_upload_tcp_connection_count", 
uint32(50), - "network_location", "location2", - "network_idc", "idc2", - "network_download_rate", uint64(1000000), - "network_download_rate_limit", uint64(2000000), - "network_upload_rate", uint64(500000), - "network_upload_rate_limit", uint64(1000000), - "disk_total", uint64(100000000000), - "disk_free", uint64(50000000000), - "disk_used", uint64(50000000000), - "disk_used_percent", float64(50), - "disk_inodes_total", uint64(100000), - "disk_inodes_used", uint64(50000), - "disk_inodes_free", uint64(50000), - "disk_inodes_used_percent", float64(50), - "disk_write_bandwidth", uint64(10000000), - "disk_read_bandwidth", uint64(20000000), - "build_git_version", "v1.0.0", - "build_git_commit", "commit2", - "build_go_version", "1.16", - "build_platform", "linux/amd64", - "scheduler_cluster_id", uint64(1), - "announce_interval", int64(300), - "created_at", time.Now().Format(time.RFC3339), - "updated_at", time.Now().Format(time.RFC3339)).SetErr(fmt.Errorf("Redis error")) - }, - expectedError: true, - errorMsg: "Redis error", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - rdb, mock := redismock.NewClientMock() - tt.mockRedis(mock) - - h := &hostManager{ - config: &config.Config{ - Manager: config.ManagerConfig{ - SchedulerClusterID: 1, - }, - }, - rdb: rdb, - } - - err := h.Store(context.Background(), tt.host) - if tt.expectedError { - assert.Error(t, err) - assert.Contains(t, err.Error(), tt.errorMsg) - } else { - assert.NoError(t, err) - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("there were unfulfilled expectations: %s", err) - } - }) - } -} -func TestHostManager_Delete(t *testing.T) { - tests := []struct { - name string - hostID string - mockRedis func(mock redismock.ClientMock) - expectedError bool - errorMsg string - }{ - { - name: "delete host successfully", - hostID: "host1", - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectTxPipeline() - mock.ExpectDel(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host1")).SetVal(1) - mock.ExpectSRem(pkgredis.MakePersistentCacheHostsInScheduler(1), "host1").SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectedError: false, - }, - { - name: "delete host fails on Del", - hostID: "host2", - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectTxPipeline() - mock.ExpectDel(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host2")).SetErr(fmt.Errorf("Redis error")) - }, - expectedError: true, - errorMsg: "Redis error", - }, - { - name: "delete host fails on SRem", - hostID: "host3", - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectTxPipeline() - mock.ExpectDel(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host3")).SetVal(1) - mock.ExpectSRem(pkgredis.MakePersistentCacheHostsInScheduler(1), "host3").SetErr(fmt.Errorf("Redis error")) - }, - expectedError: true, - errorMsg: "Redis error", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - rdb, mock := redismock.NewClientMock() - tt.mockRedis(mock) - - h := &hostManager{ - config: &config.Config{ - Manager: config.ManagerConfig{ - SchedulerClusterID: 1, - }, - }, - rdb: rdb, - } - - err := h.Delete(context.Background(), tt.hostID) - if tt.expectedError { - assert.Error(t, err) - assert.Contains(t, err.Error(), tt.errorMsg) - } else { - assert.NoError(t, err) - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("there were unfulfilled expectations: %s", err) - 
} - }) - } -} - func TestHostManager_LoadAll(t *testing.T) { tests := []struct { name string @@ -1077,80 +779,6 @@ func TestHostManager_RunGC(t *testing.T) { }, expectErr: false, }, - { - name: "one host older than 2 intervals => delete", - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectSScan(pkgredis.MakePersistentCacheHostsInScheduler(1), 0, "*", 10). - SetVal([]string{"host1"}, 0) - mock.ExpectHGetAll(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host1")). - SetVal(map[string]string{ - "id": "host1", - "type": "normal", - "hostname": "hostname1", - "ip": "127.0.0.1", - "port": "8080", - "download_port": "8081", - "disable_shared": "false", - "os": "linux", - "platform": "x86_64", - "platform_family": "debian", - "platform_version": "11", - "kernel_version": "5.10", - "cpu_logical_count": "4", - "cpu_physical_count": "2", - "cpu_percent": "50.0", - "cpu_processe_percent": "25.0", - "cpu_times_user": "10.0", - "cpu_times_system": "5.0", - "cpu_times_idle": "100.0", - "cpu_times_nice": "0.0", - "cpu_times_iowait": "1.0", - "cpu_times_irq": "0.5", - "cpu_times_softirq": "0.2", - "cpu_times_steal": "0.1", - "cpu_times_guest": "0.0", - "cpu_times_guest_nice": "0.0", - "memory_total": "8000000000", - "memory_available": "4000000000", - "memory_used": "4000000000", - "memory_used_percent": "50.0", - "memory_processe_used_percent": "25.0", - "memory_free": "2000000000", - "network_tcp_connection_count": "100", - "network_upload_tcp_connection_count": "50", - "network_location": "location1", - "network_idc": "idc1", - "network_download_rate": "1000000", - "network_download_rate_limit": "2000000", - "network_upload_rate": "500000", - "network_upload_rate_limit": "1000000", - "disk_total": "100000000000", - "disk_free": "50000000000", - "disk_used": "50000000000", - "disk_used_percent": "50.0", - "disk_inodes_total": "100000", - "disk_inodes_used": "50000", - "disk_inodes_free": "50000", - "disk_inodes_used_percent": "50.0", - "disk_write_bandwidth": "10000000", - "disk_read_bandwidth": "20000000", - "build_git_version": "v1.0.0", - "build_git_commit": "commit1", - "build_go_version": "1.16", - "build_platform": "linux/amd64", - "scheduler_cluster_id": "1", - "announce_interval": "1", - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - - mock.ExpectTxPipeline() - mock.ExpectDel(pkgredis.MakePersistentCacheHostKeyInScheduler(1, "host1")).SetVal(1) - mock.ExpectSRem(pkgredis.MakePersistentCacheHostsInScheduler(1), "host1").SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectErr: false, - }, } for _, tt := range tests { diff --git a/scheduler/resource/persistentcache/peer_manager.go b/scheduler/resource/persistentcache/peer_manager.go index 97effb9dd9f..0adfc8946cd 100644 --- a/scheduler/resource/persistentcache/peer_manager.go +++ b/scheduler/resource/persistentcache/peer_manager.go @@ -172,6 +172,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) { // Store sets persistent cache peer. func (p *peerManager) Store(ctx context.Context, peer *Peer) error { + // Marshal finished pieces and block parents. finishedPieces, err := peer.FinishedPieces.MarshalBinary() if err != nil { peer.Log.Errorf("marshal finished pieces failed: %v", err) @@ -184,67 +185,93 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error { return err } - if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - // Store peer information and set expiration. 
- if _, err := pipe.HSet(ctx, - pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), - "id", peer.ID, - "persistent", peer.Persistent, - "finished_pieces", finishedPieces, - "state", peer.FSM.Current(), - "block_parents", blockParents, - "task_id", peer.Task.ID, - "host_id", peer.Host.ID, - "cost", peer.Cost.Nanoseconds(), - "created_at", peer.CreatedAt.Format(time.RFC3339), - "updated_at", peer.UpdatedAt.Format(time.RFC3339)).Result(); err != nil { - peer.Log.Errorf("store peer failed: %v", err) - return err - } - - ttl := peer.Task.TTL - time.Since(peer.Task.CreatedAt) - if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), ttl).Result(); err != nil { - peer.Log.Errorf("set peer ttl failed: %v", err) - return err - } - - // Store the joint-set with task and set expiration. - if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID).Result(); err != nil { - peer.Log.Errorf("add peer id to task joint-set failed: %v", err) - return err - } - - if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil { - peer.Log.Errorf("set task joint-set ttl failed: %v", err) - return err - } - - // Store the joint-set with task for persistent peer and set expiration. - if peer.Persistent { - if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID).Result(); err != nil { - peer.Log.Errorf("add persistent peer id to task joint-set failed: %v", err) - return err - } - - if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil { - peer.Log.Errorf("set task joint-set ttl failed: %v", err) - return err - } - } - - // Store the joint-set with host. - if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID).Result(); err != nil { - peer.Log.Errorf("add peer id to host joint-set failed: %v", err) - return err - } + // Calculate remaining TTL in seconds. + ttl := peer.Task.TTL - time.Since(peer.Task.CreatedAt) + remainingTTLSeconds := int64(ttl.Seconds()) + + // Define the Lua script as a string. 
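+	// The script executes atomically on the Redis server, so the peer hash, the task and host
+	// joint-sets, and their expirations are always updated together.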
+ const storePeerScript = ` +-- Extract keys +local peer_key = KEYS[1] -- Key for the peer hash +local task_peers_key = KEYS[2] -- Key for the task joint-set +local persistent_task_peers_key = KEYS[3] -- Key for the persistent task joint-set +local host_peers_key = KEYS[4] -- Key for the host joint-set + +-- Extract arguments +local peer_id = ARGV[1] +local persistent = tonumber(ARGV[2]) +local finished_pieces = ARGV[3] +local state = ARGV[4] +local block_parents = ARGV[5] +local task_id = ARGV[6] +local host_id = ARGV[7] +local cost = ARGV[8] +local created_at = ARGV[9] +local updated_at = ARGV[10] +local ttl_seconds = tonumber(ARGV[11]) + +-- Store peer information +redis.call("HSET", peer_key, + "id", peer_id, + "persistent", persistent, + "finished_pieces", finished_pieces, + "state", state, + "block_parents", block_parents, + "task_id", task_id, + "host_id", host_id, + "cost", cost, + "created_at", created_at, + "updated_at", updated_at) + +-- Set expiration for the peer key +redis.call("EXPIRE", peer_key, ttl_seconds) + +-- Add peer ID to the task joint-set +redis.call("SADD", task_peers_key, peer_id) +redis.call("EXPIRE", task_peers_key, ttl_seconds) + +-- Add peer ID to the persistent task joint-set if persistent +if persistent == 1 then + redis.call("SADD", persistent_task_peers_key, peer_id) + redis.call("EXPIRE", persistent_task_peers_key, ttl_seconds) +end + +-- Add peer ID to the host joint-set +redis.call("SADD", host_peers_key, peer_id) +redis.call("EXPIRE", host_peers_key, ttl_seconds) + +return true +` + + // Create a new Redis script. + script := redis.NewScript(storePeerScript) + + // Prepare keys. + keys := []string{ + pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), + pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), + pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), + pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), + } - if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil { - peer.Log.Errorf("set task joint-set ttl failed: %v", err) - return err - } + // Prepare arguments. + args := []interface{}{ + peer.ID, + peer.Persistent, + string(finishedPieces), + peer.FSM.Current(), + string(blockParents), + peer.Task.ID, + peer.Host.ID, + peer.Cost.Nanoseconds(), + peer.CreatedAt.Format(time.RFC3339), + peer.UpdatedAt.Format(time.RFC3339), + remainingTTLSeconds, + } - return nil - }); err != nil { + // Execute the script. + err = script.Run(ctx, p.rdb, keys, args...).Err() + if err != nil { peer.Log.Errorf("store peer failed: %v", err) return err } @@ -254,37 +281,73 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error { // Delete deletes persistent cache peer by a key, and it will delete the association with task and host at the same time. func (p *peerManager) Delete(ctx context.Context, peerID string) error { + // Define the Lua script as a string. 
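+	// The script guards on the peer key still existing, then removes the hash and the peer's
+	// membership in the task, persistent-task, and host joint-sets in one atomic step.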
+ const deletePeerScript = ` +-- Extract keys +local peer_key = KEYS[1] -- Key for the peer hash +local task_peers_key = KEYS[2] -- Key for the task joint-set +local persistent_task_peers_key = KEYS[3] -- Key for the persistent task joint-set +local host_peers_key = KEYS[4] -- Key for the host joint-set + +-- Extract arguments +local peer_id = ARGV[1] +local persistent = tonumber(ARGV[2]) +local task_id = ARGV[3] +local host_id = ARGV[4] + +-- Check if the peer exists +if redis.call("EXISTS", peer_key) == 0 then + return {err = "peer not found"} +end + +-- Delete the peer key +redis.call("DEL", peer_key) + +-- Remove peer ID from the task joint-set +redis.call("SREM", task_peers_key, peer_id) + +-- Remove peer ID from the persistent task joint-set if persistent is 1 +if persistent == 1 then + redis.call("SREM", persistent_task_peers_key, peer_id) +end + +-- Remove peer ID from the host joint-set +redis.call("SREM", host_peers_key, peer_id) + +return true +` + log := logger.WithPeerID(peerID) - if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - peer, found := p.Load(ctx, peerID) - if !found { - return errors.New("getting peer failed from redis") - } - if _, err := pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result(); err != nil { - log.Errorf("delete peer failed: %v", err) - return err - } + // Create a new Redis script. + script := redis.NewScript(deletePeerScript) - if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil { - log.Errorf("delete peer id from task joint-set failed: %v", err) - return err - } + // Load the peer to get its metadata. + peer, found := p.Load(ctx, peerID) + if !found { + log.Errorf("getting peer failed from redis") + return errors.New("getting peer failed from redis") + } - if peer.Persistent { - if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil { - log.Errorf("delete persistent peer id from task joint-set failed: %v", err) - } - } + // Prepare keys. + keys := []string{ + pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID), + pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), + pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), + pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), + } - if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peerID).Result(); err != nil { - log.Errorf("delete peer id from host joint-set failed: %v", err) - return err - } + // Prepare arguments. + args := []interface{}{ + peerID, + peer.Persistent, + peer.Task.ID, + peer.Host.ID, + } - return nil - }); err != nil { - log.Errorf("store peer failed: %v", err) + // Execute the script. 
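+	// peer.Persistent is encoded as 1/0 on the wire, which the script converts back with tonumber;
+	// if the peer hash disappeared after Load, the script's error reply is surfaced here as an error.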
+ if err := script.Run(ctx, p.rdb, keys, args...).Err(); err != nil { + peer.Log.Errorf("delete peer failed: %v", err) return err } diff --git a/scheduler/resource/persistentcache/peer_manager_test.go b/scheduler/resource/persistentcache/peer_manager_test.go index 0a487afe0e1..84b60d007fa 100644 --- a/scheduler/resource/persistentcache/peer_manager_test.go +++ b/scheduler/resource/persistentcache/peer_manager_test.go @@ -18,7 +18,6 @@ package persistentcache import ( "context" - "encoding/json" "errors" "strconv" "testing" @@ -237,352 +236,6 @@ func TestPeerManager_Load(t *testing.T) { }) } } -func TestPeerManager_Store(t *testing.T) { - type args struct { - peer *Peer - } - - tests := []struct { - name string - args args - mockRedis func(mock redismock.ClientMock) - expectedErr bool - }{ - { - name: "successful store", - args: args{ - peer: NewPeer( - "goodpeer", - PeerStateSucceeded, - true, - bitset.New(2).Set(1), - []string{"parent1", "parent2"}, - &Task{ID: "task1", TTL: 4 * time.Minute, CreatedAt: time.Now().Add(1 * time.Second)}, - &Host{ID: "host1"}, - time.Second, - time.Now(), - time.Now(), - logger.WithPeer("host1", "task1", "goodpeer"), - ), - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - blockParents, err := json.Marshal([]string{"parent1", "parent2"}) - if err != nil { - t.Fatalf("failed to marshal block_parents: %v", err) - } - - mock.ExpectTxPipeline() - mock.ExpectHSet( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "goodpeer"), - "id", "goodpeer", - "persistent", true, - "finished_pieces", finishedPieces, - "state", PeerStateSucceeded, - "block_parents", blockParents, - "task_id", "task1", - "host_id", "host1", - "cost", time.Second.Nanoseconds(), - "created_at", time.Now().Format(time.RFC3339), - "updated_at", time.Now().Format(time.RFC3339), - ).SetVal(1) - mock.ExpectExpire( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "goodpeer"), - 4*time.Minute, - ).SetVal(true) - mock.ExpectSAdd( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - "goodpeer", - ).SetVal(1) - mock.ExpectExpire( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - 4*time.Minute, - ).SetVal(true) - mock.ExpectSAdd( - pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(42, "task1"), - "goodpeer", - ).SetVal(1) - mock.ExpectExpire( - pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(42, "task1"), - 4*time.Minute, - ).SetVal(true) - mock.ExpectSAdd( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "host1"), - "goodpeer", - ).SetVal(1) - mock.ExpectExpire( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "host1"), - 4*time.Minute, - ).SetVal(true) - mock.ExpectTxPipelineExec() - }, - expectedErr: false, - }, - { - name: "redis transaction error", - args: args{ - peer: NewPeer( - "goodpeer", - PeerStateSucceeded, - true, - bitset.New(2).Set(1), - []string{"parent1", "parent2"}, - &Task{ID: "task1", TTL: 5 * time.Minute, CreatedAt: time.Now().Add(-1 * time.Minute)}, - &Host{ID: "host1"}, - time.Second, - time.Now(), - time.Now(), - logger.WithPeer("host1", "task1", "goodpeer"), - ), - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - blockParents, err := json.Marshal([]string{"parent1", 
"parent2"}) - if err != nil { - t.Fatalf("failed to marshal block_parents: %v", err) - } - - mock.ExpectTxPipeline() - mock.ExpectHSet( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "goodpeer"), - "id", "goodpeer", - "persistent", true, - "finished_pieces", finishedPieces, - "state", PeerStateSucceeded, - "block_parents", blockParents, - "task_id", "task1", - "host_id", "host1", - "cost", time.Second.Nanoseconds(), - "created_at", time.Now().Format(time.RFC3339), - "updated_at", time.Now().Format(time.RFC3339), - ).SetErr(errors.New("redis error")) - }, - expectedErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - rdb, mock := redismock.NewClientMock() - tt.mockRedis(mock) - - hostManager := NewMockHostManager(ctrl) - taskManager := NewMockTaskManager(ctrl) - - pm := &peerManager{ - config: &config.Config{ - Manager: config.ManagerConfig{ - SchedulerClusterID: 42, - }, - }, - rdb: rdb, - hostManager: hostManager, - taskManager: taskManager, - } - - err := pm.Store(context.Background(), tt.args.peer) - if tt.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet redis expectations: %v", err) - } - }) - } -} - -func TestPeerManager_Delete(t *testing.T) { - type args struct { - peerID string - } - - tests := []struct { - name string - args args - mock func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) - mockRedis func(mock redismock.ClientMock) - expectedErr bool - }{ - { - name: "peer not found", - args: args{ - peerID: "notfound", - }, - mockRedis: func(mock redismock.ClientMock) { - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "notfound"), - ).SetVal(map[string]string{}) - }, - expectedErr: true, - }, - { - name: "redis delete error", - args: args{ - peerID: "deleteerror", - }, - mock: func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "deleteerror"), - ).SetVal(map[string]string{ - "id": "deleteerror", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "host1", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "deleteerror"), - ).SetErr(errors.New("redis delete error")) - }, - expectedErr: true, - }, - { - name: "successful delete", - args: args{ - peerID: "goodpeer", - }, - mock: func(hostManager 
*MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "goodpeer"), - ).SetVal(map[string]string{ - "id": "goodpeer", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "127.0.0.1-foo", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "goodpeer"), - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - "goodpeer", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(42, "task1"), - "goodpeer", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "127.0.0.1-foo"), - "goodpeer", - ).SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectedErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - rdb, mock := redismock.NewClientMock() - tt.mockRedis(mock) - - hostManager := NewMockHostManager(ctrl) - taskManager := NewMockTaskManager(ctrl) - - if tt.mock != nil { - tt.mock(hostManager, hostManager.EXPECT(), taskManager, taskManager.EXPECT()) - } - - pm := &peerManager{ - config: &config.Config{ - Manager: config.ManagerConfig{ - SchedulerClusterID: 42, - }, - }, - rdb: rdb, - hostManager: hostManager, - taskManager: taskManager, - } - - err := pm.Delete(context.Background(), tt.args.peerID) - if tt.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet redis expectations: %v", err) - } - }) - } -} func TestPeerManager_LoadAll(t *testing.T) { tests := []struct { @@ -1105,123 +758,6 @@ func TestPeerManager_DeleteAllByTaskID(t *testing.T) { }, expectedErr: true, }, - { - name: "delete peer error", - args: args{ - taskID: "task1", - }, - mock: func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal 
bitset: %v", err) - } - - mock.ExpectSMembers( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - ).SetVal([]string{"peer1"}) - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(map[string]string{ - "id": "peer1", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "127.0.0.1-foo", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetErr(errors.New("redis delete error")) - }, - expectedErr: false, - }, - { - name: "successful delete all", - args: args{ - taskID: "task1", - }, - mock: func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - mock.ExpectSMembers( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - ).SetVal([]string{"peer1"}) - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(map[string]string{ - "id": "peer1", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "127.0.0.1-foo", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - "peer1", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(42, "task1"), - "peer1", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "127.0.0.1-foo"), - "peer1", - ).SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectedErr: false, - }, } for _, tt := range tests { @@ -1513,123 +1049,6 @@ func TestPeerManager_DeleteAllByHostID(t *testing.T) { }, expectedErr: true, }, - { - name: "delete peer error", - args: args{ - hostID: "host1", - }, - mock: func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - 
), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - mock.ExpectSMembers( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "host1"), - ).SetVal([]string{"peer1"}) - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(map[string]string{ - "id": "peer1", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "127.0.0.1-foo", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetErr(errors.New("redis delete error")) - }, - expectedErr: false, - }, - { - name: "successful delete all", - args: args{ - hostID: "host1", - }, - mock: func(hostManager *MockHostManager, mockHostManager *MockHostManagerMockRecorder, taskManager *MockTaskManager, mockTaskManager *MockTaskManagerMockRecorder) { - mockHostManager.Load(gomock.Any(), gomock.Any()).Return(&mockRawHost, true).Times(1) - mockTaskManager.Load(gomock.Any(), gomock.Any()).Return(NewTask( - "task1", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), true).Times(1) - }, - mockRedis: func(mock redismock.ClientMock) { - finishedPieces, err := bitset.New(2).Set(1).MarshalBinary() - if err != nil { - t.Fatalf("failed to marshal bitset: %v", err) - } - - mock.ExpectSMembers( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "host1"), - ).SetVal([]string{"peer1"}) - mock.ExpectHGetAll( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(map[string]string{ - "id": "peer1", - "state": PeerStateSucceeded, - "persistent": "true", - "finished_pieces": string(finishedPieces), - "block_parents": `["parent1", "parent2"]`, - "task_id": "task1", - "host_id": "127.0.0.1-foo", - "cost": strconv.FormatUint(uint64(time.Second.Nanoseconds()), 10), - "created_at": time.Now().Format(time.RFC3339), - "updated_at": time.Now().Format(time.RFC3339), - }) - mock.ExpectTxPipeline() - mock.ExpectDel( - pkgredis.MakePersistentCachePeerKeyInScheduler(42, "peer1"), - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(42, "task1"), - "peer1", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(42, "task1"), - "peer1", - ).SetVal(1) - mock.ExpectSRem( - pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(42, "127.0.0.1-foo"), - "peer1", - ).SetVal(1) - mock.ExpectTxPipelineExec() - }, - expectedErr: false, - }, } for _, tt := range tests { diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go index 1fc013197da..e5c2607f200 100644 --- a/scheduler/resource/persistentcache/task_manager.go +++ b/scheduler/resource/persistentcache/task_manager.go @@ -152,32 +152,76 @@ func (t *taskManager) LoadCurrentPersistentReplicaCount(ctx context.Context, tas // Store sets persistent cache task. 
func (t *taskManager) Store(ctx context.Context, task *Task) error { - if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - if _, err := pipe.HSet(ctx, - pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), - "id", task.ID, - "persistent_replica_count", task.PersistentReplicaCount, - "tag", task.Tag, - "application", task.Application, - "piece_length", task.PieceLength, - "content_length", task.ContentLength, - "total_piece_count", task.TotalPieceCount, - "state", task.FSM.Current(), - "ttl", task.TTL.Nanoseconds(), - "created_at", task.CreatedAt.Format(time.RFC3339), - "updated_at", task.UpdatedAt.Format(time.RFC3339)).Result(); err != nil { - task.Log.Errorf("store task failed: %v", err) - return err - } + // Calculate remaining TTL in seconds. + ttl := task.TTL - time.Since(task.CreatedAt) + remainingTTLSeconds := int64(ttl.Seconds()) + + // Define the Lua script as a string. + const storeTaskScript = ` +-- Extract keys +local task_key = KEYS[1] -- Key for the task hash + +-- Extract arguments +local task_id = ARGV[1] +local persistent_replica_count = ARGV[2] +local tag = ARGV[3] +local application = ARGV[4] +local piece_length = ARGV[5] +local content_length = ARGV[6] +local total_piece_count = ARGV[7] +local state = ARGV[8] +local created_at = ARGV[9] +local updated_at = ARGV[10] +local ttl = tonumber(ARGV[11]) +local ttl_seconds = tonumber(ARGV[12]) + +-- Perform HSET operation to store task details +redis.call("HSET", task_key, + "id", task_id, + "persistent_replica_count", persistent_replica_count, + "tag", tag, + "application", application, + "piece_length", piece_length, + "content_length", content_length, + "total_piece_count", total_piece_count, + "state", state, + "ttl", ttl, + "created_at", created_at, + "updated_at", updated_at) + +-- Perform EXPIRE operation if TTL is still valid +redis.call("EXPIRE", task_key, ttl_seconds) + +return true +` + + // Create a new Redis script. + script := redis.NewScript(storeTaskScript) + + // Prepare keys. + keys := []string{ + pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), + } - ttl := task.TTL - time.Since(task.CreatedAt) - if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), ttl).Result(); err != nil { - task.Log.Errorf("set task ttl failed: %v", err) - return err - } + // Prepare arguments. + args := []interface{}{ + task.ID, + task.PersistentReplicaCount, + task.Tag, + task.Application, + task.PieceLength, + task.ContentLength, + task.TotalPieceCount, + task.FSM.Current(), + task.CreatedAt.Format(time.RFC3339), + task.UpdatedAt.Format(time.RFC3339), + task.TTL.Nanoseconds(), + remainingTTLSeconds, + } - return nil - }); err != nil { + // Execute the script. 
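+	// If the task's TTL has already elapsed, remainingTTLSeconds is non-positive and the EXPIRE
+	// in the script deletes the key immediately instead of extending it.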
+ err := script.Run(ctx, t.rdb, keys, args...).Err() + if err != nil { task.Log.Errorf("store task failed: %v", err) return err } diff --git a/scheduler/resource/persistentcache/task_manager_test.go b/scheduler/resource/persistentcache/task_manager_test.go index d4b283c17b6..8feba1b21b2 100644 --- a/scheduler/resource/persistentcache/task_manager_test.go +++ b/scheduler/resource/persistentcache/task_manager_test.go @@ -183,155 +183,6 @@ func TestTaskManager_Load(t *testing.T) { } } -func TestTaskManager_Store(t *testing.T) { - type args struct { - task *Task - } - - tests := []struct { - name string - args args - mockRedis func(mock redismock.ClientMock, task *Task) - expectedErr bool - }{ - { - name: "store success", - args: args{ - task: NewTask( - "store-success", - "test-tag", - "test-app", - TaskStateSucceeded, - 1, - 1024, - 2048, - 2, - 5*time.Minute, - time.Now().Add(-1*time.Minute), - time.Now(), - logger.WithTaskID("store-success"), - ), - }, - mockRedis: func(mock redismock.ClientMock, task *Task) { - mock.ExpectTxPipeline() - mock.ExpectHSet( - pkgredis.MakePersistentCacheTaskKeyInScheduler(42, task.ID), - "id", task.ID, - "persistent_replica_count", task.PersistentReplicaCount, - "tag", task.Tag, - "application", task.Application, - "piece_length", task.PieceLength, - "content_length", task.ContentLength, - "total_piece_count", task.TotalPieceCount, - "state", task.FSM.Current(), - "ttl", task.TTL.Nanoseconds(), - "created_at", task.CreatedAt.Format(time.RFC3339), - "updated_at", task.UpdatedAt.Format(time.RFC3339), - ).SetVal(int64(1)) - mock.ExpectExpire( - pkgredis.MakePersistentCacheTaskKeyInScheduler(42, task.ID), - task.TTL-time.Since(task.CreatedAt), - ).SetVal(true) - mock.ExpectTxPipelineExec() - }, - expectedErr: false, - }, - { - name: "hset error", - args: args{ - task: NewTask( - "store-hset-error", - "", - "", - TaskStatePending, - 0, - 0, - 0, - 0, - time.Minute, - time.Now(), - time.Now(), - logger.WithTaskID("store-hset-error"), - ), - }, - mockRedis: func(mock redismock.ClientMock, task *Task) { - mock.ExpectTxPipeline() - mock.ExpectHSet( - pkgredis.MakePersistentCacheTaskKeyInScheduler(42, task.ID), - "id", task.ID, - "persistent_replica_count", task.PersistentReplicaCount, - "tag", task.Tag, - "application", task.Application, - "piece_length", task.PieceLength, - "content_length", task.ContentLength, - "total_piece_count", task.TotalPieceCount, - "state", task.FSM.Current(), - "ttl", task.TTL.Nanoseconds(), - "created_at", task.CreatedAt.Format(time.RFC3339), - "updated_at", task.UpdatedAt.Format(time.RFC3339), - ).SetErr(errors.New("hset error")) - }, - expectedErr: true, - }, - { - name: "expire error", - args: args{ - task: NewTask( - "store-expire-error", - "", - "", - TaskStatePending, - 0, - 0, - 0, - 0, - time.Minute, - time.Now(), - time.Now(), - logger.WithTaskID("store-expire-error"), - ), - }, - mockRedis: func(mock redismock.ClientMock, task *Task) { - mock.ExpectTxPipeline() - mock.ExpectHSet( - pkgredis.MakePersistentCacheTaskKeyInScheduler(42, task.ID), - "id", task.ID, - "persistent_replica_count", task.PersistentReplicaCount, - "tag", task.Tag, - "application", task.Application, - "piece_length", task.PieceLength, - "content_length", task.ContentLength, - "total_piece_count", task.TotalPieceCount, - "state", task.FSM.Current(), - "ttl", task.TTL.Nanoseconds(), - "created_at", task.CreatedAt.Format(time.RFC3339), - "updated_at", task.UpdatedAt.Format(time.RFC3339), - ).SetVal(int64(1)) - mock.ExpectExpire( - 
pkgredis.MakePersistentCacheTaskKeyInScheduler(42, task.ID), - task.TTL-time.Since(task.CreatedAt), - ).SetErr(errors.New("expire error")) - }, - expectedErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - rdb, mock := redismock.NewClientMock() - tt.mockRedis(mock, tt.args.task) - - tm := &taskManager{ - config: &config.Config{Manager: config.ManagerConfig{SchedulerClusterID: 42}}, - rdb: rdb, - } - err := tm.Store(context.Background(), tt.args.task) - assert.Equal(t, tt.expectedErr, err != nil, "error mismatch") - assert.NoError(t, mock.ExpectationsWereMet()) - }) - } -} - func TestTaskManager_LoadCorrentReplicaCount(t *testing.T) { type args struct { taskID string