Skip to content

Commit

Permalink
feat: replace Pipeliner with lua (#3846)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 18, 2025
1 parent 522074b commit 832bd52
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 1,292 deletions.
325 changes: 243 additions & 82 deletions scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
}

// Set boolean fields from raw host.
diableShared, err := strconv.ParseBool(rawHost["disable_shared"])
disableShared, err := strconv.ParseBool(rawHost["disable_shared"])
if err != nil {
log.Errorf("parsing disable shared failed: %v", err)
return nil, false
Expand Down Expand Up @@ -435,7 +435,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
int32(port),
int32(downloadPort),
uint64(schedulerClusterID),
diableShared,
disableShared,
pkgtypes.ParseHostType(rawHost["type"]),
cpu,
memory,
Expand All @@ -451,76 +451,212 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {

// Store sets host.
func (h *hostManager) Store(ctx context.Context, host *Host) error {
if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if _, err := pipe.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
"hostname", host.Hostname,
"ip", host.IP,
"port", host.Port,
"download_port", host.DownloadPort,
"disable_shared", host.DisableShared,
"os", host.OS,
"platform", host.Platform,
"platform_family", host.PlatformFamily,
"platform_version", host.PlatformVersion,
"kernel_version", host.KernelVersion,
"cpu_logical_count", host.CPU.LogicalCount,
"cpu_physical_count", host.CPU.PhysicalCount,
"cpu_percent", host.CPU.Percent,
"cpu_processe_percent", host.CPU.ProcessPercent,
"cpu_times_user", host.CPU.Times.User,
"cpu_times_system", host.CPU.Times.System,
"cpu_times_idle", host.CPU.Times.Idle,
"cpu_times_nice", host.CPU.Times.Nice,
"cpu_times_iowait", host.CPU.Times.Iowait,
"cpu_times_irq", host.CPU.Times.Irq,
"cpu_times_softirq", host.CPU.Times.Softirq,
"cpu_times_steal", host.CPU.Times.Steal,
"cpu_times_guest", host.CPU.Times.Guest,
"cpu_times_guest_nice", host.CPU.Times.GuestNice,
"memory_total", host.Memory.Total,
"memory_available", host.Memory.Available,
"memory_used", host.Memory.Used,
"memory_used_percent", host.Memory.UsedPercent,
"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
"memory_free", host.Memory.Free,
"network_tcp_connection_count", host.Network.TCPConnectionCount,
"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
"network_location", host.Network.Location,
"network_idc", host.Network.IDC,
"network_download_rate", host.Network.DownloadRate,
"network_download_rate_limit", host.Network.DownloadRateLimit,
"network_upload_rate", host.Network.UploadRate,
"network_upload_rate_limit", host.Network.UploadRateLimit,
"disk_total", host.Disk.Total,
"disk_free", host.Disk.Free,
"disk_used", host.Disk.Used,
"disk_used_percent", host.Disk.UsedPercent,
"disk_inodes_total", host.Disk.InodesTotal,
"disk_inodes_used", host.Disk.InodesUsed,
"disk_inodes_free", host.Disk.InodesFree,
"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
"disk_write_bandwidth", host.Disk.WriteBandwidth,
"disk_read_bandwidth", host.Disk.ReadBandwidth,
"build_git_version", host.Build.GitVersion,
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval.Nanoseconds(),
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
return err
}

if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil {
return err
}

return nil
}); err != nil {
// Define the Lua script as a string.
const storeHostScript = `
-- Extract keys and arguments
local host_key = KEYS[1] -- Key for the host hash
local hosts_set_key = KEYS[2] -- Key for the set of hosts
-- Extract host fields from arguments
local host_id = ARGV[1]
local host_type = ARGV[2]
local hostname = ARGV[3]
local ip = ARGV[4]
local port = ARGV[5]
local download_port = ARGV[6]
local disable_shared = tonumber(ARGV[7])
local os = ARGV[8]
local platform = ARGV[9]
local platform_family = ARGV[10]
local platform_version = ARGV[11]
local kernel_version = ARGV[12]
local cpu_logical_count = ARGV[13]
local cpu_physical_count = ARGV[14]
local cpu_percent = ARGV[15]
local cpu_process_percent = ARGV[16]
local cpu_times_user = ARGV[17]
local cpu_times_system = ARGV[18]
local cpu_times_idle = ARGV[19]
local cpu_times_nice = ARGV[20]
local cpu_times_iowait = ARGV[21]
local cpu_times_irq = ARGV[22]
local cpu_times_softirq = ARGV[23]
local cpu_times_steal = ARGV[24]
local cpu_times_guest = ARGV[25]
local cpu_times_guest_nice = ARGV[26]
local memory_total = ARGV[27]
local memory_available = ARGV[28]
local memory_used = ARGV[29]
local memory_used_percent = ARGV[30]
local memory_process_used_percent = ARGV[31]
local memory_free = ARGV[32]
local network_tcp_connection_count = ARGV[33]
local network_upload_tcp_connection_count = ARGV[34]
local network_location = ARGV[35]
local network_idc = ARGV[36]
local network_download_rate = ARGV[37]
local network_download_rate_limit = ARGV[38]
local network_upload_rate = ARGV[39]
local network_upload_rate_limit = ARGV[40]
local disk_total = ARGV[41]
local disk_free = ARGV[42]
local disk_used = ARGV[43]
local disk_used_percent = ARGV[44]
local disk_inodes_total = ARGV[45]
local disk_inodes_used = ARGV[46]
local disk_inodes_free = ARGV[47]
local disk_inodes_used_percent = ARGV[48]
local disk_write_bandwidth = ARGV[49]
local disk_read_bandwidth = ARGV[50]
local build_git_version = ARGV[51]
local build_git_commit = ARGV[52]
local build_go_version = ARGV[53]
local build_platform = ARGV[54]
local scheduler_cluster_id = ARGV[55]
local announce_interval = ARGV[56]
local created_at = ARGV[57]
local updated_at = ARGV[58]
-- Perform HSET operation
redis.call("HSET", host_key,
"id", host_id,
"type", host_type,
"hostname", hostname,
"ip", ip,
"port", port,
"download_port", download_port,
"disable_shared", disable_shared,
"os", os,
"platform", platform,
"platform_family", platform_family,
"platform_version", platform_version,
"kernel_version", kernel_version,
"cpu_logical_count", cpu_logical_count,
"cpu_physical_count", cpu_physical_count,
"cpu_percent", cpu_percent,
"cpu_processe_percent", cpu_process_percent,
"cpu_times_user", cpu_times_user,
"cpu_times_system", cpu_times_system,
"cpu_times_idle", cpu_times_idle,
"cpu_times_nice", cpu_times_nice,
"cpu_times_iowait", cpu_times_iowait,
"cpu_times_irq", cpu_times_irq,
"cpu_times_softirq", cpu_times_softirq,
"cpu_times_steal", cpu_times_steal,
"cpu_times_guest", cpu_times_guest,
"cpu_times_guest_nice", cpu_times_guest_nice,
"memory_total", memory_total,
"memory_available", memory_available,
"memory_used", memory_used,
"memory_used_percent", memory_used_percent,
"memory_processe_used_percent", memory_process_used_percent,
"memory_free", memory_free,
"network_tcp_connection_count", network_tcp_connection_count,
"network_upload_tcp_connection_count", network_upload_tcp_connection_count,
"network_location", network_location,
"network_idc", network_idc,
"network_download_rate", network_download_rate,
"network_download_rate_limit", network_download_rate_limit,
"network_upload_rate", network_upload_rate,
"network_upload_rate_limit", network_upload_rate_limit,
"disk_total", disk_total,
"disk_free", disk_free,
"disk_used", disk_used,
"disk_used_percent", disk_used_percent,
"disk_inodes_total", disk_inodes_total,
"disk_inodes_used", disk_inodes_used,
"disk_inodes_free", disk_inodes_free,
"disk_inodes_used_percent", disk_inodes_used_percent,
"disk_write_bandwidth", disk_write_bandwidth,
"disk_read_bandwidth", disk_read_bandwidth,
"build_git_version", build_git_version,
"build_git_commit", build_git_commit,
"build_go_version", build_go_version,
"build_platform", build_platform,
"scheduler_cluster_id", scheduler_cluster_id,
"announce_interval", announce_interval,
"created_at", created_at,
"updated_at", updated_at)
-- Perform SADD operation
redis.call("SADD", hosts_set_key, host_id)
return true
`

// Create a new Redis script.
script := redis.NewScript(storeHostScript)

// Prepare keys.
keys := []string{
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID),
}

