From 7ba3196e4ee6a61705a8a6df89709f194f5bf0e0 Mon Sep 17 00:00:00 2001 From: Michael Smith Date: Thu, 2 Jan 2020 14:54:25 -0800 Subject: [PATCH] Extract invocation setup and run into new method Moves script `InvokeAndWait` behavior to a function on the invocation so we can setup more complicated invocations before running them. Keeps the script `InvokeAndWait` as a helper for the most common case. Signed-off-by: Michael Smith --- plugin/externalPluginEntry.go | 58 ++++++++++-------------------- plugin/externalPluginEntry_test.go | 8 ++--- plugin/externalPluginScript.go | 49 ++++++++++++++----------- 3 files changed, 51 insertions(+), 64 deletions(-) diff --git a/plugin/externalPluginEntry.go b/plugin/externalPluginEntry.go index bbb466dc8..77624d6b0 100644 --- a/plugin/externalPluginEntry.go +++ b/plugin/externalPluginEntry.go @@ -392,27 +392,9 @@ func (e *externalPluginEntry) blockRead(ctx context.Context, size int64, offset } func (e *externalPluginEntry) Write(ctx context.Context, p []byte) error { - // Start the command. inv := e.script.NewInvocation(ctx, "write", e) - inv.command.SetStdout(&inv.stdout) - inv.command.SetStderr(&inv.stderr) - inv.command.SetStdin(bytes.NewReader(p)) - - activity.Record(ctx, "Invoking %v", inv.command) - err := inv.command.Run() - exitCode := inv.command.ExitCode() - if exitCode < 0 { - return newInvokeError(err.Error(), inv) - } - - activity.Record(ctx, "stdout: %v", inv.stdout.String()) - if inv.stderr.Len() != 0 { - activity.Record(ctx, "stderr: %v", inv.stderr.String()) - } - if exitCode != 0 { - return newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), inv) - } - return nil + inv.SetStdin(bytes.NewReader(p)) + return inv.InvokeAndWait(ctx) } func (e *externalPluginEntry) Metadata(ctx context.Context) (JSONObject, error) { @@ -463,17 +445,16 @@ func (e *externalPluginEntry) Delete(ctx context.Context) (deleted bool, err err func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) { inv := e.script.NewInvocation(ctx, "stream", e) - cmd := inv.command - stdoutR, err := cmd.StdoutPipe() + stdoutR, err := inv.StdoutPipe() if err != nil { return nil, err } - stderrR, err := cmd.StderrPipe() + stderrR, err := inv.StderrPipe() if err != nil { return nil, err } - activity.Record(ctx, "Starting %v", cmd) - if err := cmd.Start(); err != nil { + activity.Record(ctx, "Starting %v", inv) + if err := inv.Start(); err != nil { return nil, newInvokeError(err.Error(), inv) } // "wait" will be used in Stream's error handlers. It will be wrapped @@ -481,8 +462,8 @@ func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) // all of stdout/stderr. These are the preconditions specified in // exec.Cmd#Wait's docs. wait := func() { - if err := cmd.Wait(); err != nil { - activity.Record(ctx, "Failed waiting for %v to finish: %v", cmd, err) + if err := inv.Wait(); err != nil { + activity.Record(ctx, "Failed waiting for %v to finish: %v", inv, err) } } @@ -509,7 +490,7 @@ func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) select { case err := <-headerRdrCh: if err != nil { - cmd.Terminate() + inv.Terminate() defer wait() // Try to get more context from stderr n, readErr := inv.stderr.ReadFrom(stderrR) @@ -523,9 +504,9 @@ func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) go func() { _, _ = io.Copy(ioutil.Discard, stderrR) }() - return &stdoutStreamer{cmd, stdoutR}, nil + return &stdoutStreamer{inv, stdoutR}, nil case <-timer: - cmd.Terminate() + inv.Terminate() defer wait() // We timed out while waiting for the streaming header to appear. // Return an appropriate error message using whatever was printed @@ -565,19 +546,18 @@ func (e *externalPluginEntry) Exec(ctx context.Context, cmd string, args []strin // Start the command. inv := e.script.NewInvocation(ctx, "exec", e, append([]string{string(optsJSON), cmd}, args...)...) - cmdObj := inv.command execCmd := NewExecCommand(ctx) - cmdObj.SetStdout(execCmd.Stdout()) - cmdObj.SetStderr(execCmd.Stderr()) + inv.SetStdout(execCmd.Stdout()) + inv.SetStderr(execCmd.Stderr()) if opts.Stdin != nil { - cmdObj.SetStdin(opts.Stdin) + inv.SetStdin(opts.Stdin) } else { // Go's exec.Cmd reads from the null device if no stdin is provided. We instead provide // an empty string for input so plugins can test whether there is content to read. - cmdObj.SetStdin(strings.NewReader("")) + inv.SetStdin(strings.NewReader("")) } - activity.Record(ctx, "Starting %v", cmdObj) - if err := cmdObj.Start(); err != nil { + activity.Record(ctx, "Starting %v", inv) + if err := inv.Start(); err != nil { return nil, err } // internal.Command handles context-cancellation cleanup @@ -585,9 +565,9 @@ func (e *externalPluginEntry) Exec(ctx context.Context, cmd string, args []strin // Asynchronously wait for the command to finish go func() { - err := cmdObj.Wait() + err := inv.Wait() execCmd.CloseStreamsWithError(nil) - exitCode := cmdObj.ExitCode() + exitCode := inv.ExitCode() if exitCode < 0 { execCmd.SetExitCodeErr(err) } else { diff --git a/plugin/externalPluginEntry_test.go b/plugin/externalPluginEntry_test.go index f12ef9c14..0d190710b 100644 --- a/plugin/externalPluginEntry_test.go +++ b/plugin/externalPluginEntry_test.go @@ -325,7 +325,7 @@ func (suite *ExternalPluginEntryTestSuite) TestSetCacheTTLs() { } func mockInvocation(stdout []byte) invocation { - return invocation{command: internal.NewCommand(context.Background(), ""), stdout: *bytes.NewBuffer(stdout)} + return invocation{Command: internal.NewCommand(context.Background(), ""), stdout: *bytes.NewBuffer(stdout)} } // TODO: Add tests for Schema, including when schemaGraph is provided (prefetched) @@ -695,7 +695,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite() { ctx := context.Background() mockCmd := &mockCommand{} - inv := invocation{command: mockCmd} + inv := invocation{Command: mockCmd} mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once() mockCmd.On("SetStdout", &inv.stdout).Once() mockCmd.On("SetStderr", &inv.stderr).Once() @@ -721,7 +721,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite_Error() { ctx := context.Background() mockCmd := &mockCommand{} - inv := invocation{command: mockCmd} + inv := invocation{Command: mockCmd} mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once() mockCmd.On("SetStdout", &inv.stdout).Once() mockCmd.On("SetStderr", &inv.stderr).Once() @@ -748,7 +748,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite_Exited() { ctx := context.Background() mockCmd := &mockCommand{} - inv := invocation{command: mockCmd} + inv := invocation{Command: mockCmd} mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once() mockCmd.On("SetStdout", &inv.stdout).Once() mockCmd.On("SetStderr", &inv.stderr).Once() diff --git a/plugin/externalPluginScript.go b/plugin/externalPluginScript.go index 4e90a48fc..0e4302b29 100644 --- a/plugin/externalPluginScript.go +++ b/plugin/externalPluginScript.go @@ -18,15 +18,37 @@ type externalPluginScript interface { NewInvocation(ctx context.Context, method string, entry *externalPluginEntry, args ...string) invocation } +// An internal.Command object that stores output in separate stdout and stderr buffers. type invocation struct { - command internal.Command + internal.Command stdout, stderr bytes.Buffer } +func (inv *invocation) InvokeAndWait(ctx context.Context) error { + inv.SetStdout(&inv.stdout) + inv.SetStderr(&inv.stderr) + + activity.Record(ctx, "Invoking %v", inv) + err := inv.Run() + exitCode := inv.ExitCode() + if exitCode < 0 { + return newInvokeError(err.Error(), *inv) + } + + activity.Record(ctx, "stdout: %v", inv.stdout.String()) + if inv.stderr.Len() != 0 { + activity.Record(ctx, "stderr: %v", inv.stderr.String()) + } + if exitCode != 0 { + return newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), *inv) + } + return nil +} + func newInvokeError(msg string, inv invocation) error { var builder strings.Builder builder.WriteString(msg) - fmt.Fprintf(&builder, "\nCOMMAND: %s", inv.command) + fmt.Fprintf(&builder, "\nCOMMAND: %s", inv.Command) if inv.stdout.Len() > 0 { fmt.Fprintf(&builder, "\nSTDOUT:\n%s", strings.Trim(inv.stdout.String(), "\n")) } @@ -53,23 +75,8 @@ func (s externalPluginScriptImpl) InvokeAndWait( args ...string, ) (invocation, error) { inv := s.NewInvocation(ctx, method, entry, args...) - inv.command.SetStdout(&inv.stdout) - inv.command.SetStderr(&inv.stderr) - activity.Record(ctx, "Invoking %v", inv.command) - err := inv.command.Run() - exitCode := inv.command.ExitCode() - if exitCode < 0 { - return inv, newInvokeError(err.Error(), inv) - } - - activity.Record(ctx, "stdout: %v", inv.stdout.String()) - if inv.stderr.Len() != 0 { - activity.Record(ctx, "stderr: %v", inv.stderr.String()) - } - if exitCode != 0 { - return inv, newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), inv) - } - return inv, nil + err := inv.InvokeAndWait(ctx) + return inv, err } func (s externalPluginScriptImpl) NewInvocation( @@ -79,13 +86,13 @@ func (s externalPluginScriptImpl) NewInvocation( args ...string, ) invocation { if method == "init" { - return invocation{command: internal.NewCommand(ctx, s.Path(), append([]string{"init"}, args...)...)} + return invocation{Command: internal.NewCommand(ctx, s.Path(), append([]string{"init"}, args...)...)} } if entry == nil { msg := fmt.Sprintf("s.NewInvocation called with method '%v' and entry == nil", method) panic(msg) } - return invocation{command: internal.NewCommand( + return invocation{Command: internal.NewCommand( ctx, s.Path(), append([]string{method, entry.id(), entry.state}, args...)...,