Skip to content

Commit

Permalink
adding observe commands to ae (#59)
Browse files Browse the repository at this point in the history
* adding observe commands to ae

* test fixes

* add certificate request for user (#53)

* set auth config from flags (#62)

---------

Co-authored-by: Christoph Voigt <[email protected]>
  • Loading branch information
dmah42 and voigt authored Mar 14, 2023
1 parent 7f08923 commit 1f1f075
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 3 deletions.
192 changes: 192 additions & 0 deletions cmd/observe/observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/* -------------------------------------------------------------------------- *\
* Apache 2.0 License Copyright © 2022 The Aurae Authors *
* *
* +--------------------------------------------+ *
* | █████╗ ██╗ ██╗██████╗ █████╗ ███████╗ | *
* | ██╔══██╗██║ ██║██╔══██╗██╔══██╗██╔════╝ | *
* | ███████║██║ ██║██████╔╝███████║█████╗ | *
* | ██╔══██║██║ ██║██╔══██╗██╔══██║██╔══╝ | *
* | ██║ ██║╚██████╔╝██║ ██║██║ ██║███████╗ | *
* | ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ | *
* +--------------------------------------------+ *
* *
* Distributed Systems Runtime *
* *
* -------------------------------------------------------------------------- *
* *
* Licensed under the Apache License, Version 2.0 (the "License"); *
* you may not use this file except in compliance with the License. *
* You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
* See the License for the specific language governing permissions and *
* limitations under the License. *
* *
\* -------------------------------------------------------------------------- */

package observe

import (
"context"
"errors"
"fmt"
"io"
"log"
"net"

"github.com/aurae-runtime/ae/pkg/cli"
"github.com/aurae-runtime/ae/pkg/cli/printer"
"github.com/aurae-runtime/ae/pkg/client"
"github.com/aurae-runtime/ae/pkg/config"
"github.com/spf13/cobra"

aeCMD "github.com/aurae-runtime/ae/cmd"
observev0 "github.com/aurae-runtime/ae/pkg/api/v0/observe"
)

type outputObserve struct{}

type option struct {
aeCMD.Option
ctx context.Context
ip string
logtype string
port uint16
protocol string
verbose bool
writer io.Writer
output *outputObserve
outputFormat *cli.OutputFormat
}

func (o *option) Complete(args []string) error {
log.Println(args)
if len(args) != 2 {
return errors.New("expected ip address and log type to be passed to this command")
}
o.ip = args[0]
o.logtype = args[1]
return nil
}

func (o *option) Validate() error {
if err := o.outputFormat.Validate(); err != nil {
return err
}
if len(o.ip) == 0 {
return errors.New("ip address must be passed to this command")
}
if ip := net.ParseIP(o.ip); ip == nil {
return fmt.Errorf("failed to parse IP %q", o.ip)
}
if o.logtype != "daemon" && o.logtype != "subprocesses" {
return errors.New("either 'daemon' or 'subprocesses' must be passed to the command")
}
return nil
}

func (o *option) Execute(ctx context.Context) error {
o.ctx = ctx
o.output = &outputObserve{}

o.protocol = "tcp4"
if net.ParseIP(o.ip).To4() == nil {
o.protocol = "tcp6"
}
o.observeHost(o.ip)
return o.outputFormat.ToPrinter().Print(o.writer, o.output)
}

func (o *option) SetWriter(writer io.Writer) {
o.writer = writer
}

func (o option) observeHost(ip_str string) bool {
if o.verbose {
log.Printf("connecting to %s:%d using protocol %s\n", ip_str, o.port, o.protocol)
}

c, err := client.New(o.ctx, config.WithSystem(config.System{Protocol: o.protocol, Socket: net.JoinHostPort(ip_str, fmt.Sprintf("%d", o.port))}))
if err != nil {
log.Fatalf("failed to create client: %s", err)
return false
}

obs, err := c.Observe()
if err != nil {
log.Fatalf("failed to dial Observe service: %s", err)
return false
}

// TODO: handle output format
switch o.logtype {
case "daemon":
// TODO: request parameters
req := observev0.GetAuraeDaemonLogStreamRequest{} // TODO
stream, err := obs.GetAuraeDaemonLogStream(o.ctx, &req)
if err != nil {
log.Fatalf("%s", err)
return false
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%s", err)
return false
}
if _, err := o.writer.Write([]byte(resp.Item.Line)); err != nil {
log.Fatalf("%s", err)
return false
}
}
case "subprocesses":
// TODO: request parameters
req := observev0.GetSubProcessStreamRequest{} // TODO
stream, err := obs.GetSubProcessStream(o.ctx, &req)
if err != nil {
log.Fatalf("%s", err)
return false
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%s", err)
return false
}
if _, err := o.writer.Write([]byte(resp.Item.Line)); err != nil {
log.Fatalf("%s", err)
return false
}
}
}

return true
}

func NewCMD(ctx context.Context) *cobra.Command {
o := &option{
outputFormat: cli.NewOutputFormat().WithDefaultFormat(printer.NewJSON().Format()).WithPrinter(printer.NewJSON()),
}
cmd := &cobra.Command{
Use: "observe <ip> <daemon|subprocesses>",
Short: "get a stream of logs either from the aurae daemon or spawned subprocesses running on the given IP address",
Args: cobra.MinimumNArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
return aeCMD.Run(ctx, o, cmd, args)
},
}
o.outputFormat.AddFlags(cmd)
cmd.Flags().Uint16Var(&o.port, "port", o.port, "The port to use when connecting")
cmd.Flags().BoolVar(&o.verbose, "verbose", o.verbose, "Lots of output")
return cmd
}
103 changes: 103 additions & 0 deletions cmd/observe/observe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package observe