// Prepare arguments.
args := []interface{}{
host.ID,
host.Type.Name(),
host.Hostname,
host.IP,
host.Port,
host.DownloadPort,
host.DisableShared,
host.OS,
host.Platform,
host.PlatformFamily,
host.PlatformVersion,
host.KernelVersion,
host.CPU.LogicalCount,
host.CPU.PhysicalCount,
host.CPU.Percent,
host.CPU.ProcessPercent,
host.CPU.Times.User,
host.CPU.Times.System,
host.CPU.Times.Idle,
host.CPU.Times.Nice,
host.CPU.Times.Iowait,
host.CPU.Times.Irq,
host.CPU.Times.Softirq,
host.CPU.Times.Steal,
host.CPU.Times.Guest,
host.CPU.Times.GuestNice,
host.Memory.Total,
host.Memory.Available,
host.Memory.Used,
host.Memory.UsedPercent,
host.Memory.ProcessUsedPercent,
host.Memory.Free,
host.Network.TCPConnectionCount,
host.Network.UploadTCPConnectionCount,
host.Network.Location,
host.Network.IDC,
host.Network.DownloadRate,
host.Network.DownloadRateLimit,
host.Network.UploadRate,
host.Network.UploadRateLimit,
host.Disk.Total,
host.Disk.Free,
host.Disk.Used,
host.Disk.UsedPercent,
host.Disk.InodesTotal,
host.Disk.InodesUsed,
host.Disk.InodesFree,
host.Disk.InodesUsedPercent,
host.Disk.WriteBandwidth,
host.Disk.ReadBandwidth,
host.Build.GitVersion,
host.Build.GitCommit,
host.Build.GoVersion,
host.Build.Platform,
host.SchedulerClusterID,
host.AnnounceInterval.Nanoseconds(),
host.CreatedAt.Format(time.RFC3339),
host.UpdatedAt.Format(time.RFC3339),
}

