Commit

Update nodescheduler.go
plebbyd authored Dec 11, 2024
1 parent 3ef6c6a commit e3d9108
Showing 1 changed file with 70 additions and 21 deletions.
91 changes: 70 additions & 21 deletions pkg/nodescheduler/nodescheduler.go
@@ -8,6 +8,9 @@ import (
"sync"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"

[Check failure (GitHub Actions / CI/CD) on lines 11–12 in pkg/nodescheduler/nodescheduler.go: no required module provides package github.com/shirou/gopsutil/cpu (and github.com/shirou/gopsutil/mem); to add it:]

"github.com/looplab/fsm"
"github.com/waggle-sensor/edge-scheduler/pkg/datatype"
"github.com/waggle-sensor/edge-scheduler/pkg/interfacing"
@@ -40,6 +43,49 @@ type NodeScheduler struct {
chanNeedScheduling chan datatype.Event
}


func (ns *NodeScheduler) checkResourceAvailability() error {
	// Get the number of logical CPU cores
	cpuCount, err := cpu.Counts(true) // true for logical CPU count
	if err != nil {
		return fmt.Errorf("failed to get CPU count: %v", err)
	}

	// Get per-CPU usage, sampled over one second
	perCPUPercent, err := cpu.Percent(time.Second, true) // true for per-CPU percentages
	if err != nil {
		return fmt.Errorf("failed to get CPU usage: %v", err)
	}

	// Count how many cores are effectively "used up" (over 90% usage)
	usedCores := 0
	for _, cpuPercent := range perCPUPercent {
		if cpuPercent > 90.0 { // Consider a core "used" if it is over 90% utilized
			usedCores++
		}
	}

	availableCores := cpuCount - usedCores
	if availableCores < 1 { // Hard-coded to keep 1 core available
		return fmt.Errorf("insufficient CPU cores available: %d cores free (minimum required: 1 core)",
			availableCores)
	}

	// Check memory usage
	vmStat, err := mem.VirtualMemory()
	if err != nil {
		return fmt.Errorf("failed to get memory usage: %v", err)
	}

	availableMemoryMB := vmStat.Available / 1024 / 1024 // Convert bytes to MB
	if availableMemoryMB < 128 { // Hard-coded to require 128 MB free
		return fmt.Errorf("insufficient memory available: %d MB (minimum required: 128 MB)",
			availableMemoryMB)
	}

	return nil
}
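
As an aside (not part of this commit), here is a minimal standalone sketch of the gopsutil calls the new check relies on. It assumes the gopsutil dependency has been added to go.mod, which the CI failures above indicate is still missing, and it only prints the raw readings:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shirou/gopsutil/cpu"
	"github.com/shirou/gopsutil/mem"
)

func main() {
	// Logical core count, consistent with the per-CPU percentages below.
	cores, err := cpu.Counts(true)
	if err != nil {
		log.Fatalf("cpu count: %v", err)
	}

	// Per-CPU utilization sampled over one second.
	perCPU, err := cpu.Percent(time.Second, true)
	if err != nil {
		log.Fatalf("cpu percent: %v", err)
	}

	// System memory statistics; Available is reported in bytes.
	vm, err := mem.VirtualMemory()
	if err != nil {
		log.Fatalf("memory: %v", err)
	}

	fmt.Printf("logical cores: %d, per-CPU %%: %v, available memory: %d MB\n",
		cores, perCPU, vm.Available/1024/1024)
}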

// Configure sets up the followings in Kubernetes cluster
//
// - "ses" namespace
@@ -113,35 +159,38 @@ func (ns *NodeScheduler) Run() {
logger.Debug.Printf("Science rule %q is valid", r)
switch r.ActionType {
case datatype.ScienceRuleActionSchedule:
// TODO: We will need to find a way to pass parameters to the plugin
// For example, schedule(plugin-a, duration=5m) <<
pluginName := r.ActionObject
if pr := ns.GoalManager.GetPluginRuntime(PluginIndex{
	name:   pluginName,
	jobID:  sg.JobID,
	goalID: sg.ID,
}); pr == nil {
logger.Error.Printf("failed to promote plugin: plugin name %q for goal %q not registered", pluginName, goalID)
// TODO: we may want to verify what exists and why this happens
} else if !pr.Status.Is(string(datatype.Inactive)) {
logger.Debug.Printf("plugin %q is already active. no need to activate it", pr.Plugin.Name)
} else if err := pr.Queued(); err != nil {
logger.Error.Printf("plugin %q failed to transition from %s to %s: %s", pr.Plugin.Name, pr.Status.Current(), datatype.Queued, err.Error())
} else {
pr.UpdateWithScienceRule(r)
// TODO: We disable the plugin controller until we actually use it.
// This causes problems where Pods do not finish and hang in StartError
// pr.SetPluginController(true)
pr.GeneratePodInstance()
msg := datatype.NewSchedulerEventBuilder(datatype.EventPluginStatusQueued).
AddPluginRuntimeMeta(*pr).
AddPluginMeta(pr.Plugin).
AddReason(fmt.Sprintf("triggered by %s", r.Condition)).
Build().(datatype.SchedulerEvent)
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(msg.ToWaggleMessage(), "all")
ns.readyQueue.Push(pr)
triggerScheduling = true
logger.Info.Printf("Plugin %s is queued by %s", pr.Plugin.Name, r.Condition)
// Check resource availability before scheduling
if err := ns.checkResourceAvailability(); err != nil {
logger.Error.Printf("insufficient resources to schedule plugin %q: %v", pr.Plugin.Name, err)
continue
}

if err := pr.Queued(); err != nil {
logger.Error.Printf("plugin %q failed to transition from %s to %s: %s",
pr.Plugin.Name, pr.Status.Current(), datatype.Queued, err.Error())
} else {
pr.UpdateWithScienceRule(r)
pr.GeneratePodInstance()
msg := datatype.NewSchedulerEventBuilder(datatype.EventPluginStatusQueued).
AddPluginRuntimeMeta(*pr).
AddPluginMeta(pr.Plugin).
AddReason(fmt.Sprintf("triggered by %s", r.Condition)).
Build().(datatype.SchedulerEvent)
ns.LogToBeehive.SendWaggleMessageOnNodeAsync(msg.ToWaggleMessage(), "all")
ns.readyQueue.Push(pr)
triggerScheduling = true
logger.Info.Printf("Plugin %s is queued by %s", pr.Plugin.Name, r.Condition)
}
}
case datatype.ScienceRuleActionPublish:
eventName := r.ActionObject
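
Because checkResourceAvailability does not read any NodeScheduler fields, it can be exercised in isolation. A possible smoke test, sketched under the assumption that it lives alongside the scheduler in the nodescheduler package (the test name is illustrative):

package nodescheduler

import "testing"

// TestCheckResourceAvailabilitySmoke only verifies that the check runs and
// returns a result on the current machine; it does not assert availability,
// since that depends on the host's load.
func TestCheckResourceAvailabilitySmoke(t *testing.T) {
	ns := &NodeScheduler{}
	if err := ns.checkResourceAvailability(); err != nil {
		t.Logf("resources reported unavailable: %v", err)
	} else {
		t.Log("resources reported available")
	}
}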