import (
"testing"

"github.com/aurae-runtime/ae/pkg/cli"
"github.com/aurae-runtime/ae/pkg/cli/printer"
)

func TestComplete(t *testing.T) {
ts := []struct {
args []string
wantip string
wantlogtype string
wanterr bool
}{
{
[]string{""},
"", "", true,
},
{
[]string{"foo"},
"", "", true,
},
{
[]string{"foo", "bar"},
"foo", "bar", false,
},
{
[]string{"foo", "bar", "baz"},
"", "", true,
},
}

for _, tt := range ts {
o := &option{}
goterr := o.Complete(tt.args)
if tt.wanterr && goterr == nil {
t.Fatal("want error, got no error")
}
if !tt.wanterr && goterr != nil {
t.Fatal("want no error, got error")
}
if tt.wantip != o.ip {
t.Fatalf("want ip %q, got ip %q", tt.wantip, o.ip)
}
if tt.wantlogtype != o.logtype {
t.Fatalf("want logtype %q, got logtype %q", tt.wantlogtype, o.logtype)
}
}
}

func TestValidate(t *testing.T) {
ts := []struct {
name string
outputFormat *cli.OutputFormat
ip string
logtype string
wanterr bool
}{
{
name: "no output format",
outputFormat: cli.NewOutputFormat(),
wanterr: true,
},
{
name: "no ip or logtype",
outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()),
wanterr: true,
},
{
name: "invalid ip",
outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()),
ip: "invalid ip",
wanterr: true,
},
{
name: "invalid logtype",
outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()),
ip: "10.0.0.0",
logtype: "invalid",
wanterr: true,
},
{
name: "valid ip and logtype",
outputFormat: cli.NewOutputFormat().WithDefaultFormat("json").WithPrinter(printer.NewJSON()),
ip: "10.0.0.0",
logtype: "daemon",
wanterr: false,
},
}

