Skip to content

Commit

Permalink
Merge pull request #116 from waggle-sensor/develop
Browse files Browse the repository at this point in the history
Fixed nil-memory error when launching plugin
  • Loading branch information
gemblerz authored Jan 26, 2024
2 parents cb08124 + f0873ef commit 5bb6814
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pkg/nodescheduler/resourcemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,15 +1243,15 @@ func (rm *ResourceManager) CleanUp() error {
func (rm *ResourceManager) LaunchAndWatchPlugin(pr *datatype.PluginRuntime) {
logger.Debug.Printf("Running plugin %q...", pr.Plugin.Name)
pod, err := rm.CreatePodTemplate(pr)
// we override the plugin name to distinguish the same plugin name from different jobs
if pr.Plugin.JobID != "" {
pod.SetName(fmt.Sprintf("%s-%s", pod.GetName(), pr.Plugin.JobID))
}
if err != nil {
logger.Error.Printf("Failed to create Kubernetes Pod for %q: %q", pr.Plugin.Name, err.Error())
rm.Notifier.Notify(datatype.NewEventBuilder(datatype.EventPluginStatusFailed).AddReason(err.Error()).AddPluginMeta(&pr.Plugin).Build())
return
}
// we override the plugin name to distinguish the same plugin name from different jobs
if pr.Plugin.JobID != "" {
pod.SetName(fmt.Sprintf("%s-%s", pod.GetName(), pr.Plugin.JobID))
}
err = rm.CreatePod(pod)
defer rm.TerminatePod(pod.Name)
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions pkg/nodescheduler/resourcemanager_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package nodescheduler

import (
"context"
"testing"
"time"

"github.com/waggle-sensor/edge-scheduler/pkg/datatype"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)

func TestEnvFromSecret(t *testing.T) {
Expand Down Expand Up @@ -56,3 +65,67 @@ func TestEnvFromSecret(t *testing.T) {
out, _ := yaml.Marshal(pluginSpec)
t.Logf("%s", out)
}

// TestFakeClient demonstrates how to use a fake client with SharedInformerFactory in tests.
func TestFakeClient(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

watcherStarted := make(chan struct{})
// Create the fake client.
client := fake.NewSimpleClientset()
// A catch-all watch reactor that allows us to inject the watcherStarted channel.
client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := client.Tracker().Watch(gvr, ns)
if err != nil {
return false, nil, err
}
close(watcherStarted)
return true, watch, nil
})

// We will create an informer that writes added pods to a channel.
pods := make(chan *v1.Pod, 1)
informers := informers.NewSharedInformerFactory(client, 0)
podInformer := informers.Core().V1().Pods().Informer()
podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
pods <- pod
},
})

// Make sure informers are running.
informers.Start(ctx.Done())

// This is not required in tests, but it serves as a proof-of-concept by
// ensuring that the informer goroutine have warmed up and called List before
// we send any events to it.
cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)

// The fake client doesn't support resource version. Any writes to the client
// after the informer's initial LIST and before the informer establishing the
// watcher will be missed by the informer. Therefore we wait until the watcher
// starts.
// Note that the fake client isn't designed to work with informer. It
// doesn't support resource version. It's encouraged to use a real client
// in an integration/E2E test if you need to test complex behavior with
// informer/controllers.
<-watcherStarted
// Inject an event into the fake client.
p := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "my-pod"}}
_, err := client.CoreV1().Pods("test-ns").Create(context.TODO(), p, metav1.CreateOptions{})
if err != nil {
t.Fatalf("error injecting pod add: %v", err)
}

select {
case pod := <-pods:
t.Logf("Got pod from channel: %s/%s", pod.Namespace, pod.Name)
case <-time.After(wait.ForeverTestTimeout):
t.Error("Informer did not get the added pod")
}
}

0 comments on commit 5bb6814

Please sign in to comment.