From 01b23b99dba50f171f8ef5220dd005b6d674815a Mon Sep 17 00:00:00 2001 From: Tomasz Jarosik Date: Wed, 27 Nov 2024 22:09:34 +0000 Subject: [PATCH] Improve stdin handling --- pkg/ctxio/polling.go | 41 ------- pkg/fugaci/vm.go | 8 +- .../testdata/pod6-attach-stdout.yaml | 2 +- .../testdata/pod7-attach-stdin.yaml | 22 ++++ pkg/sshrunner/sshrunner.go | 1 + pkg/streams/streams.go | 107 +++++++++--------- 6 files changed, 82 insertions(+), 99 deletions(-) delete mode 100644 pkg/ctxio/polling.go create mode 100644 pkg/fugaci_test/testdata/pod7-attach-stdin.yaml diff --git a/pkg/ctxio/polling.go b/pkg/ctxio/polling.go deleted file mode 100644 index 373707c..0000000 --- a/pkg/ctxio/polling.go +++ /dev/null @@ -1,41 +0,0 @@ -package ctxio - -import ( - "context" - "io" - "time" -) - -// NewContextPeriodicReader Creates Reader that reads until context is cancelled, -// and resumes after 'period' if underlying reader returns EOF -func NewContextPeriodicReader(ctx context.Context, period time.Duration, r io.Reader) *ContextReader { - return &ContextReader{ - ctx: ctx, - reader: r, - period: period, - } -} - -// ContextReader is a custom reader that checks for context cancellation -// and handles EOF without stopping the copy operation. -type ContextReader struct { - ctx context.Context - reader io.Reader - period time.Duration -} - -func (cr *ContextReader) Read(p []byte) (int, error) { - select { - case <-cr.ctx.Done(): - // Context has been canceled - return 0, cr.ctx.Err() - default: - n, err := cr.reader.Read(p) - if err == io.EOF { - // EOF encountered, but we want to keep reading - time.Sleep(cr.period) - return 0, nil // Return no data, no error to continue io.Copy - } - return n, err - } -} diff --git a/pkg/fugaci/vm.go b/pkg/fugaci/vm.go index 409bb75..d3208f2 100644 --- a/pkg/fugaci/vm.go +++ b/pkg/fugaci/vm.go @@ -106,17 +106,19 @@ func NewVM(ctx context.Context, cst.Name = pod.Spec.Containers[containerIndex].Name cst.State = v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Creating", Message: "Just initialized"}} + cspec := pod.Spec.Containers[containerIndex] + customLogger := log.New(os.Stdout, fmt.Sprintf("pod=%s/%s, ", pod.Namespace, pod.Name), log.LstdFlags|log.Lmsgprefix|log.Lshortfile) - envVars, username, password, err := extractSSHEnvVars(pod.Spec.Containers[containerIndex]) + envVars, username, password, err := extractSSHEnvVars(cspec) if err != nil { return nil, fmt.Errorf("failed to extract ssh env vars: %w", err) } - fileStreams, err := streams.NewFilesBasedStreams(containerLogsDirectory, - fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, cst.Name)) + lognamePrefix := fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, cst.Name) + fileStreams, err := streams.NewFilesBasedStreams(containerLogsDirectory, lognamePrefix, cspec.Stdin, cspec.TTY) if err != nil { return nil, fmt.Errorf("failed to create temporary files based streams: %w", err) } diff --git a/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml b/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml index 8137100..a87af66 100644 --- a/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml +++ b/pkg/fugaci_test/testdata/pod6-attach-stdout.yaml @@ -6,7 +6,7 @@ spec: containers: - image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7 imagePullPolicy: Never - command: [ "bash", "-c", 'while true; do echo "Current time: $(date)"; sleep 0.1; done' ] + command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "$i"; sleep 0.1; done' ] name: curie3 envFrom: - secretRef: diff --git a/pkg/fugaci_test/testdata/pod7-attach-stdin.yaml b/pkg/fugaci_test/testdata/pod7-attach-stdin.yaml new file mode 100644 index 0000000..e17ebc7 --- /dev/null +++ b/pkg/fugaci_test/testdata/pod7-attach-stdin.yaml @@ -0,0 +1,22 @@ +apiVersion: v1 +kind: Pod +metadata: + name: testpod1 +spec: + containers: + - image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7 + imagePullPolicy: Never + command: [ "bash" ] + name: curie3 + stdin: true + tty: true + envFrom: + - secretRef: + name: fugaci-ssh-secret + nodeSelector: + kubernetes.io/os: darwin + tolerations: + - key: "fugaci.macvm.io" + operator: "Equal" + value: "true" + effect: "NoSchedule" \ No newline at end of file diff --git a/pkg/sshrunner/sshrunner.go b/pkg/sshrunner/sshrunner.go index 3b176cc..d53f6a9 100644 --- a/pkg/sshrunner/sshrunner.go +++ b/pkg/sshrunner/sshrunner.go @@ -33,6 +33,7 @@ func handleSSHWindowResize(session *ssh.Session, resize <-chan api.TermSize) { // Send the window change request to the SSH session with the new terminal size if err := session.WindowChange(int(termSize.Height), int(termSize.Width)); err != nil { log.Printf("failed to change window size: %v\n", err) + break } } log.Printf("window resize terminated") diff --git a/pkg/streams/streams.go b/pkg/streams/streams.go index 0f44844..4947536 100644 --- a/pkg/streams/streams.go +++ b/pkg/streams/streams.go @@ -4,55 +4,58 @@ import ( "context" "errors" "fmt" + "github.com/macvmio/fugaci/pkg/ctxio" + "github.com/virtual-kubelet/virtual-kubelet/node/api" "io" "os" "sync" - "time" - - "github.com/macvmio/fugaci/pkg/ctxio" - "github.com/virtual-kubelet/virtual-kubelet/node/api" ) var _ api.AttachIO = (*FilesBasedStreams)(nil) type FilesBasedStreams struct { - stdinFile *os.File stdoutFile *os.File stderrFile *os.File + stdinReader *os.File + stdinWriter *os.File + + allocateTTY bool + termSizeCh chan api.TermSize + mu sync.Mutex cleanupWG sync.WaitGroup cleanOnce sync.Once } -func NewFilesBasedStreams(directory, prefix string) (*FilesBasedStreams, error) { - stdinFile, err := os.CreateTemp(directory, prefix+"_vm_stdin_*.log") - if err != nil { - return nil, fmt.Errorf("error creating temporary stdin file: %v", err) +func NewFilesBasedStreams(directory, prefix string, allocateStdin, allocateTTY bool) (*FilesBasedStreams, error) { + var err error + f := FilesBasedStreams{allocateTTY: allocateTTY} + if allocateTTY { + f.termSizeCh = make(chan api.TermSize) } - - stdoutFile, err := os.CreateTemp(directory, prefix+"_vm_stdout_*.log") + f.stdoutFile, err = os.CreateTemp(directory, prefix+"_vm_stdout_*.log") if err != nil { - stdinFile.Close() return nil, fmt.Errorf("error creating temporary stdout file: %v", err) } - stderrFile, err := os.CreateTemp(directory, prefix+"_vm_stderr_*.log") + f.stderrFile, err = os.CreateTemp(directory, prefix+"_vm_stderr_*.log") if err != nil { - stdinFile.Close() - stdoutFile.Close() + f.stdoutFile.Close() return nil, fmt.Errorf("error creating temporary stderr file: %v", err) } - return &FilesBasedStreams{ - stdinFile: stdinFile, - stdoutFile: stdoutFile, - stderrFile: stderrFile, - }, nil + if allocateStdin { + f.stdinReader, f.stdinWriter, err = os.Pipe() + if err != nil { + return nil, fmt.Errorf("error creating stdin pipe: %v", err) + } + } + return &f, nil } func (f *FilesBasedStreams) Stdin() io.Reader { - return f.stdinFile + return f.stdinReader } func (f *FilesBasedStreams) Stdout() io.WriteCloser { @@ -64,12 +67,11 @@ func (f *FilesBasedStreams) Stderr() io.WriteCloser { } func (f *FilesBasedStreams) TTY() bool { - return false + return f.allocateTTY } func (f *FilesBasedStreams) Resize() <-chan api.TermSize { - // TODO: implement if needed - return nil + return f.termSizeCh } // Cleanup removes the temporary files created for stdin, stdout, and stderr. @@ -84,37 +86,24 @@ func (f *FilesBasedStreams) Cleanup() error { // Wait for any ongoing operations to finish f.cleanupWG.Wait() - // Close and remove stdinFile - if f.stdinFile != nil { - if err := f.stdinFile.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close stdin file: %w", err)) - } - if err := os.Remove(f.stdinFile.Name()); err != nil { - errs = append(errs, fmt.Errorf("failed to remove stdin file: %w", err)) - } - f.stdinFile = nil + if err := f.stdoutFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close stdout file: %w", err)) + } + if err := os.Remove(f.stdoutFile.Name()); err != nil { + errs = append(errs, fmt.Errorf("failed to remove stdout file: %w", err)) } - // Close and remove stdoutFile - if f.stdoutFile != nil { - if err := f.stdoutFile.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close stdout file: %w", err)) - } - if err := os.Remove(f.stdoutFile.Name()); err != nil { - errs = append(errs, fmt.Errorf("failed to remove stdout file: %w", err)) - } - f.stdoutFile = nil + if err := f.stderrFile.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close stderr file: %w", err)) + } + if err := os.Remove(f.stderrFile.Name()); err != nil { + errs = append(errs, fmt.Errorf("failed to remove stderr file: %w", err)) } - // Close and remove stderrFile - if f.stderrFile != nil { - if err := f.stderrFile.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close stderr file: %w", err)) - } - if err := os.Remove(f.stderrFile.Name()); err != nil { - errs = append(errs, fmt.Errorf("failed to remove stderr file: %w", err)) - } - f.stderrFile = nil + // Close termSizeCh + if f.termSizeCh != nil { + close(f.termSizeCh) + f.termSizeCh = nil } }) @@ -154,20 +143,30 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log }() // Handle stdin - if attach.Stdin() != nil { + if f.stdinWriter != nil && attach.Stdin() != nil { f.cleanupWG.Add(1) go func() { defer f.cleanupWG.Done() - _, err := io.Copy(f.stdinFile, ctxio.NewContextPeriodicReader(ctx, 200*time.Millisecond, attach.Stdin())) + _, err := io.Copy(f.stdinWriter, attach.Stdin()) if !allowableError(err) { loggerPrintf("Error streaming stdin: %v", err) } }() } + if attach.TTY() { + f.cleanupWG.Add(1) + go func() { + defer f.cleanupWG.Done() + for termSize := range attach.Resize() { + f.termSizeCh <- termSize + } + }() + } // Wait for context cancellation + loggerPrintf("waiting for Stream to finish") <-ctx.Done() - + loggerPrintf("Stream has completed") return nil }