Skip to content

Commit

Permalink
Merge pull request #1203 from wzshiming/feat/expose-port
Browse files Browse the repository at this point in the history
[kwokctl] Support for exposing component ports
  • Loading branch information
wzshiming authored Aug 30, 2024
2 parents 35188d2 + ed43f16 commit f80b5ba
Show file tree
Hide file tree
Showing 58 changed files with 1,093 additions and 216 deletions.
3 changes: 2 additions & 1 deletion pkg/kwok/server/debugging_exec_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/exec"
utilsnet "sigs.k8s.io/kwok/pkg/utils/net"
)

func (s *Server) execInContainerWithTTY(ctx context.Context, cmd []string, in io.Reader, out io.WriteCloser, resize <-chan clientremotecommand.TerminalSize) error {
Expand Down Expand Up @@ -54,7 +55,7 @@ func (s *Server) execInContainerWithTTY(ctx context.Context, cmd []string, in io
io.Reader
io.Writer
}{in, out}
err := tunnel(ctx, pty, stm, buf1, buf2)
err := utilsnet.Tunnel(ctx, pty, stm, buf1, buf2)
if err != nil {
logger.Error("failed to tunnel", err)
}
Expand Down
37 changes: 2 additions & 35 deletions pkg/kwok/server/debugging_port_forword.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/log"
"sigs.k8s.io/kwok/pkg/utils/exec"
utilsnet "sigs.k8s.io/kwok/pkg/utils/net"
"sigs.k8s.io/kwok/pkg/utils/slices"
)

Expand Down Expand Up @@ -74,7 +75,7 @@ func (s *Server) PortForward(ctx context.Context, name string, uid types.UID, po
s.bufPool.Put(buf1)
s.bufPool.Put(buf2)
}()
return tunnel(ctx, stream, dial, buf1, buf2)
return utilsnet.Tunnel(ctx, stream, dial, buf1, buf2)
}

return errors.New("no target or command")
Expand Down Expand Up @@ -144,37 +145,3 @@ func findPortInForwards(port int32, forwards []internalversion.Forward) (*intern
}
return defaultForward, defaultForward != nil
}

// tunnel create tunnels for two streams.
func tunnel(ctx context.Context, c1, c2 io.ReadWriter, buf1, buf2 []byte) error {
errCh := make(chan error)
go func() {
_, err := io.CopyBuffer(c2, c1, buf1)
errCh <- err
}()
go func() {
_, err := io.CopyBuffer(c1, c2, buf2)
errCh <- err
}()
select {
case <-ctx.Done():
// Do nothing
case err1 := <-errCh:
select {
case <-ctx.Done():
if err1 != nil {
return err1
}
// Do nothing
case err2 := <-errCh:
if err1 != nil {
return err1
}
return err2
}
}
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}
3 changes: 2 additions & 1 deletion pkg/kwokctl/cmd/hack/del/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func runE(ctx context.Context, flags *flagpole, args []string) error {
return nil
}

etcdclient, err := rt.GetEtcdClient(ctx)
etcdclient, cancel, err := rt.GetEtcdClient(ctx)
if err != nil {
return err
}
defer cancel()

var targetGvr schema.GroupVersionResource
var targetName string
Expand Down
3 changes: 2 additions & 1 deletion pkg/kwokctl/cmd/hack/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ func runE(ctx context.Context, flags *flagpole, args []string) error {
return nil
}

etcdclient, err := rt.GetEtcdClient(ctx)
etcdclient, cancel, err := rt.GetEtcdClient(ctx)
if err != nil {
return err
}
defer cancel()

var targetGvr schema.GroupVersionResource
var targetName string
Expand Down
3 changes: 2 additions & 1 deletion pkg/kwokctl/cmd/hack/put/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ func runE(ctx context.Context, flags *flagpole, args []string) error {
}
}

etcdclient, err := rt.GetEtcdClient(ctx)
etcdclient, cancel, err := rt.GetEtcdClient(ctx)
if err != nil {
return err
}
defer cancel()

