From 5495ca9dcd1ea258ddb786a437f91f9e7a8b06e9 Mon Sep 17 00:00:00 2001 From: Yar Kravtsov Date: Mon, 4 Nov 2024 07:28:43 +0200 Subject: [PATCH] feat: Implement Docker image synchronization module --- pkg/dockersync/dockersync.go | 435 ++++++++++++++++++++++++++ pkg/dockersync/dockersync_test.go | 194 ++++++++++++ pkg/dockersync/testdata/Dockerfile | 41 +++ pkg/dockersync/testdata/entrypoint.sh | 18 ++ pkg/executor/ssh/ssh.go | 24 ++ 5 files changed, 712 insertions(+) create mode 100644 pkg/dockersync/dockersync.go create mode 100644 pkg/dockersync/dockersync_test.go create mode 100644 pkg/dockersync/testdata/Dockerfile create mode 100644 pkg/dockersync/testdata/entrypoint.sh diff --git a/pkg/dockersync/dockersync.go b/pkg/dockersync/dockersync.go new file mode 100644 index 0000000..8ca0b40 --- /dev/null +++ b/pkg/dockersync/dockersync.go @@ -0,0 +1,435 @@ +package dockersync + +import ( + "archive/tar" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "github.com/yarlson/ftl/pkg/executor/ssh" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" +) + +// Config holds the configuration for the Docker image sync operation. +type Config struct { + ImageName string + LocalStore string + RemoteStore string + MaxParallel int +} + +// ImageSync handles Docker image synchronization operations. +type ImageSync struct { + cfg Config + client *ssh.Client +} + +// ImageData represents Docker image metadata. 
+type ImageData struct { + Config struct { + Hostname string `json:"Hostname"` + Domainname string `json:"Domainname"` + User string `json:"User"` + AttachStdin bool `json:"AttachStdin"` + AttachStdout bool `json:"AttachStdout"` + AttachStderr bool `json:"AttachStderr"` + ExposedPorts struct{} `json:"ExposedPorts"` + Tty bool `json:"Tty"` + OpenStdin bool `json:"OpenStdin"` + StdinOnce bool `json:"StdinOnce"` + Env []string `json:"Env"` + Cmd []string `json:"Cmd"` + Image string `json:"Image"` + Volumes struct{} `json:"Volumes"` + WorkingDir string `json:"WorkingDir"` + Entrypoint []string `json:"Entrypoint"` + OnBuild []string `json:"OnBuild"` + Labels struct{} `json:"Labels"` + } `json:"Config"` + RootFS struct { + Type string `json:"Type"` + Layers []string `json:"Layers"` + DiffIDs []string `json:"DiffIDs"` + } `json:"RootFS"` + Architecture string `json:"Architecture"` + Os string `json:"Os"` +} + +// NewImageSync creates a new ImageSync instance with the provided configuration and SSH client. +func NewImageSync(cfg Config, client *ssh.Client) *ImageSync { + if cfg.MaxParallel <= 0 { + cfg.MaxParallel = 4 + } + if cfg.LocalStore == "" { + cfg.LocalStore = filepath.Join(os.Getenv("HOME"), "docker-images") + } + + return &ImageSync{ + cfg: cfg, + client: client, + } +} + +// Sync performs the Docker image synchronization process. 
+func (s *ImageSync) Sync(ctx context.Context) error { + needsSync, err := s.compareImages(ctx) + if err != nil { + return fmt.Errorf("failed to compare images: %w", err) + } + + if !needsSync { + return nil // Images are identical + } + + if err := s.prepareDirectories(ctx); err != nil { + return fmt.Errorf("failed to prepare directories: %w", err) + } + + if err := s.exportAndExtractImage(ctx); err != nil { + return fmt.Errorf("failed to export and extract image: %w", err) + } + + if err := s.transferMetadata(ctx); err != nil { + return fmt.Errorf("failed to transfer metadata: %w", err) + } + + if err := s.syncBlobs(ctx); err != nil { + return fmt.Errorf("failed to sync blobs: %w", err) + } + + if err := s.loadRemoteImage(ctx); err != nil { + return fmt.Errorf("failed to load remote image: %w", err) + } + + return nil +} + +// compareImages checks if the image needs to be synced by comparing local and remote versions. +func (s *ImageSync) compareImages(ctx context.Context) (bool, error) { + localInspect, err := s.inspectLocalImage() + if err != nil { + return false, fmt.Errorf("failed to inspect local image: %w", err) + } + + remoteInspect, err := s.inspectRemoteImage(ctx) + if err != nil { + return true, nil // Assume sync needed if remote inspection fails + } + + // Compare normalized JSON data + return !compareImageData(localInspect, remoteInspect), nil +} + +func (s *ImageSync) inspectLocalImage() (*ImageData, error) { + cmd := exec.Command("docker", "inspect", s.cfg.ImageName) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("docker inspect failed: %w", err) + } + + var data []ImageData + if err := json.Unmarshal(output, &data); err != nil { + return nil, fmt.Errorf("failed to parse inspect data: %w", err) + } + + if len(data) == 0 { + return nil, fmt.Errorf("no image data found") + } + + return &data[0], nil +} + +func (s *ImageSync) inspectRemoteImage(ctx context.Context) (*ImageData, error) { + output, err := 
s.client.RunCommandOutput(fmt.Sprintf("docker inspect %s", s.cfg.ImageName)) + if err != nil { + return nil, err + } + + var data []ImageData + if err := json.Unmarshal([]byte(output), &data); err != nil { + return nil, fmt.Errorf("failed to parse remote inspect data: %w", err) + } + + if len(data) == 0 { + return nil, fmt.Errorf("no remote image data found") + } + + return &data[0], nil +} + +func (s *ImageSync) prepareDirectories(ctx context.Context) error { + if err := os.MkdirAll(s.cfg.LocalStore, 0755); err != nil { + return fmt.Errorf("failed to create local store: %w", err) + } + + if _, err := s.client.RunCommand(ctx, fmt.Sprintf("mkdir -p %s", s.cfg.RemoteStore)); err != nil { + return fmt.Errorf("failed to create remote store: %w", err) + } + + return nil +} + +func (s *ImageSync) exportAndExtractImage(ctx context.Context) error { + imageDir := strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName) + localPath := filepath.Join(s.cfg.LocalStore, imageDir) + + if err := os.MkdirAll(localPath, 0755); err != nil { + return fmt.Errorf("failed to create image directory: %w", err) + } + + tarPath := filepath.Join(localPath, "image.tar") + cmd := exec.Command("docker", "save", s.cfg.ImageName, "-o", tarPath) + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to save image: %w", err) + } + + if err := extractTar(ctx, tarPath, localPath); err != nil { + return fmt.Errorf("failed to extract tar: %w", err) + } + + return os.Remove(tarPath) +} + +func extractTar(ctx context.Context, tarPath, destPath string) error { + file, err := os.Open(tarPath) + if err != nil { + return fmt.Errorf("failed to open tar file: %w", err) + } + defer file.Close() + + var tr *tar.Reader + + // Check if the file is gzipped + gzr, err := gzip.NewReader(file) + if err == nil { + defer gzr.Close() + tr = tar.NewReader(gzr) + } else { + // If not gzipped, reset the file pointer and read as a regular tar + if _, err := file.Seek(0, 0); err != nil { + return 
fmt.Errorf("failed to reset file pointer: %w", err)
+		}
+		tr = tar.NewReader(file)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		header, err := tr.Next()
+		if err == io.EOF {
+			break // End of archive
+		}
+		if err != nil {
+			return fmt.Errorf("tar reading error: %w", err)
+		}
+
+		target := filepath.Join(destPath, header.Name) // NOTE(review): header.Name is used unsanitized; a crafted archive with ".." entries can escape destPath (zip-slip) — validate before extracting
+
+		switch header.Typeflag {
+		case tar.TypeDir:
+			if err := os.MkdirAll(target, 0755); err != nil {
+				return fmt.Errorf("failed to create directory %s: %w", target, err)
+			}
+		case tar.TypeReg:
+			if err := extractFile(tr, target); err != nil {
+				return err
+			}
+		default:
+			return fmt.Errorf("unsupported file type %d in %s", header.Typeflag, header.Name) // was %b: binary rendering of the typeflag byte is unreadable in error messages
+		}
+	}
+
+	return nil
+}
+
+func extractFile(tr *tar.Reader, target string) error {
+	f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to create file %s: %w", target, err)
+	}
+	defer f.Close()
+
+	if _, err := io.Copy(f, tr); err != nil {
+		return fmt.Errorf("failed to write file %s: %w", target, err)
+	}
+	return nil
+}
+
+func (s *ImageSync) syncBlobs(ctx context.Context) error {
+	localBlobs, err := s.listLocalBlobs()
+	if err != nil {
+		return err
+	}
+
+	remoteBlobs, err := s.listRemoteBlobs(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Determine blobs to transfer
+	var blobsToTransfer []string
+	for _, blob := range localBlobs {
+		if !contains(remoteBlobs, blob) {
+			blobsToTransfer = append(blobsToTransfer, blob)
+		}
+	}
+
+	// Transfer blobs in parallel batches
+	return s.transferBlobs(ctx, blobsToTransfer)
+}
+
+func (s *ImageSync) transferBlobs(ctx context.Context, blobs []string) error {
+	if len(blobs) == 0 {
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	errChan := make(chan error, len(blobs))
+	semaphore := make(chan struct{}, s.cfg.MaxParallel)
+
+	for _, blob := range blobs {
+		wg.Add(1)
+		go func(blob string) {
+			defer wg.Done()
+			semaphore <- struct{}{}
+			defer func() { <-semaphore
}() + + if err := s.transferBlob(ctx, blob); err != nil { + errChan <- fmt.Errorf("failed to transfer blob %s: %w", blob, err) + } + }(blob) + } + + go func() { + wg.Wait() + close(errChan) + }() + + for err := range errChan { + if err != nil { + return err + } + } + + return nil +} + +func (s *ImageSync) loadRemoteImage(ctx context.Context) error { + cmd := fmt.Sprintf("cd %s && tar -cf - . | docker load", + filepath.Join(s.cfg.RemoteStore, strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName))) + + _, err := s.client.RunCommand(ctx, cmd) + return err +} + +// Helper functions + +func compareImageData(local, remote *ImageData) bool { + localJSON, err := json.Marshal(local) + if err != nil { + return false + } + + remoteJSON, err := json.Marshal(remote) + if err != nil { + return false + } + + return string(localJSON) == string(remoteJSON) +} + +func contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +} + +// listLocalBlobs returns a list of blob hashes from the local blob directory. +func (s *ImageSync) listLocalBlobs() ([]string, error) { + imageDir := strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName) + blobPath := filepath.Join(s.cfg.LocalStore, imageDir, "blobs", "sha256") + + entries, err := os.ReadDir(blobPath) + if err != nil { + return nil, err + } + + var blobs []string + for _, entry := range entries { + if !entry.IsDir() { + blobs = append(blobs, entry.Name()) + } + } + + return blobs, nil +} + +// listRemoteBlobs returns a list of blob hashes from the remote blob directory. 
+func (s *ImageSync) listRemoteBlobs(ctx context.Context) ([]string, error) {
+	imageDir := strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName)
+	blobPath := filepath.Join(s.cfg.RemoteStore, imageDir, "blobs", "sha256") // NOTE(review): remote path built with filepath.Join — yields backslashes when run on Windows; path.Join is safer for POSIX remotes — confirm
+
+	output, err := s.client.RunCommand(ctx, "ls", blobPath)
+	if err != nil {
+		return nil, nil // Return empty list if directory doesn't exist — NOTE(review): also swallows unrelated SSH failures, silently forcing a full re-transfer
+	}
+
+	data, err := io.ReadAll(output)
+	if err != nil {
+		return nil, err
+	}
+
+	blobs := strings.Fields(string(data))
+	return blobs, nil
+}
+
+// transferBlob copies a single blob to the remote host.
+func (s *ImageSync) transferBlob(ctx context.Context, blob string) error {
+	imageDir := strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName)
+	localPath := filepath.Join(s.cfg.LocalStore, imageDir, "blobs", "sha256", blob)
+	remotePath := filepath.Join(s.cfg.RemoteStore, imageDir, "blobs", "sha256", blob) // NOTE(review): same filepath.Join-on-remote-path concern as listRemoteBlobs
+
+	_, err := s.client.RunCommand(ctx, "mkdir", "-p", filepath.Dir(remotePath))
+	if err != nil {
+		return err
+	}
+
+	return s.client.CopyFile(ctx, localPath, remotePath)
+}
+
+// transferMetadata copies the image metadata files to the remote host.
+func (s *ImageSync) transferMetadata(ctx context.Context) error { + imageDir := strings.NewReplacer(":", "_", "/", "_").Replace(s.cfg.ImageName) + localDir := filepath.Join(s.cfg.LocalStore, imageDir) + remoteDir := filepath.Join(s.cfg.RemoteStore, imageDir) + + metadataFiles := []string{"index.json", "manifest.json", "oci-layout"} + + for _, file := range metadataFiles { + localPath := filepath.Join(localDir, file) + remotePath := filepath.Join(remoteDir, file) + + _, err := s.client.RunCommand(ctx, "mkdir", "-p", filepath.Dir(remotePath)) + if err != nil { + return err + } + + if err := s.client.CopyFile(ctx, localPath, remotePath); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/dockersync/dockersync_test.go b/pkg/dockersync/dockersync_test.go new file mode 100644 index 0000000..f1236cf --- /dev/null +++ b/pkg/dockersync/dockersync_test.go @@ -0,0 +1,194 @@ +package dockersync + +import ( + "context" + "fmt" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/client" + "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "io" + "os" + "path/filepath" + "testing" + + "github.com/yarlson/ftl/pkg/executor/ssh" +) + +const ( + testImage = "golang:1.21-alpine" + sshPort = "22/tcp" +) + +type testContainer struct { + container testcontainers.Container + sshPort nat.Port +} + +func setupTestContainer(t *testing.T) (*testContainer, error) { + ctx := context.Background() + + // Build the test container image + buildCtx, err := createBuildContext(t) + require.NoError(t, err) + defer os.RemoveAll(buildCtx) + + req := testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + FromDockerfile: testcontainers.FromDockerfile{ + Context: buildCtx, + Dockerfile: "Dockerfile", + }, + ExposedPorts: []string{sshPort}, + Privileged: true, // Required for Docker daemon + 
WaitingFor: wait.ForAll( + wait.ForListeningPort(sshPort), + ), + Env: map[string]string{ + "DOCKER_TLS_CERTDIR": "", // Disable TLS for testing + }, + }, + Started: true, + } + + container, err := testcontainers.GenericContainer(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to start container: %w", err) + } + + mappedPort, err := container.MappedPort(ctx, nat.Port(sshPort)) + if err != nil { + return nil, fmt.Errorf("failed to get mapped port: %w", err) + } + + return &testContainer{ + container: container, + sshPort: mappedPort, + }, nil +} + +func createBuildContext(t *testing.T) (string, error) { + dir, err := os.MkdirTemp("", "dockersync-test") + if err != nil { + return "", err + } + + // Copy Dockerfile + dockerfile := filepath.Join(dir, "Dockerfile") + if err := copyFile("testdata/Dockerfile", dockerfile); err != nil { + os.RemoveAll(dir) + return "", err + } + + // Copy entrypoint script + entrypoint := filepath.Join(dir, "entrypoint.sh") + if err := copyFile("testdata/entrypoint.sh", entrypoint); err != nil { + os.RemoveAll(dir) + return "", err + } + + return dir, nil +} + +func copyFile(src, dst string) error { + sourceFile, err := os.Open(src) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(dst) + if err != nil { + return err + } + defer destFile.Close() + + _, err = io.Copy(destFile, sourceFile) + return err +} + +func setupTestImage(t *testing.T, dockerClient *client.Client) error { + ctx := context.Background() + + // Pull test image + reader, err := dockerClient.ImagePull(ctx, testImage, image.PullOptions{}) + if err != nil { + return err + } + defer reader.Close() + _, _ = io.Copy(io.Discard, reader) + + return nil +} + +func TestImageSync(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Set up test container + t.Log("Setting up test container...") + tc, err := setupTestContainer(t) + require.NoError(t, err) + defer func() { _ = 
tc.container.Terminate(context.Background()) }() + + // Create SSH client + t.Log("Creating SSH client...") + sshClient, err := ssh.ConnectWithUserPassword("127.0.0.1", tc.sshPort.Port(), "root", "testpassword") + require.NoError(t, err) + defer sshClient.Close() + + // Create Docker client + t.Log("Creating Docker client...") + dockerClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) + defer dockerClient.Close() + + // Set up test image + t.Log("Setting up test image...") + err = setupTestImage(t, dockerClient) + require.NoError(t, err) + + // Create temporary directories for test + t.Log("Creating temporary directories...") + localStore, err := os.MkdirTemp("", "dockersync-local") + require.NoError(t, err) + defer os.RemoveAll(localStore) + + remoteStore := "/tmp/dockersync-remote" + + // Initialize ImageSync + cfg := Config{ + ImageName: testImage, + LocalStore: localStore, + RemoteStore: remoteStore, + MaxParallel: 4, + } + + sync := NewImageSync(cfg, sshClient) + + // Run sync + t.Log("Running sync...") + ctx := context.Background() + err = sync.Sync(ctx) + require.NoError(t, err) + + // Verify image exists on remote + t.Log("Verifying image exists on remote...") + output, err := sshClient.RunCommandOutput("docker images --format '{{.Repository}}:{{.Tag}}'") + require.NoError(t, err) + require.Contains(t, output, testImage) + + // Test image comparison + t.Log("Comparing images...") + needsSync, err := sync.compareImages(ctx) + require.NoError(t, err) + require.False(t, needsSync, "Images should be identical after sync") + + // Test re-sync with no changes + t.Log("Re-syncing...") + err = sync.Sync(ctx) + require.NoError(t, err) +} diff --git a/pkg/dockersync/testdata/Dockerfile b/pkg/dockersync/testdata/Dockerfile new file mode 100644 index 0000000..905aca2 --- /dev/null +++ b/pkg/dockersync/testdata/Dockerfile @@ -0,0 +1,41 @@ +FROM ubuntu:22.04 + +# Install necessary packages +RUN apt-get update && apt-get install -y \ + 
openssh-server \ + curl \ + ca-certificates \ + gnupg \ + && rm -rf /var/lib/apt/lists/* + +# Set up SSH server +RUN mkdir /var/run/sshd +RUN echo 'root:testpassword' | chpasswd +RUN sed -i 's/#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config +RUN sed -i 's/#PubkeyAuthentication yes/PubkeyAuthentication yes/' /etc/ssh/sshd_config +RUN mkdir -p /root/.ssh + +# Install Docker +RUN install -m 0755 -d /etc/apt/keyrings +RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg +RUN chmod a+r /etc/apt/keyrings/docker.gpg +RUN echo \ + "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \ + "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \ + tee /etc/apt/sources.list.d/docker.list > /dev/null + +RUN apt-get update && apt-get install -y \ + docker-ce \ + docker-ce-cli \ + containerd.io \ + docker-buildx-plugin \ + docker-compose-plugin \ + && rm -rf /var/lib/apt/lists/* + +# Expose SSH port +EXPOSE 22 + +# Start SSH server and Docker daemon +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/pkg/dockersync/testdata/entrypoint.sh b/pkg/dockersync/testdata/entrypoint.sh new file mode 100644 index 0000000..55afaca --- /dev/null +++ b/pkg/dockersync/testdata/entrypoint.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Start Docker daemon +dockerd & + +# Wait for Docker daemon to be ready +timeout=30 +while ! 
docker info >/dev/null 2>&1; do
+    timeout=$((timeout - 1))
+    if [ $timeout -le 0 ]; then
+        echo "Failed to start Docker daemon"
+        exit 1
+    fi
+    sleep 1
+done
+
+# Start SSH server
+/usr/sbin/sshd -D
diff --git a/pkg/executor/ssh/ssh.go b/pkg/executor/ssh/ssh.go
index c159262..4e2e609 100644
--- a/pkg/executor/ssh/ssh.go
+++ b/pkg/executor/ssh/ssh.go
@@ -50,6 +50,30 @@ func ConnectWithUser(host string, port int, user string, key []byte) (*Client, e
 	}, nil
 }
 
+func ConnectWithUserPassword(host string, port string, user string, password string) (*Client, error) {
+	config := &ssh.ClientConfig{
+		User: user,
+		Auth: []ssh.AuthMethod{
+			ssh.Password(password),
+		},
+		HostKeyCallback: ssh.InsecureIgnoreHostKey(), // NOTE(review): acceptable for tests only; production callers must verify host keys
+		Timeout:         10 * time.Second,
+	}
+
+	addr := fmt.Sprintf("%s:%s", host, port)
+
+	client, err := ssh.Dial("tcp", addr, config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect: %w", err) // %w (was %v) so callers can unwrap with errors.Is/As
+	}
+
+	return &Client{
+		sshClient: client,
+		config:    config,
+		addr:      addr,
+	}, nil
+}
+
 func (c *Client) ensureConnected() error {
 	if c.sshClient != nil {
 		_, _, err := c.sshClient.SendRequest("keepalive@golang.org", true, nil)