// Execute the script.
if err := script.Run(ctx, h.rdb, keys, args...).Err(); err != nil {
host.Log.Errorf("store host failed: %v", err)
return err
}
Expand All @@ -530,19 +666,44 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error {

// Delete deletes host by a key.
func (h *hostManager) Delete(ctx context.Context, hostID string) error {
// Define the Lua script as a string.
const deleteHostScript = `
-- Extract keys
local host_key = KEYS[1] -- Key for the host hash
local hosts_set_key = KEYS[2] -- Key for the set of hosts
-- Extract arguments
local host_id = ARGV[1]
-- Perform DEL operation to delete the host hash
redis.call("DEL", host_key)
-- Perform SREM operation to remove the host ID from the set
redis.call("SREM", hosts_set_key, host_id)
return true
`

log := logger.WithHostID(hostID)
if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil {
return err
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil {
return err
}
// Create a new Redis script.
script := redis.NewScript(deleteHostScript)

return nil
}); err != nil {
log.Errorf("store host failed: %v", err)
// Prepare keys.
keys := []string{
pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID),
pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID),
}

// Prepare arguments.
args := []interface{}{
hostID,
}

// Execute the script.
err := script.Run(ctx, h.rdb, keys, args...).Err()
if err != nil {
log.Errorf("delete host failed: %v", err)
return err
}

Expand Down
Loading

0 comments on commit 832bd52

Please sign in to comment.