for _, tt := range ts {
o := &option{ip: tt.ip, logtype: tt.logtype, outputFormat: tt.outputFormat}
goterr := o.Validate()
if tt.wanterr && goterr == nil {
t.Fatalf("[%s] want error, got no error", tt.name)
}
if !tt.wanterr && goterr != nil {
t.Fatalf("[%s] want no error, got error: %s", tt.name, goterr)
}
}
}
8 changes: 5 additions & 3 deletions cmd/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/aurae-runtime/ae/cmd/discovery"
"github.com/aurae-runtime/ae/cmd/health"
"github.com/aurae-runtime/ae/cmd/observe"
"github.com/aurae-runtime/ae/cmd/oci"
"github.com/aurae-runtime/ae/cmd/pki"
"github.com/aurae-runtime/ae/cmd/version"
Expand All @@ -60,9 +61,10 @@ func Execute() {
func init() {
// add subcommands
ctx := context.Background()
rootCmd.AddCommand(oci.NewCMD(ctx))
rootCmd.AddCommand(version.NewCMD(ctx))
rootCmd.AddCommand(pki.NewCMD(ctx))
rootCmd.AddCommand(discovery.NewCMD(ctx))
rootCmd.AddCommand(health.NewCMD(ctx))
rootCmd.AddCommand(observe.NewCMD(ctx))
rootCmd.AddCommand(oci.NewCMD(ctx))
rootCmd.AddCommand(pki.NewCMD(ctx))
rootCmd.AddCommand(version.NewCMD(ctx))
}
11 changes: 11 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@ import (
"github.com/aurae-runtime/ae/pkg/config"
"github.com/aurae-runtime/ae/pkg/discovery"
"github.com/aurae-runtime/ae/pkg/health"
"github.com/aurae-runtime/ae/pkg/observe"
)

type Client interface {
Discovery() (discovery.Discovery, error)
Health() (health.Health, error)
Observe() (observe.Observe, error)
}

type client struct {
cfg *config.Configs
conn grpc.ClientConnInterface
discovery discovery.Discovery
health health.Health
observe observe.Observe
}

func New(ctx context.Context, cfg ...config.Config) (Client, error) {
Expand Down Expand Up @@ -54,6 +57,7 @@ func New(ctx context.Context, cfg ...config.Config) (Client, error) {
conn: conn,
discovery: discovery.New(ctx, conn),
health: health.New(ctx, conn),
observe: observe.New(ctx, conn),
}, nil
}

Expand Down Expand Up @@ -95,3 +99,10 @@ func (c *client) Health() (health.Health, error) {
}
return c.health, nil
}

func (c *client) Observe() (observe.Observe, error) {
if c.observe == nil {
return nil, fmt.Errorf("Observe service is not available")
}
return c.observe, nil
}
32 changes: 32 additions & 0 deletions pkg/observe/observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package observe

import (
"context"

"google.golang.org/grpc"

observev0 "github.com/aurae-runtime/ae/pkg/api/v0/observe"
)

type Observe interface {
GetAuraeDaemonLogStream(context.Context, *observev0.GetAuraeDaemonLogStreamRequest) (observev0.ObserveService_GetAuraeDaemonLogStreamClient, error)
GetSubProcessStream(context.Context, *observev0.GetSubProcessStreamRequest) (observev0.ObserveService_GetSubProcessStreamClient, error)
}

type observe struct {
client observev0.ObserveServiceClient
}

func New(ctx context.Context, conn grpc.ClientConnInterface) Observe {
return &observe{
client: observev0.NewObserveServiceClient(conn),
}
}

func (o *observe) GetAuraeDaemonLogStream(ctx context.Context, req *observev0.GetAuraeDaemonLogStreamRequest) (observev0.ObserveService_GetAuraeDaemonLogStreamClient, error) {
return o.client.GetAuraeDaemonLogStream(ctx, req)
}

func (o *observe) GetSubProcessStream(ctx context.Context, req *observev0.GetSubProcessStreamRequest) (observev0.ObserveService_GetSubProcessStreamClient, error) {
return o.client.GetSubProcessStream(ctx, req)
}

0 comments on commit 1f1f075

Please sign in to comment.