Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for "kubectl attach" #1

Merged
merged 4 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ tailored to their specific macOS deployment needs.
```yaml
nodeName: mac-m1
kubeConfigPath: /Users/your_username/.kube/config
containerLogsDirectory: /var/logs/fugaci
curieVirtualization:
binaryPath: /usr/local/bin/curie
dataRootPath: /Users/your_username/.curie
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,6 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/macvmio/geranos v0.7.0 h1:dvMxg0Ql0UkCf5a5opxBWk3Cun6opYZeEldua9iA2nk=
github.com/macvmio/geranos v0.7.0/go.mod h1:1h+kJVmPysFRdrxu8c5+4twYUloD/Gofyvr5c553JeI=
github.com/macvmio/geranos v0.7.5 h1:27mxpatl5dJA91NfFDXe1Oar9iWRb/HmXjjcIgoyorw=
github.com/macvmio/geranos v0.7.5/go.mod h1:Z5v2gyJ65rClu0hNA2CWAL+02So2VgIlhbgXnsmRSuI=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand All @@ -398,8 +396,6 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mobileinf/geranos v0.6.2 h1:OP0ODIFFPN0bufNzdOPsx6O7dZFU7iHomt6DDCArNEk=
github.com/mobileinf/geranos v0.6.2/go.mod h1:7taEFHyZcZA08EeD9hYvclCVj3393Gu89FOMVL7ot8w=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/term v0.0.0-20200312100748-672ec06f55cd/go.mod h1:DdlQx2hp0Ss5/fLikoLlEeIYiATotOjgB//nb973jeo=
Expand Down
28 changes: 28 additions & 0 deletions pkg/ctxio/multicontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ctxio

import (
"context"
"sync"
)

// MultiContext returns a context that is canceled when any of the provided contexts are canceled.
func MultiContext(ctxs ...context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

var once sync.Once
for _, c := range ctxs {
c := c // Capture range variable
go func() {
select {
case <-c.Done():
once.Do(func() {
cancel()
})
case <-ctx.Done():
// The merged context was canceled elsewhere
}
}()
}

return ctx, cancel
}
98 changes: 98 additions & 0 deletions pkg/ctxio/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package ctxio

import (
"context"
"io"
"os"

"github.com/fsnotify/fsnotify"
)

type TailReader struct {
ctx context.Context
file *os.File
watcher *fsnotify.Watcher
filename string
}

func NewTailReader(ctx context.Context, filename string) (*TailReader, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}

// Ensure the watcher is closed if an error occurs
defer func() {
if err != nil {
watcher.Close()
}
}()

err = watcher.Add(filename)
if err != nil {
return nil, err
}

file, err := os.Open(filename)
if err != nil {
return nil, err
}

return &TailReader{
ctx: ctx,
file: file,
watcher: watcher,
filename: filename,
}, nil
}

func (tr *TailReader) Read(p []byte) (int, error) {
for {
select {
case <-tr.ctx.Done():
return 0, tr.ctx.Err()
default:
n, err := tr.file.Read(p)
if err == io.EOF {
// Wait for a write event
select {
case event := <-tr.watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
// Continue reading after write event
continue
}
// Handle file truncation (e.g., log rotation)
if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename {
// Reopen the file
tr.file.Close()
file, err := os.Open(tr.filename)
if err != nil {
return 0, err
}
tr.file = file
continue
}
case err := <-tr.watcher.Errors:
return 0, err
case <-tr.ctx.Done():
return 0, tr.ctx.Err()
}
}
return n, err
}
}
}

func (tr *TailReader) Close() error {
var err1, err2 error
if tr.watcher != nil {
err1 = tr.watcher.Close()
}
if tr.file != nil {
err2 = tr.file.Close()
}
if err1 != nil {
return err1
}
return err2
}
20 changes: 11 additions & 9 deletions pkg/fugaci/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
)

type Config struct {
NodeName string `mapstructure:"nodeName"`
KubeConfigPath string `mapstructure:"kubeConfigPath"`
LogLevel string `mapstructure:"logLevel"`
CurieVirtualization struct {
NodeName string `mapstructure:"nodeName"`
KubeConfigPath string `mapstructure:"kubeConfigPath"`
LogLevel string `mapstructure:"logLevel"`
ContainerLogsDirectory string `mapstructure:"containerLogsDirectory"`
CurieVirtualization struct {
BinaryPath string `mapstructure:"binaryPath"`
DataRootPath string `mapstructure:"dataRootPath"`
} `mapstructure:"curieVirtualization"`
Expand All @@ -35,11 +36,12 @@ func (c *Config) Validate() error {

// Validate the paths
paths := map[string]string{
"KubeConfigPath": c.KubeConfigPath,
"CurieBinaryPath": c.CurieVirtualization.BinaryPath,
"CurieDataRootPath": c.CurieVirtualization.DataRootPath,
"TLS.KeyPath": c.TLS.KeyPath,
"TLS.CertPath": c.TLS.CertPath,
"KubeConfigPath": c.KubeConfigPath,
"CurieBinaryPath": c.CurieVirtualization.BinaryPath,
"CurieDataRootPath": c.CurieVirtualization.DataRootPath,
"ContainerLogsDirectory": c.ContainerLogsDirectory,
"TLS.KeyPath": c.TLS.KeyPath,
"TLS.CertPath": c.TLS.CertPath,
}

for name, path := range paths {
Expand Down
12 changes: 11 additions & 1 deletion pkg/fugaci/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/macvmio/fugaci/pkg/ctxio"
"github.com/macvmio/fugaci/pkg/curie"
"github.com/macvmio/fugaci/pkg/portforwarder"
"github.com/macvmio/fugaci/pkg/sshrunner"
Expand Down Expand Up @@ -63,6 +64,7 @@ func (s *Provider) allocateVM(pod *v1.Pod) (*VM, error) {
}
vm, err := NewVM(s.appContext, s.virt, s.puller,
sshrunner.NewRunner(), portforwarder.NewPortForwarder(),
s.cfg.ContainerLogsDirectory,
pod, 0)
if err != nil {
return nil, err
Expand Down Expand Up @@ -225,11 +227,19 @@ func (s *Provider) PortForward(ctx context.Context, namespace, podName string, p
if err != nil {
return fmt.Errorf("failed to find VM for pod %s/%s: %w", namespace, podName, err)
}
ctx, cancel := ctxio.MultiContext(ctx, vm.cmdLifetimeCtx)
defer cancel()
return vm.PortForward(ctx, port, stream)
}

func (s *Provider) AttachToContainer(ctx context.Context, namespace, podName, containerName string, attach api.AttachIO) error {
return ErrNotImplemented
vm, err := s.findVMByNames(namespace, podName, containerName)
if err != nil {
return fmt.Errorf("failed to find VM for pod %s/%s: %w", namespace, podName, err)
}
ctx, cancel := ctxio.MultiContext(ctx, vm.cmdLifetimeCtx)
defer cancel()
return vm.AttachToContainer(ctx, attach)
}

func (s *Provider) GetStatsSummary(ctx context.Context) (*statsv1alpha1.Summary, error) {
Expand Down
38 changes: 34 additions & 4 deletions pkg/fugaci/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"github.com/macvmio/fugaci/pkg/streams"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"io"
"log"
"net"
Expand Down Expand Up @@ -75,11 +77,19 @@ type VM struct {
sshDialInfo sshrunner.DialInfo
env []v1.EnvVar

streams *streams.FilesBasedStreams
logger *log.Logger
storyLine *storyline.StoryLine
}

func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SSHRunner, portForwarder PortForwarder, pod *v1.Pod, containerIndex int) (*VM, error) {
func NewVM(ctx context.Context,
virt Virtualization,
puller Puller,
sshRunner SSHRunner,
portForwarder PortForwarder,
containerLogsDirectory string,
pod *v1.Pod,
containerIndex int) (*VM, error) {
if containerIndex < 0 || containerIndex >= len(pod.Spec.Containers) {
return nil, errors.New("invalid container index")
}
Expand All @@ -96,15 +106,23 @@ func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SS
cst.Name = pod.Spec.Containers[containerIndex].Name
cst.State = v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "Creating", Message: "Just initialized"}}

cspec := pod.Spec.Containers[containerIndex]

customLogger := log.New(os.Stdout,
fmt.Sprintf("pod=%s/%s, ", pod.Namespace, pod.Name),
log.LstdFlags|log.Lmsgprefix|log.Lshortfile)

envVars, username, password, err := extractSSHEnvVars(pod.Spec.Containers[containerIndex])
envVars, username, password, err := extractSSHEnvVars(cspec)
if err != nil {
return nil, fmt.Errorf("failed to extract ssh env vars: %w", err)
}

lognamePrefix := fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, cst.Name)
fileStreams, err := streams.NewFilesBasedStreams(containerLogsDirectory, lognamePrefix, cspec.Stdin, cspec.TTY)
if err != nil {
return nil, fmt.Errorf("failed to create temporary files based streams: %w", err)
}

vm := &VM{
virt: virt,
puller: puller,
Expand All @@ -122,6 +140,7 @@ func NewVM(ctx context.Context, virt Virtualization, puller Puller, sshRunner SS
Password: password,
},
env: envVars,
streams: fileStreams,
logger: customLogger,
storyLine: storyline.New(),
}
Expand Down Expand Up @@ -237,7 +256,8 @@ func (s *VM) waitAndRunCommandInside(ctx context.Context, startedAt time.Time, c
s.logger.Printf("successfully established SSH session")
command := s.GetCommand()
s.storyLine.Add("container_command", command)
err = s.RunCommand(ctx, command, sshrunner.WithEnv(s.GetEnvVars()))

err = s.RunCommand(ctx, command, sshrunner.WithEnv(s.GetEnvVars()), sshrunner.WithAttachIO(s.streams))
if err != nil {
s.storyLine.Add("container_command_run_err", err)
s.logger.Printf("command '%v' finished with error: %v", s.GetCommand(), err)
Expand Down Expand Up @@ -353,12 +373,14 @@ func (s *VM) Run() {

err = s.waitAndRunCommandInside(s.cmdLifetimeCtx, startedAt.Time, containerID)

s.wg.Add(1)
// This needs to be done on separate thread, because otherwise will result in defunct process,
// and Stop() method will keep running
s.wg.Add(1)
go s.stopContainer(containerID, startedAt.Time)

err = runCmd.Wait()
s.cmdCancelFunc(nil)

s.storyLine.Add("container_exitcode", runCmd.ProcessState.ExitCode())
s.storyLine.Add("container_process_state", runCmd.ProcessState)
if err != nil && runCmd.ProcessState.ExitCode() != 0 {
Expand All @@ -368,6 +390,10 @@ func (s *VM) Run() {
return
}

err = s.streams.Close()
if err != nil {
s.storyLine.Add("streamsClosingErr", err)
}
s.logger.Printf("container '%v' finished successfully: %v, exit code=%d\n", containerID, runCmd, runCmd.ProcessState.ExitCode())
s.safeUpdateState(v1.ContainerState{Terminated: &v1.ContainerStateTerminated{
ExitCode: int32(runCmd.ProcessState.ExitCode()),
Expand Down Expand Up @@ -509,6 +535,10 @@ func (s *VM) PortForward(ctx context.Context, port int32, stream io.ReadWriteClo
return s.portForwarder.PortForward(ctx, fmt.Sprintf("%s:%d", s.safeGetPod().Status.PodIP, port), stream)
}

func (s *VM) AttachToContainer(ctx context.Context, attach api.AttachIO) error {
return s.streams.Stream(ctx, attach, s.logger.Printf)
}

// Below are functions which are safe to call in multiple goroutines

func (s *VM) IsReady() bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/fugaci/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func setupCommonTestVM(t *testing.T, podOverride func(*v1.Pod)) (*VM, *MockVirtu
}
podOverride(pod)

vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, mockPortFortwarder, pod, 0)
vm, err := NewVM(context.Background(), mockVirt, mockPuller, mockSSHRunner, mockPortFortwarder, t.TempDir(), pod, 0)
require.NoError(t, err)
require.NotNil(t, vm)

Expand Down
20 changes: 20 additions & 0 deletions pkg/fugaci_test/testdata/pod6-attach-stdout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: Pod
metadata:
name: testpod1
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "bash", "-c", 'for ((i=1;;i++)); do echo "$i"; sleep 0.1; done' ]
name: curie3
envFrom:
- secretRef:
name: fugaci-ssh-secret
nodeSelector:
kubernetes.io/os: darwin
tolerations:
- key: "fugaci.macvm.io"
operator: "Equal"
value: "true"
effect: "NoSchedule"
22 changes: 22 additions & 0 deletions pkg/fugaci_test/testdata/pod7-attach-stdin.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: v1
kind: Pod
metadata:
name: testpod1
spec:
containers:
- image: ghcr.io/macvmio/macos-sonoma:14.5-agent-v1.7
imagePullPolicy: Never
command: [ "bash" ]
name: curie3
stdin: true
tty: true
envFrom:
- secretRef:
name: fugaci-ssh-secret
nodeSelector:
kubernetes.io/os: darwin
tolerations:
- key: "fugaci.macvm.io"
operator: "Equal"
value: "true"
effect: "NoSchedule"
1 change: 1 addition & 0 deletions pkg/sshrunner/sshrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func handleSSHWindowResize(session *ssh.Session, resize <-chan api.TermSize) {
// Send the window change request to the SSH session with the new terminal size
if err := session.WindowChange(int(termSize.Height), int(termSize.Width)); err != nil {
log.Printf("failed to change window size: %v\n", err)
break
}
}
log.Printf("window resize terminated")
Expand Down
Loading
Loading