From 1f1f0750effc142e2b3fbde437ca3941a0582446 Mon Sep 17 00:00:00 2001 From: dominic <510002+dmah42@users.noreply.github.com> Date: Tue, 14 Mar 2023 14:54:05 +0000 Subject: [PATCH] adding observe commands to ae (#59) * adding observe commands to ae * test fixes * add certificate request for user (#53) * set auth config from flags (#62) --------- Co-authored-by: Christoph Voigt --- cmd/observe/observe.go | 192 ++++++++++++++++++++++++++++++++++++ cmd/observe/observe_test.go | 103 +++++++++++++++++++ cmd/root/root.go | 8 +- pkg/client/client.go | 11 +++ pkg/observe/observe.go | 32 ++++++ 5 files changed, 343 insertions(+), 3 deletions(-) create mode 100644 cmd/observe/observe.go create mode 100644 cmd/observe/observe_test.go create mode 100644 pkg/observe/observe.go diff --git a/cmd/observe/observe.go b/cmd/observe/observe.go new file mode 100644 index 0000000..cc5c40b --- /dev/null +++ b/cmd/observe/observe.go @@ -0,0 +1,192 @@ +/* -------------------------------------------------------------------------- *\ + * Apache 2.0 License Copyright © 2022 The Aurae Authors * + * * + * +--------------------------------------------+ * + * | █████╗ ██╗ ██╗██████╗ █████╗ ███████╗ | * + * | ██╔══██╗██║ ██║██╔══██╗██╔══██╗██╔════╝ | * + * | ███████║██║ ██║██████╔╝███████║█████╗ | * + * | ██╔══██║██║ ██║██╔══██╗██╔══██║██╔══╝ | * + * | ██║ ██║╚██████╔╝██║ ██║██║ ██║███████╗ | * + * | ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ | * + * +--------------------------------------------+ * + * * + * Distributed Systems Runtime * + * * + * -------------------------------------------------------------------------- * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + * * +\* -------------------------------------------------------------------------- */ + +package observe + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net" + + "github.com/aurae-runtime/ae/pkg/cli" + "github.com/aurae-runtime/ae/pkg/cli/printer" + "github.com/aurae-runtime/ae/pkg/client" + "github.com/aurae-runtime/ae/pkg/config" + "github.com/spf13/cobra" + + aeCMD "github.com/aurae-runtime/ae/cmd" + observev0 "github.com/aurae-runtime/ae/pkg/api/v0/observe" +) + +type outputObserve struct{} + +type option struct { + aeCMD.Option + ctx context.Context + ip string + logtype string + port uint16 + protocol string + verbose bool + writer io.Writer + output *outputObserve + outputFormat *cli.OutputFormat +} + +func (o *option) Complete(args []string) error { + log.Println(args) + if len(args) != 2 { + return errors.New("expected ip address and log type to be passed to this command") + } + o.ip = args[0] + o.logtype = args[1] + return nil +} + +func (o *option) Validate() error { + if err := o.outputFormat.Validate(); err != nil { + return err + } + if len(o.ip) == 0 { + return errors.New("ip address must be passed to this command") + } + if ip := net.ParseIP(o.ip); ip == nil { + return fmt.Errorf("failed to parse IP %q", o.ip) + } + if o.logtype != "daemon" && o.logtype != "subprocesses" { + return errors.New("either 'daemon' or 'subprocesses' must be passed to the command") + } + return nil +} + +func (o *option) Execute(ctx context.Context) error { + o.ctx = ctx + o.output = &outputObserve{} + + o.protocol = "tcp4" + if net.ParseIP(o.ip).To4() == nil { + o.protocol = "tcp6" + } + o.observeHost(o.ip) + return o.outputFormat.ToPrinter().Print(o.writer, o.output) +} + +func (o *option) SetWriter(writer io.Writer) { + o.writer = writer +} + +func (o option) observeHost(ip_str string) bool { + if o.verbose { + log.Printf("connecting to %s:%d using protocol %s\n", ip_str, o.port, o.protocol) + } + + c, err := client.New(o.ctx, config.WithSystem(config.System{Protocol: o.protocol, Socket: net.JoinHostPort(ip_str, fmt.Sprintf("%d", o.port))})) + if err != nil { + log.Fatalf("failed to create client: %s", err) + return false + } + + obs, err := c.Observe() + if err != nil { + log.Fatalf("failed to dial Observe service: %s", err) + return false + } + + // TODO: handle output format + switch o.logtype { + case "daemon": + // TODO: request parameters + req := observev0.GetAuraeDaemonLogStreamRequest{} // TODO + stream, err := obs.GetAuraeDaemonLogStream(o.ctx, &req) + if err != nil { + log.Fatalf("%s", err) + return false + } + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("%s", err) + return false + } + if _, err := o.writer.Write([]byte(resp.Item.Line)); err != nil { + log.Fatalf("%s", err) + return false + } + } + case "subprocesses": + // TODO: request parameters + req := observev0.GetSubProcessStreamRequest{} // TODO + stream, err := obs.GetSubProcessStream(o.ctx, &req) + if err != nil { + log.Fatalf("%s", err) + return false + } + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("%s", err) + return false + } + if _, err := o.writer.Write([]byte(resp.Item.Line)); err != nil { + log.Fatalf("%s", err) + return false + } + } + } + + return true +} + +func NewCMD(ctx context.Context) *cobra.Command { + o := &option{ + outputFormat: cli.NewOutputFormat().WithDefaultFormat(printer.NewJSON().Format()).WithPrinter(printer.NewJSON()), + } + cmd := &cobra.Command{ + Use: "observe ", + Short: "get a stream of logs either from the aurae daemon or spawned subprocesses running on the given IP address", + Args: cobra.MinimumNArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + return aeCMD.Run(ctx, o, cmd, args) + }, + } + o.outputFormat.AddFlags(cmd) + cmd.Flags().Uint16Var(&o.port, "port", o.port, "The port to use when connecting") + cmd.Flags().BoolVar(&o.verbose, "verbose", o.verbose, "Lots of output") + return cmd +} diff --git a/cmd/observe/observe_test.go b/cmd/observe/observe_test.go new file mode 100644 index 0000000..eb9640a --- /dev/null +++ b/cmd/observe/observe_test.go @@ -0,0 +1,103 @@ +package observe + +import ( + "testing" + + "github.com/aurae-runtime/ae/pkg/cli" + "github.com/aurae-runtime/ae/pkg/cli/printer" +) + +func TestComplete(t *testing.T) { + ts := []struct { + args []string + wantip string + wantlogtype string + wanterr bool + }{ + { + []string{""}, + "", "", true, + }, + { + []string{"foo"}, + "", "", true, + }, + { + []string{"foo", "bar"}, + "foo", "bar", false, + }, + { + []string{"foo", "bar", "baz"}, + "", "", true, + }, + } + + for _, tt := range ts { + o := &option{} + goterr := o.Complete(tt.args) + if tt.wanterr && goterr == nil { + t.Fatal("want error, got no error") + } + if !tt.wanterr && goterr != nil { + t.Fatal("want no error, got error") + } + if tt.wantip != o.ip { + t.Fatalf("want ip %q, got ip %q", tt.wantip, o.ip) + } + if tt.wantlogtype != o.logtype { + t.Fatalf("want logtype %q, got logtype %q", tt.wantlogtype, o.logtype) + } + } +} + +func TestValidate(t *testing.T) { + ts := []struct { + name string + outputFormat *cli.OutputFormat + ip string + logtype string + wanterr bool + }{ + { + name: "no output format", + outputFormat: cli.NewOutputFormat(), + wanterr: true, + }, + { + name: "no ip or logtype", + outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()), + wanterr: true, + }, + { + name: "invalid ip", + outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()), + ip: "invalid ip", + wanterr: true, + }, + { + name: "invalid logtype", + outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()), + ip: "10.0.0.0", + logtype: "invalid", + wanterr: true, + }, + { + name: "valid ip and logtype", + outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()), + ip: "10.0.0.0", + logtype: "daemon", + wanterr: false, + }, + } + + for _, tt := range ts { + o := &option{ip: tt.ip, logtype: tt.logtype, outputFormat: tt.outputFormat} + goterr := o.Validate() + if tt.wanterr && goterr == nil { + t.Fatalf("[%s] want error, got no error", tt.name) + } + if !tt.wanterr && goterr != nil { + t.Fatalf("[%s] want no error, got error: %s", tt.name, goterr) + } + } +} diff --git a/cmd/root/root.go b/cmd/root/root.go index c4b3d0c..fe5f4c1 100644 --- a/cmd/root/root.go +++ b/cmd/root/root.go @@ -36,6 +36,7 @@ import ( "github.com/aurae-runtime/ae/cmd/discovery" "github.com/aurae-runtime/ae/cmd/health" + "github.com/aurae-runtime/ae/cmd/observe" "github.com/aurae-runtime/ae/cmd/oci" "github.com/aurae-runtime/ae/cmd/pki" "github.com/aurae-runtime/ae/cmd/version" @@ -60,9 +61,10 @@ func Execute() { func init() { // add subcommands ctx := context.Background() - rootCmd.AddCommand(oci.NewCMD(ctx)) - rootCmd.AddCommand(version.NewCMD(ctx)) - rootCmd.AddCommand(pki.NewCMD(ctx)) rootCmd.AddCommand(discovery.NewCMD(ctx)) rootCmd.AddCommand(health.NewCMD(ctx)) + rootCmd.AddCommand(observe.NewCMD(ctx)) + rootCmd.AddCommand(oci.NewCMD(ctx)) + rootCmd.AddCommand(pki.NewCMD(ctx)) + rootCmd.AddCommand(version.NewCMD(ctx)) } diff --git a/pkg/client/client.go b/pkg/client/client.go index 3409e73..ee34ddd 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -14,11 +14,13 @@ import ( "github.com/aurae-runtime/ae/pkg/config" "github.com/aurae-runtime/ae/pkg/discovery" "github.com/aurae-runtime/ae/pkg/health" + "github.com/aurae-runtime/ae/pkg/observe" ) type Client interface { Discovery() (discovery.Discovery, error) Health() (health.Health, error) + Observe() (observe.Observe, error) } type client struct { @@ -26,6 +28,7 @@ type client struct { conn grpc.ClientConnInterface discovery discovery.Discovery health health.Health + observe observe.Observe } func New(ctx context.Context, cfg ...config.Config) (Client, error) { @@ -54,6 +57,7 @@ func New(ctx context.Context, cfg ...config.Config) (Client, error) { conn: conn, discovery: discovery.New(ctx, conn), health: health.New(ctx, conn), + observe: observe.New(ctx, conn), }, nil } @@ -95,3 +99,10 @@ func (c *client) Health() (health.Health, error) { } return c.health, nil } + +func (c *client) Observe() (observe.Observe, error) { + if c.observe == nil { + return nil, fmt.Errorf("Observe service is not available") + } + return c.observe, nil +} diff --git a/pkg/observe/observe.go b/pkg/observe/observe.go new file mode 100644 index 0000000..25eb3c9 --- /dev/null +++ b/pkg/observe/observe.go @@ -0,0 +1,32 @@ +package observe + +import ( + "context" + + "google.golang.org/grpc" + + observev0 "github.com/aurae-runtime/ae/pkg/api/v0/observe" +) + +type Observe interface { + GetAuraeDaemonLogStream(context.Context, *observev0.GetAuraeDaemonLogStreamRequest) (observev0.ObserveService_GetAuraeDaemonLogStreamClient, error) + GetSubProcessStream(context.Context, *observev0.GetSubProcessStreamRequest) (observev0.ObserveService_GetSubProcessStreamClient, error) +} + +type observe struct { + client observev0.ObserveServiceClient +} + +func New(ctx context.Context, conn grpc.ClientConnInterface) Observe { + return &observe{ + client: observev0.NewObserveServiceClient(conn), + } +} + +func (o *observe) GetAuraeDaemonLogStream(ctx context.Context, req *observev0.GetAuraeDaemonLogStreamRequest) (observev0.ObserveService_GetAuraeDaemonLogStreamClient, error) { + return o.client.GetAuraeDaemonLogStream(ctx, req) +} + +func (o *observe) GetSubProcessStream(ctx context.Context, req *observev0.GetSubProcessStreamRequest) (observev0.ObserveService_GetSubProcessStreamClient, error) { + return o.client.GetSubProcessStream(ctx, req) +}