diff --git a/pkg/fugaci_test/provider_e2e_helpers_test.go b/pkg/fugaci_test/provider_e2e_helpers_test.go index 2d0b180..d2f9f0c 100644 --- a/pkg/fugaci_test/provider_e2e_helpers_test.go +++ b/pkg/fugaci_test/provider_e2e_helpers_test.go @@ -17,13 +17,14 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" + "net/http" "os" "path/filepath" "testing" "time" ) -const testNamespace = "jenkins" +const testNamespace = "default" func createPodFromYAML(clientset *kubernetes.Clientset, fileName string) (*v1.Pod, error) { podYAML, err := os.ReadFile(filepath.Join("testdata", fileName)) @@ -126,14 +127,27 @@ func execCommandInPod(t *testing.T, clientset *kubernetes.Clientset, config *res exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) require.NoError(t, err, "error creating executor") var stdout, stderr bytes.Buffer - - err = exec.Stream(remotecommand.StreamOptions{ + ctx := context.Background() + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdout: &stdout, Stderr: &stderr, }) require.NoError(t, err, "error executing command: %v", command) return stdout.String(), stderr.String() } +func attachStreamToPod(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, namespace, podName string, + attachOptions *v1.PodAttachOptions, ctx context.Context, streamOptions remotecommand.StreamOptions) error { + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Namespace(namespace). + Name(podName). + SubResource("attach"). + VersionedParams(attachOptions, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL()) + require.NoError(t, err) + return exec.StreamWithContext(ctx, streamOptions) +} func generateRandomName(baseName string) string { b := make([]byte, 4) // 4 bytes will give us 8 hex characters diff --git a/pkg/fugaci_test/provider_e2e_test.go b/pkg/fugaci_test/provider_e2e_test.go index 2bf4189..170349c 100644 --- a/pkg/fugaci_test/provider_e2e_test.go +++ b/pkg/fugaci_test/provider_e2e_test.go @@ -10,11 +10,17 @@ package fugaci_test */ import ( + "bufio" + "bytes" "context" "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "io" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "os" + "strings" "testing" "time" @@ -33,7 +39,11 @@ type PodTestCase struct { } func TestProviderE2E(t *testing.T) { - config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) + kubeconfigPath := os.Getenv("KUBECONFIG") + if kubeconfigPath == "" { + kubeconfigPath = clientcmd.RecommendedHomeFile + } + config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { t.Fatalf("Failed to build kubeconfig: %v", err) } @@ -81,7 +91,7 @@ func TestProviderE2E(t *testing.T) { assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) { p := pods[0] logs := getContainerLogs(t, clientset, testNamespace, p.Name, "curie3") - assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.6") + assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7") assert.Contains(t, logs, "action=pulling") }, }, @@ -108,7 +118,7 @@ func TestProviderE2E(t *testing.T) { assert.Contains(t, logs, "spec.image=ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.6") assert.Contains(t, logs, "action=pulling") - assert.Contains(t, logs, "imageID=sha256:08bbe35549962a2aef9e79631f93c43a245aa662674cb88298be518aabbaed32") + assert.Contains(t, logs, "imageID=sha256:5e21ef1cd7e667ba8581f2df7bb292b7db23bc62df7137d3a1fa5790a57d3260") assert.Contains(t, logs, "state=created") assert.Contains(t, logs, "state=SSHReady") assert.Contains(t, logs, "action=stop") @@ -133,6 +143,100 @@ func TestProviderE2E(t *testing.T) { assert.Empty(t, stderr, "Stderr should be empty") }, }, + { + name: "TestAttachWithStdoutAndStderr", + podFiles: []string{"pod6-attach-stdout.yaml"}, + postCreate: waitForPodConditionReady, + assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) { + pod := pods[0] + ctx, cancel := context.WithCancel(context.Background()) + stdoutReader, stdoutWriter := io.Pipe() + defer stdoutWriter.Close() + outputChan := make(chan string, 100) + go func() { + defer close(outputChan) + scanner := bufio.NewScanner(stdoutReader) + for scanner.Scan() { + line := scanner.Text() + outputChan <- line + if strings.Contains(line, "counter-5") { + cancel() // Cancel the context once the line is found + return + } + } + if err := scanner.Err(); err != nil { + t.Errorf("Error scanning output: %v", err) + } + }() + // This blocks until stream is completed - it's when context is cancelled + err = attachStreamToPod(t, clientset, config, testNamespace, pod.Name, + &v1.PodAttachOptions{ + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, + ctx, + remotecommand.StreamOptions{ + Stdout: stdoutWriter, + Stderr: stdoutWriter, + Tty: false, + }) + // Collect and validate output + var capturedOutput []string + for line := range outputChan { + capturedOutput = append(capturedOutput, line) + } + + // Assertions for stdout + assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-1") + assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-2") + assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-3") + assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-4") + assert.Contains(t, strings.Join(capturedOutput, "\n"), "counter-5") + }, + }, + { + name: "TestAttachWithStdinAndTTY", + podFiles: []string{"pod7-attach-stdin-auto.yaml"}, + postCreate: waitForPodConditionReady, + assertions: func(t *testing.T, clientset *kubernetes.Clientset, config *rest.Config, pods []*v1.Pod) { + pod := pods[0] + // Use io.Pipe for stdin to control when to close + stdinReader, stdinWriter := io.Pipe() + stdout := &bytes.Buffer{} + ctx := context.Background() + + // Prepare input + input := "Hello, pod!" + go func() { + defer stdinWriter.Close() + _, err := stdinWriter.Write([]byte(input)) + require.NoError(t, err) + time.Sleep(200 * time.Millisecond) + // Close stdinWriter to signal EOF to the remote process + }() + + // This blocks until stream is completed. In this case when stdin is closed + err = attachStreamToPod(t, clientset, config, testNamespace, pod.Name, + &v1.PodAttachOptions{ + Stdin: true, + Stdout: true, + Stderr: false, // Must be false when TTY is true + TTY: true, + }, + ctx, + remotecommand.StreamOptions{ + Stdin: stdinReader, + Stdout: stdout, + Tty: true, + }) + require.NoError(t, err) + + output := stdout.String() + assert.Contains(t, output, input, "The output should contain the input sent via stdin") + }, + }, //{ // // TODO: Figure out a way to delete test image // name: "TestPodPullStrategy_Always", diff --git a/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml b/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml index a87af66..3131e4e 100644 --- a/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml +++ b/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml @@ -6,7 +6,7 @@ spec: containers: - image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7 imagePullPolicy: Never - command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "$i"; sleep 0.1; done' ] + command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "counter-$i"; sleep 0.1; done' ] name: curie3 envFrom: - secretRef: diff --git a/pkg/fugaci_test/testdata/pod7-attach-stdin-auto.yaml b/pkg/fugaci_test/testdata/pod7-attach-stdin-auto.yaml new file mode 100644 index 0000000..436cad6 --- /dev/null +++ b/pkg/fugaci_test/testdata/pod7-attach-stdin-auto.yaml @@ -0,0 +1,22 @@ +apiVersion: v1 +kind: Pod +metadata: + name: testpod1 +spec: + containers: + - image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7 + imagePullPolicy: Never + command: ["sh", "-c", "cat"] + name: curie3 + stdin: true + tty: true + envFrom: + - secretRef: + name: fugaci-ssh-secret + nodeSelector: + kubernetes.io/os: darwin + tolerations: + - key: "fugaci.macvm.io" + operator: "Equal" + value: "true" + effect: "NoSchedule" \ No newline at end of file diff --git a/pkg/streams/streams.go b/pkg/streams/streams.go index dc1961a..f355af7 100644 --- a/pkg/streams/streams.go +++ b/pkg/streams/streams.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/macvmio/fugaci/pkg/ctxio" + "github.com/virtual-kubelet/virtual-kubelet/errdefs" "github.com/virtual-kubelet/virtual-kubelet/node/api" "golang.org/x/sync/errgroup" "io" @@ -109,6 +110,12 @@ func (f *FilesBasedStreams) Close() error { } func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, loggerPrintf func(format string, v ...any)) error { + if attach.Stdin() != nil && f.stdinReader == nil { + return errdefs.InvalidInput("stdin streaming is disabled") + } + if attach.TTY() && !f.allocateTTY { + return errdefs.InvalidInput("TTY is disabled") + } // Create an errgroup with the provided context eg, ctx := errgroup.WithContext(ctx) @@ -142,6 +149,12 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log // TODO: This blocks until if stdin has no data, even if context is cancelled _, err := io.Copy(f.stdinWriter, attach.Stdin()) loggerPrintf("stdin copy completed: %v", err) + eg.Go(func() error { + if err == nil { + return io.EOF + } + return err + }) }() } @@ -172,6 +185,10 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log loggerPrintf("waiting for Stream() to finish") err := eg.Wait() + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + loggerPrintf("Stream() ignoring expected error: %v", err) + err = nil + } loggerPrintf("Stream() has completed") return err } diff --git a/pkg/streams/streams_test.go b/pkg/streams/streams_test.go index b661cd7..57a63a8 100644 --- a/pkg/streams/streams_test.go +++ b/pkg/streams/streams_test.go @@ -3,8 +3,8 @@ package streams import ( "bytes" "context" - "errors" "fmt" + "github.com/stretchr/testify/require" "io" "os" "strings" @@ -50,93 +50,145 @@ func TestNewFilesBasedStreams(t *testing.T) { } func TestStreamStdout(t *testing.T) { - fbs := setupFilesBasedStreams(t, false, false) - defer teardownFilesBasedStreams(t, fbs) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Prepare mock attachIO - stdoutBuf := &bytes.Buffer{} - attachIO := &MockAttachIO{ - stdout: stdoutBuf, + tests := []struct { + name string + writeData bool + expectedOutput string + }{ + { + name: "WithData", + writeData: true, + expectedOutput: "Hello, stdout!", + }, + { + name: "WithoutData", + writeData: false, + expectedOutput: "", + }, } - // Write data to stdoutFile - expectedOutput := "Hello, stdout!" - _, err := fbs.stdoutFile.WriteString(expectedOutput + "\n") - if err != nil { - t.Fatalf("Failed to write to stdoutFile: %v", err) - } - - // Start streaming - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := fbs.Stream(ctx, attachIO, t.Logf) - if !errors.Is(err, context.Canceled) { - t.Errorf("Stream returned error: %v", err) - } - }() - - // Give some time for the data to be streamed - time.Sleep(25 * time.Millisecond) - - // Cancel context to stop streaming - cancel() - wg.Wait() - - // Verify that data was received - output := stdoutBuf.String() - if !strings.Contains(output, expectedOutput) { - t.Errorf("Expected output %q in stdout, got %q", expectedOutput, output) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fbs := setupFilesBasedStreams(t, false, false) + defer teardownFilesBasedStreams(t, fbs) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Prepare mock attachIO + stdoutBuf := &bytes.Buffer{} + attachIO := &MockAttachIO{ + stdout: stdoutBuf, + } + + if tt.writeData { + // Write data to stdoutFile if applicable + _, err := fbs.stdoutFile.WriteString(tt.expectedOutput + "\n") + if err != nil { + t.Fatalf("Failed to write to stdoutFile: %v", err) + } + } + + // Start streaming + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := fbs.Stream(ctx, attachIO, t.Logf) + require.NoError(t, err) + }() + + // Give some time for the data to be streamed + time.Sleep(25 * time.Millisecond) + + // Cancel context to stop streaming + cancel() + wg.Wait() + + // Verify the received data + output := stdoutBuf.String() + if tt.writeData { + if !strings.Contains(output, tt.expectedOutput) { + t.Errorf("Expected output %q in stdout, got %q", tt.expectedOutput, output) + } + } else { + if output != "" { + t.Errorf("Expected no output in stdout, got %q", output) + } + } + }) } } func TestStreamStderr(t *testing.T) { - fbs := setupFilesBasedStreams(t, false, false) - defer teardownFilesBasedStreams(t, fbs) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Prepare mock attachIO - stderrBuf := &bytes.Buffer{} - attachIO := &MockAttachIO{ - stderr: stderrBuf, - } - // Write data to stderrFile - expectedOutput := "Hello, stderr!" - _, err := fbs.stderrFile.WriteString(expectedOutput + "\n") - if err != nil { - t.Fatalf("Failed to write to stderrFile: %v", err) + tests := []struct { + name string + writeData bool + expectedOutput string + }{ + { + name: "WithData", + writeData: true, + expectedOutput: "Hello, stderr!", + }, + { + name: "WithoutData", + writeData: false, + expectedOutput: "", + }, } - // Start streaming - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := fbs.Stream(ctx, attachIO, t.Logf) - if !errors.Is(err, context.Canceled) { - t.Errorf("Stream returned error: %v", err) - } - }() - - // Give some time for the data to be streamed - time.Sleep(25 * time.Millisecond) - - // Cancel context to stop streaming - cancel() - wg.Wait() - - // Verify that data was received - output := stderrBuf.String() - if !strings.Contains(output, expectedOutput) { - t.Errorf("Expected output %q in stderr, got %q", expectedOutput, output) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fbs := setupFilesBasedStreams(t, false, false) + defer teardownFilesBasedStreams(t, fbs) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Prepare mock attachIO + stderrBuf := &bytes.Buffer{} + attachIO := &MockAttachIO{ + stderr: stderrBuf, + } + + if tt.writeData { + // Write data to stderrFile if applicable + _, err := fbs.stderrFile.WriteString(tt.expectedOutput + "\n") + if err != nil { + t.Fatalf("Failed to write to stderrFile: %v", err) + } + } + + // Start streaming + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := fbs.Stream(ctx, attachIO, t.Logf) + require.NoError(t, err) + }() + + // Give some time for the data to be streamed + time.Sleep(25 * time.Millisecond) + + // Cancel context to stop streaming + cancel() + wg.Wait() + + // Verify the received data + output := stderrBuf.String() + if tt.writeData { + if !strings.Contains(output, tt.expectedOutput) { + t.Errorf("Expected output %q in stderr, got %q", tt.expectedOutput, output) + } + } else { + if output != "" { + t.Errorf("Expected no output in stderr, got %q", output) + } + } + }) } - } func TestStreamStdin(t *testing.T) { @@ -159,9 +211,7 @@ func TestStreamStdin(t *testing.T) { go func() { defer wg.Done() err := fbs.Stream(ctx, attachIO, t.Logf) - if !errors.Is(err, context.Canceled) { - t.Errorf("Stream returned error: %v", err) - } + require.NoError(t, err) }() // Read data from stdinReader @@ -199,9 +249,7 @@ func TestContextCancellation(t *testing.T) { go func() { defer close(doneCh) err := fbs.Stream(ctx, attachIO, t.Logf) - if !errors.Is(err, context.Canceled) { - t.Errorf("Stream returned error: %v", err) - } + require.NoError(t, err) }() // Cancel context after a short delay @@ -217,16 +265,9 @@ func TestContextCancellation(t *testing.T) { } } -func TestClose(t *testing.T) { +func TestCloseWithoutAnyStreamCalls(t *testing.T) { fbs := setupFilesBasedStreams(t, true, false) - // Start some operation to ensure cleanup is necessary - fbs.cleanupWG.Add(1) - go func() { - defer fbs.cleanupWG.Done() - time.Sleep(25 * time.Millisecond) - }() - err := fbs.Close() if err != nil { t.Errorf("Close returned error: %v", err) @@ -262,9 +303,7 @@ func TestTTYResizeEvents(t *testing.T) { go func() { defer wg.Done() err := fbs.Stream(ctx, attachIO, t.Logf) - if !errors.Is(err, context.Canceled) { - t.Errorf("Stream returned error: %v", err) - } + require.NoError(t, err) }() // Send a resize event @@ -372,6 +411,59 @@ func TestStreamStderrError(t *testing.T) { } } +func TestStreamStdinDisabled(t *testing.T) { + // Create FilesBasedStreams with allocateStdin set to false (stdin streaming disabled) + fbs := setupFilesBasedStreams(t, false, false) // allocateStdin = false, allocateTTY = false + defer teardownFilesBasedStreams(t, fbs) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Prepare mock attachIO with stdin provided + stdinBuf := bytes.NewBufferString("Test input data") + attachIO := &MockAttachIO{ + stdin: stdinBuf, + } + + // Attempt to start streaming + err := fbs.Stream(ctx, attachIO, t.Logf) + + // Check for the expected error + if err == nil { + t.Error("Expected error when stdin streaming is disabled, but got nil") + } else if !strings.Contains(err.Error(), "stdin streaming is disabled") { + t.Errorf("Expected error 'stdin streaming is disabled', but got: %v", err) + } else { + t.Logf("Received expected error: %v", err) + } +} + +func TestStreamTTYDisabled(t *testing.T) { + // Create FilesBasedStreams with allocateTTY set to false (TTY is disabled) + fbs := setupFilesBasedStreams(t, false, false) // allocateStdin = false, allocateTTY = false + defer teardownFilesBasedStreams(t, fbs) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Prepare mock attachIO with TTY set to true + attachIO := &MockAttachIO{ + tty: true, + } + + // Attempt to start streaming + err := fbs.Stream(ctx, attachIO, t.Logf) + + // Check for the expected error + if err == nil { + t.Error("Expected error when TTY is disabled, but got nil") + } else if !strings.Contains(err.Error(), "TTY is disabled") { + t.Errorf("Expected error 'TTY is disabled', but got: %v", err) + } else { + t.Logf("Received expected error: %v", err) + } +} + // MockAttachIO implements api.AttachIO for testing purposes type MockAttachIO struct { stdin io.Reader