Skip to content

Commit

Permalink
Improve stdin handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tomekjarosik committed Nov 27, 2024
1 parent 4d99030 commit 01b23b9
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 99 deletions.
41 changes: 0 additions & 41 deletions pkg/ctxio/polling.go

This file was deleted.

8 changes: 5 additions & 3 deletions pkg/fugaci/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,19 @@ func NewVM(ctx context.Context,
cst.Name = pod.Spec.Containers[containerIndex].Name
cst.State = v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Creating", Message: "Just initialized"}}

cspec := pod.Spec.Containers[containerIndex]

customLogger := log.New(os.Stdout,
fmt.Sprintf("pod=%s/%s, ", pod.Namespace, pod.Name),
log.LstdFlags|log.Lmsgprefix|log.Lshortfile)

envVars, username, password, err := extractSSHEnvVars(pod.Spec.Containers[containerIndex])
envVars, username, password, err := extractSSHEnvVars(cspec)
if err != nil {
return nil, fmt.Errorf("failed to extract ssh env vars: %w", err)
}

fileStreams, err := streams.NewFilesBasedStreams(containerLogsDirectory,
fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, cst.Name))
lognamePrefix := fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, cst.Name)
fileStreams, err := streams.NewFilesBasedStreams(containerLogsDirectory, lognamePrefix, cspec.Stdin, cspec.TTY)
if err != nil {
return nil, fmt.Errorf("failed to create temporary files based streams: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fugaci_test/testdata/pod6-attach-stdout.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "bash", "-c", 'while true; do echo "Current time: $(date)"; sleep 0.1; done' ]
command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "$i"; sleep 0.1; done' ]
name: curie3
envFrom:
- secretRef:
Expand Down
22 changes: 22 additions & 0 deletions pkg/fugaci_test/testdata/pod7-attach-stdin.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: v1
kind: Pod
metadata:
name: testpod1
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "bash" ]
name: curie3
stdin: true
tty: true
envFrom:
- secretRef:
name: fugaci-ssh-secret
nodeSelector:
kubernetes.io/os: darwin
tolerations:
- key: "fugaci.macvm.io"
operator: "Equal"
value: "true"
effect: "NoSchedule"
1 change: 1 addition & 0 deletions pkg/sshrunner/sshrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func handleSSHWindowResize(session *ssh.Session, resize <-chan api.TermSize) {
// Send the window change request to the SSH session with the new terminal size
if err := session.WindowChange(int(termSize.Height), int(termSize.Width)); err != nil {
log.Printf("failed to change window size: %v\n", err)
break
}
}
log.Printf("window resize terminated")
Expand Down
107 changes: 53 additions & 54 deletions pkg/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,58 @@ import (
"context"
"errors"
"fmt"
"github.com/macvmio/fugaci/pkg/ctxio"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"io"
"os"
"sync"
"time"

"github.com/macvmio/fugaci/pkg/ctxio"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
)

var _ api.AttachIO = (*FilesBasedStreams)(nil)

type FilesBasedStreams struct {
stdinFile *os.File
stdoutFile *os.File
stderrFile *os.File

stdinReader *os.File
stdinWriter *os.File

allocateTTY bool
termSizeCh chan api.TermSize

mu sync.Mutex
cleanupWG sync.WaitGroup
cleanOnce sync.Once
}

func NewFilesBasedStreams(directory, prefix string) (*FilesBasedStreams, error) {
stdinFile, err := os.CreateTemp(directory, prefix+"_vm_stdin_*.log")
if err != nil {
return nil, fmt.Errorf("error creating temporary stdin file: %v", err)
func NewFilesBasedStreams(directory, prefix string, allocateStdin, allocateTTY bool) (*FilesBasedStreams, error) {
var err error
f := FilesBasedStreams{allocateTTY: allocateTTY}
if allocateTTY {
f.termSizeCh = make(chan api.TermSize)
}

stdoutFile, err := os.CreateTemp(directory, prefix+"_vm_stdout_*.log")
f.stdoutFile, err = os.CreateTemp(directory, prefix+"_vm_stdout_*.log")
if err != nil {
stdinFile.Close()
return nil, fmt.Errorf("error creating temporary stdout file: %v", err)
}

stderrFile, err := os.CreateTemp(directory, prefix+"_vm_stderr_*.log")
f.stderrFile, err = os.CreateTemp(directory, prefix+"_vm_stderr_*.log")
if err != nil {
stdinFile.Close()
stdoutFile.Close()
f.stdoutFile.Close()
return nil, fmt.Errorf("error creating temporary stderr file: %v", err)
}

return &FilesBasedStreams{
stdinFile: stdinFile,
stdoutFile: stdoutFile,
stderrFile: stderrFile,
}, nil
if allocateStdin {
f.stdinReader, f.stdinWriter, err = os.Pipe()
if err != nil {
return nil, fmt.Errorf("error creating stdin pipe: %v", err)
}
}
return &f, nil
}

func (f *FilesBasedStreams) Stdin() io.Reader {
return f.stdinFile
return f.stdinReader
}

func (f *FilesBasedStreams) Stdout() io.WriteCloser {
Expand All @@ -64,12 +67,11 @@ func (f *FilesBasedStreams) Stderr() io.WriteCloser {
}

func (f *FilesBasedStreams) TTY() bool {
return false
return f.allocateTTY
}

func (f *FilesBasedStreams) Resize() <-chan api.TermSize {
// TODO: implement if needed
return nil
return f.termSizeCh
}

// Cleanup removes the temporary files created for stdin, stdout, and stderr.
Expand All @@ -84,37 +86,24 @@ func (f *FilesBasedStreams) Cleanup() error {
// Wait for any ongoing operations to finish
f.cleanupWG.Wait()

// Close and remove stdinFile
if f.stdinFile != nil {
if err := f.stdinFile.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close stdin file: %w", err))
}
if err := os.Remove(f.stdinFile.Name()); err != nil {
errs = append(errs, fmt.Errorf("failed to remove stdin file: %w", err))
}
f.stdinFile = nil
if err := f.stdoutFile.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close stdout file: %w", err))
}
if err := os.Remove(f.stdoutFile.Name()); err != nil {
errs = append(errs, fmt.Errorf("failed to remove stdout file: %w", err))
}

// Close and remove stdoutFile
if f.stdoutFile != nil {
if err := f.stdoutFile.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close stdout file: %w", err))
}
if err := os.Remove(f.stdoutFile.Name()); err != nil {
errs = append(errs, fmt.Errorf("failed to remove stdout file: %w", err))
}
f.stdoutFile = nil
if err := f.stderrFile.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close stderr file: %w", err))
}
if err := os.Remove(f.stderrFile.Name()); err != nil {
errs = append(errs, fmt.Errorf("failed to remove stderr file: %w", err))
}

// Close and remove stderrFile
if f.stderrFile != nil {
if err := f.stderrFile.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close stderr file: %w", err))
}
if err := os.Remove(f.stderrFile.Name()); err != nil {
errs = append(errs, fmt.Errorf("failed to remove stderr file: %w", err))
}
f.stderrFile = nil
// Close termSizeCh
if f.termSizeCh != nil {
close(f.termSizeCh)
f.termSizeCh = nil
}
})

Expand Down Expand Up @@ -154,20 +143,30 @@ func (f *FilesBasedStreams) Stream(ctx context.Context, attach api.AttachIO, log
}()

// Handle stdin
if attach.Stdin() != nil {
if f.stdinWriter != nil && attach.Stdin() != nil {
f.cleanupWG.Add(1)
go func() {
defer f.cleanupWG.Done()
_, err := io.Copy(f.stdinFile, ctxio.NewContextPeriodicReader(ctx, 200*time.Millisecond, attach.Stdin()))
_, err := io.Copy(f.stdinWriter, attach.Stdin())
if !allowableError(err) {
loggerPrintf("Error streaming stdin: %v", err)
}
}()
}

if attach.TTY() {
f.cleanupWG.Add(1)
go func() {
defer f.cleanupWG.Done()
for termSize := range attach.Resize() {
f.termSizeCh <- termSize
}
}()
}
// Wait for context cancellation
loggerPrintf("waiting for Stream to finish")
<-ctx.Done()

loggerPrintf("Stream has completed")
return nil
}

Expand Down

0 comments on commit 01b23b9

Please sign in to comment.