diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index f42551834..e7813cc86 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/kubernetes" v1 "github.com/akash-network/akash-api/go/inventory/v1" + types "github.com/akash-network/akash-api/go/node/types/v1beta3" "github.com/akash-network/provider/cluster/kube/builder" ctypes "github.com/akash-network/provider/cluster/types/v1beta3" @@ -408,20 +409,27 @@ func (dp *nodeDiscovery) monitor() error { currLabels = copyManagedLabels(knode.Labels) } - node, err := dp.initNodeInfo(gpusIDs) + node, err := dp.initNodeInfo(gpusIDs, knode) if err != nil { log.Error(err, "unable to init node info") return err } - restartWatcher := func() error { + restartPodsWatcher := func() error { + if podsWatch != nil { + select { + case <-podsWatch.ResultChan(): + default: + } + } + var terr error podsWatch, terr = kc.CoreV1().Pods(corev1.NamespaceAll).Watch(dp.ctx, metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("spec.nodeName", dp.name).String(), }) if terr != nil { - log.Error(terr, "unable to watch start pods") + log.Error(terr, "unable to start pods watcher") return terr } @@ -446,7 +454,7 @@ func (dp *nodeDiscovery) monitor() error { return nil } - err = restartWatcher() + err = restartPodsWatcher() if err != nil { return err } @@ -504,17 +512,25 @@ func (dp *nodeDiscovery) monitor() error { switch obj := evt.Object.(type) { case *corev1.Node: if obj.Name == dp.name { - knode = obj.DeepCopy() switch evt.Type { case watch.Modified: + if nodeAllocatableChanged(knode, obj, &node) { + podsWatch.Stop() + if err = restartPodsWatcher(); err != nil { + return err + } + } + signalLabels() } + + knode = obj.DeepCopy() } } case res, isopen := <-podsWatch.ResultChan(): if !isopen { podsWatch.Stop() - if err = restartWatcher(); err != nil { + if err = restartPodsWatcher(); err != nil { return err } @@ -596,17 +612,30 @@ func (dp *nodeDiscovery) monitor() error { } } -func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors) (v1.Node, error) { - kc := fromctx.MustKubeClientFromCtx(dp.ctx) +func nodeAllocatableChanged(prev *corev1.Node, curr *corev1.Node, node *v1.Node) bool { + changed := len(prev.Status.Allocatable) != len(curr.Status.Allocatable) - cpuInfo := dp.parseCPUInfo(dp.ctx) - gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIDs) + if !changed { + for pres, pval := range prev.Status.Allocatable { + cval, exists := curr.Status.Allocatable[pres] + if !exists || (pval.Value() != cval.Value()) { + changed = true + break + } + } + } - knode, err := kc.CoreV1().Nodes().Get(dp.ctx, dp.name, metav1.GetOptions{}) - if err != nil { - return v1.Node{}, fmt.Errorf("%w: error fetching node %s", err, dp.name) + if changed { + updateNodeInfo(curr, node) } + return changed +} + +func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors, knode *corev1.Node) (v1.Node, error) { + cpuInfo := dp.parseCPUInfo(dp.ctx) + gpuInfo := dp.parseGPUInfo(dp.ctx, gpusIDs) + res := v1.Node{ Name: knode.Name, Resources: v1.NodeResources{ @@ -628,31 +657,50 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors) (v1.Node, erro }, } + updateNodeInfo(knode, &res) + + return res, nil +} + +func updateNodeInfo(knode *corev1.Node, node *v1.Node) { for name, r := range knode.Status.Allocatable { switch name { case corev1.ResourceCPU: - res.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue()) + node.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue()) case corev1.ResourceMemory: - res.Resources.Memory.Quantity.Allocatable.Set(r.Value()) + node.Resources.Memory.Quantity.Allocatable.Set(r.Value()) case corev1.ResourceEphemeralStorage: - res.Resources.EphemeralStorage.Allocatable.Set(r.Value()) + node.Resources.EphemeralStorage.Allocatable.Set(r.Value()) case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - res.Resources.GPU.Quantity.Allocatable.Set(r.Value()) + node.Resources.GPU.Quantity.Allocatable.Set(r.Value()) } } - - return res, nil } +// // trimAllocated ensure allocated does not overrun allocatable +// // Deprecated to be replaced with function from akash-api after sdk-47 upgrade +// func trimAllocated(rp *v1.ResourcePair) { +// allocated := rp.Allocated.Value() +// allocatable := rp.Allocatable.Value() +// +// if allocated <= allocatable { +// return +// } +// +// allocated = allocatable +// +// rp.Allocated.Set(allocated) +// } + func nodeResetAllocated(node *v1.Node) { node.Resources.CPU.Quantity.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) node.Resources.GPU.Quantity.Allocated = resource.NewQuantity(0, resource.DecimalSI) - node.Resources.Memory.Quantity.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) - node.Resources.EphemeralStorage.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) - node.Resources.VolumesAttached.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) - node.Resources.VolumesMounted.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) + node.Resources.Memory.Quantity.Allocated = resource.NewQuantity(0, resource.DecimalSI) + node.Resources.EphemeralStorage.Allocated = resource.NewQuantity(0, resource.DecimalSI) + node.Resources.VolumesAttached.Allocated = resource.NewQuantity(0, resource.DecimalSI) + node.Resources.VolumesMounted.Allocated = resource.NewQuantity(0, resource.DecimalSI) } func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { @@ -669,6 +717,7 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { fallthrough case builder.ResourceGPUAMD: node.Resources.GPU.Quantity.Allocated.Add(quantity) + // GPU overcommit is not allowed, if that happens something is terribly wrong with the inventory } } @@ -686,17 +735,18 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { for _, container := range pod.Spec.Containers { for name, quantity := range container.Resources.Requests { + rv := types.NewResourceValue(uint64(quantity.Value())) switch name { case corev1.ResourceCPU: - node.Resources.CPU.Quantity.Allocated.Sub(quantity) + node.Resources.CPU.Quantity.SubMilliNLZ(rv) case corev1.ResourceMemory: - node.Resources.Memory.Quantity.Allocated.Sub(quantity) + node.Resources.Memory.Quantity.SubNLZ(rv) case corev1.ResourceEphemeralStorage: - node.Resources.EphemeralStorage.Allocated.Sub(quantity) + node.Resources.EphemeralStorage.SubNLZ(rv) case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Allocated.Sub(quantity) + node.Resources.GPU.Quantity.SubNLZ(rv) } } @@ -705,7 +755,9 @@ func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { continue } - node.Resources.Memory.Quantity.Allocated.Sub(*vol.EmptyDir.SizeLimit) + rv := types.NewResourceValue(uint64((*vol.EmptyDir.SizeLimit).Value())) + + node.Resources.Memory.Quantity.SubNLZ(rv) } } }