kubeconfigPath := rt.GetWorkdirPath(runtime.InHostKubeconfigName)
clientset, err := client.NewClientset("", kubeconfigPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kwokctl/cmd/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
flags := &flagpole{}

cmd := &cobra.Command{
Use: "logs [command]",
Use: "logs [component]",
Short: "Logs one of [audit, etcd, kube-apiserver, kube-controller-manager, kube-scheduler, kwok-controller, dashboard, metrics-server, prometheus, jaeger]",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
Expand Down
101 changes: 101 additions & 0 deletions pkg/kwokctl/cmd/port_forward/port_forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package port_forward implements the `port-forward` command
package port_forward

import (
"context"
"errors"
"fmt"
"os"
"path"
"strconv"
"strings"

"github.com/spf13/cobra"

"sigs.k8s.io/kwok/pkg/config"
"sigs.k8s.io/kwok/pkg/kwokctl/runtime"
"sigs.k8s.io/kwok/pkg/log"
)

type flagpole struct {
Name string
}

// NewCommand returns a new cobra.Command for forwarding the port
func NewCommand(ctx context.Context) *cobra.Command {
flags := &flagpole{}
cmd := &cobra.Command{
Args: cobra.ExactArgs(2),
Use: "port-forward [component] [local-port]:[port-name]",
Short: "Forward one local ports to a component",
RunE: func(cmd *cobra.Command, args []string) error {
flags.Name = config.DefaultCluster
return runE(ctx, flags, args)
},
}
return cmd
}

func runE(ctx context.Context, flags *flagpole, args []string) error {
name := config.ClusterName(flags.Name)
workdir := path.Join(config.ClustersDir, flags.Name)

logger := log.FromContext(ctx).With("cluster", flags.Name)
ctx = log.NewContext(ctx, logger)

rt, err := runtime.DefaultRegistry.Load(ctx, name, workdir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
logger.Warn("Cluster does not exist")
}
return err
}

hostPort, containerPort, err := splitParts(args[1])
if err != nil {
return err
}

port, err := strconv.ParseUint(hostPort, 0, 0)
if err != nil {
return err
}

cancel, err := rt.PortForward(ctx, args[0], containerPort, uint32(port))
if err != nil {
return err
}

defer cancel()
<-ctx.Done()

return nil
}

func splitParts(rawport string) (hostPort string, containerPort string, err error) {
parts := strings.Split(rawport, ":")
n := len(parts)

switch n {
case 2:
return parts[0], parts[1], nil
default:
return "", "", fmt.Errorf("unsupported port format: %s", rawport)
}
}
2 changes: 2 additions & 0 deletions pkg/kwokctl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/hack"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/kubectl"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/logs"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/port_forward"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/scale"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/snapshot"
"sigs.k8s.io/kwok/pkg/kwokctl/cmd/start"
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
snapshot.NewCommand(ctx),
export.NewCommand(ctx),
hack.NewCommand(ctx),
port_forward.NewCommand(ctx),
)
return cmd
}
3 changes: 2 additions & 1 deletion pkg/kwokctl/cmd/snapshot/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ func runE(ctx context.Context, flags *flagpole) error {
return err
}

etcdclient, err := rt.GetEtcdClient(ctx)
etcdclient, cancel, err := rt.GetEtcdClient(ctx)
if err != nil {
return err
}
defer cancel()

clientset, err := rt.GetClientset(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kwokctl/cmd/snapshot/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ func runE(ctx context.Context, flags *flagpole) error {
}
}()

etcdclient, err := rt.GetEtcdClient(ctx)
etcdclient, cancel, err := rt.GetEtcdClient(ctx)
if err != nil {
return err
}
defer cancel()

clientset, err := rt.GetClientset(ctx)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kwokctl/components/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ func BuildDashboardComponent(conf BuildDashboardComponentConfig) (component inte
"--kubeconfig="+conf.KubeconfigPath,
"--insecure-port="+format.String(conf.Port),
)
ports = append(ports,
internalversion.Port{
Name: "http",
Port: conf.Port,
HostPort: 0,
Protocol: internalversion.ProtocolTCP,
},
)
}

component = internalversion.Component{
Expand Down
61 changes: 32 additions & 29 deletions pkg/kwokctl/components/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,6 @@ type BuildEtcdComponentConfig struct {

// BuildEtcdComponent builds an etcd component.
func BuildEtcdComponent(conf BuildEtcdComponentConfig) (component internalversion.Component, err error) {
exposePeerPort := true
if conf.PeerPort == 0 {
conf.PeerPort = 2380
exposePeerPort = false
}
exposePort := true
if conf.Port == 0 {
conf.Port = 2379
exposePort = false
}

var volumes []internalversion.Volume
var ports []internalversion.Port

Expand All @@ -78,24 +67,21 @@ func BuildEtcdComponent(conf BuildEtcdComponentConfig) (component internalversio
"--data-dir=/etcd-data",
)

if exposePeerPort {
ports = append(
ports,
internalversion.Port{
HostPort: conf.PeerPort,
Port: 2380,
},
)
}
if exposePort {
ports = append(
ports,
internalversion.Port{
HostPort: conf.Port,
Port: 2379,
},
)
}
ports = append(
ports,
internalversion.Port{
Name: "peer-http",
HostPort: conf.PeerPort,
Port: 2380,
Protocol: internalversion.ProtocolTCP,
},
internalversion.Port{
Name: "http",
HostPort: conf.Port,
Port: 2379,
Protocol: internalversion.ProtocolTCP,
},
)
etcdArgs = append(etcdArgs,
"--initial-advertise-peer-urls=http://"+conf.BindAddress+":2380",
"--listen-peer-urls=http://"+conf.BindAddress+":2380",
Expand All @@ -112,6 +98,23 @@ func BuildEtcdComponent(conf BuildEtcdComponentConfig) (component internalversio
} else {
etcdPeerPortStr := format.String(conf.PeerPort)
etcdClientPortStr := format.String(conf.Port)

ports = append(
ports,
internalversion.Port{
Name: "peer-http",
HostPort: 0,
Port: conf.PeerPort,
Protocol: internalversion.ProtocolTCP,
},
internalversion.Port{
Name: "http",
HostPort: 0,
Port: conf.Port,
Protocol: internalversion.ProtocolTCP,
},
)

etcdArgs = append(etcdArgs,
"--data-dir="+conf.DataPath,
"--initial-advertise-peer-urls=http://"+conf.BindAddress+":"+etcdPeerPortStr,
Expand Down
Loading

0 comments on commit f80b5ba

Please sign in to comment.