Skip to content
This repository has been archived by the owner on Jun 2, 2022. It is now read-only.

Commit

Permalink
Extract invocation setup and run into new method
Browse files Browse the repository at this point in the history
Moves script `InvokeAndWait` behavior to a function on the invocation so
we can setup more complicated invocations before running them. Keeps the
script `InvokeAndWait` as a helper for the most common case.

Signed-off-by: Michael Smith <[email protected]>
  • Loading branch information
MikaelSmith committed Jan 2, 2020
1 parent 08bf6f6 commit 7ba3196
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 64 deletions.
58 changes: 19 additions & 39 deletions plugin/externalPluginEntry.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,27 +392,9 @@ func (e *externalPluginEntry) blockRead(ctx context.Context, size int64, offset
}

func (e *externalPluginEntry) Write(ctx context.Context, p []byte) error {
// Start the command.
inv := e.script.NewInvocation(ctx, "write", e)
inv.command.SetStdout(&inv.stdout)
inv.command.SetStderr(&inv.stderr)
inv.command.SetStdin(bytes.NewReader(p))

activity.Record(ctx, "Invoking %v", inv.command)
err := inv.command.Run()
exitCode := inv.command.ExitCode()
if exitCode < 0 {
return newInvokeError(err.Error(), inv)
}

activity.Record(ctx, "stdout: %v", inv.stdout.String())
if inv.stderr.Len() != 0 {
activity.Record(ctx, "stderr: %v", inv.stderr.String())
}
if exitCode != 0 {
return newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), inv)
}
return nil
inv.SetStdin(bytes.NewReader(p))
return inv.InvokeAndWait(ctx)
}

func (e *externalPluginEntry) Metadata(ctx context.Context) (JSONObject, error) {
Expand Down Expand Up @@ -463,26 +445,25 @@ func (e *externalPluginEntry) Delete(ctx context.Context) (deleted bool, err err

func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error) {
inv := e.script.NewInvocation(ctx, "stream", e)
cmd := inv.command
stdoutR, err := cmd.StdoutPipe()
stdoutR, err := inv.StdoutPipe()
if err != nil {
return nil, err
}
stderrR, err := cmd.StderrPipe()
stderrR, err := inv.StderrPipe()
if err != nil {
return nil, err
}
activity.Record(ctx, "Starting %v", cmd)
if err := cmd.Start(); err != nil {
activity.Record(ctx, "Starting %v", inv)
if err := inv.Start(); err != nil {
return nil, newInvokeError(err.Error(), inv)
}
// "wait" will be used in Stream's error handlers. It will be wrapped
// in a "defer" call to ensure that cmd.Wait()'s called once we've read
// all of stdout/stderr. These are the preconditions specified in
// exec.Cmd#Wait's docs.
wait := func() {
if err := cmd.Wait(); err != nil {
activity.Record(ctx, "Failed waiting for %v to finish: %v", cmd, err)
if err := inv.Wait(); err != nil {
activity.Record(ctx, "Failed waiting for %v to finish: %v", inv, err)
}
}

Expand All @@ -509,7 +490,7 @@ func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error)
select {
case err := <-headerRdrCh:
if err != nil {
cmd.Terminate()
inv.Terminate()
defer wait()
// Try to get more context from stderr
n, readErr := inv.stderr.ReadFrom(stderrR)
Expand All @@ -523,9 +504,9 @@ func (e *externalPluginEntry) Stream(ctx context.Context) (io.ReadCloser, error)
go func() {
_, _ = io.Copy(ioutil.Discard, stderrR)
}()
return &stdoutStreamer{cmd, stdoutR}, nil
return &stdoutStreamer{inv, stdoutR}, nil
case <-timer:
cmd.Terminate()
inv.Terminate()
defer wait()
// We timed out while waiting for the streaming header to appear.
// Return an appropriate error message using whatever was printed
Expand Down Expand Up @@ -565,29 +546,28 @@ func (e *externalPluginEntry) Exec(ctx context.Context, cmd string, args []strin

// Start the command.
inv := e.script.NewInvocation(ctx, "exec", e, append([]string{string(optsJSON), cmd}, args...)...)
cmdObj := inv.command
execCmd := NewExecCommand(ctx)
cmdObj.SetStdout(execCmd.Stdout())
cmdObj.SetStderr(execCmd.Stderr())
inv.SetStdout(execCmd.Stdout())
inv.SetStderr(execCmd.Stderr())
if opts.Stdin != nil {
cmdObj.SetStdin(opts.Stdin)
inv.SetStdin(opts.Stdin)
} else {
// Go's exec.Cmd reads from the null device if no stdin is provided. We instead provide
// an empty string for input so plugins can test whether there is content to read.
cmdObj.SetStdin(strings.NewReader(""))
inv.SetStdin(strings.NewReader(""))
}
activity.Record(ctx, "Starting %v", cmdObj)
if err := cmdObj.Start(); err != nil {
activity.Record(ctx, "Starting %v", inv)
if err := inv.Start(); err != nil {
return nil, err
}
// internal.Command handles context-cancellation cleanup
// for us, so we don't have to use execCmd.SetStopFunc.

// Asynchronously wait for the command to finish
go func() {
err := cmdObj.Wait()
err := inv.Wait()
execCmd.CloseStreamsWithError(nil)
exitCode := cmdObj.ExitCode()
exitCode := inv.ExitCode()
if exitCode < 0 {
execCmd.SetExitCodeErr(err)
} else {
Expand Down
8 changes: 4 additions & 4 deletions plugin/externalPluginEntry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (suite *ExternalPluginEntryTestSuite) TestSetCacheTTLs() {
}

func mockInvocation(stdout []byte) invocation {
return invocation{command: internal.NewCommand(context.Background(), ""), stdout: *bytes.NewBuffer(stdout)}
return invocation{Command: internal.NewCommand(context.Background(), ""), stdout: *bytes.NewBuffer(stdout)}
}

// TODO: Add tests for Schema, including when schemaGraph is provided (prefetched)
Expand Down Expand Up @@ -695,7 +695,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite() {

ctx := context.Background()
mockCmd := &mockCommand{}
inv := invocation{command: mockCmd}
inv := invocation{Command: mockCmd}
mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once()
mockCmd.On("SetStdout", &inv.stdout).Once()
mockCmd.On("SetStderr", &inv.stderr).Once()
Expand All @@ -721,7 +721,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite_Error() {

ctx := context.Background()
mockCmd := &mockCommand{}
inv := invocation{command: mockCmd}
inv := invocation{Command: mockCmd}
mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once()
mockCmd.On("SetStdout", &inv.stdout).Once()
mockCmd.On("SetStderr", &inv.stderr).Once()
Expand All @@ -748,7 +748,7 @@ func (suite *ExternalPluginEntryTestSuite) TestWrite_Exited() {

ctx := context.Background()
mockCmd := &mockCommand{}
inv := invocation{command: mockCmd}
inv := invocation{Command: mockCmd}
mockScript.On("NewInvocation", ctx, "write", entry, []string(nil)).Return(inv).Once()
mockCmd.On("SetStdout", &inv.stdout).Once()
mockCmd.On("SetStderr", &inv.stderr).Once()
Expand Down
49 changes: 28 additions & 21 deletions plugin/externalPluginScript.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,37 @@ type externalPluginScript interface {
NewInvocation(ctx context.Context, method string, entry *externalPluginEntry, args ...string) invocation
}

// An internal.Command object that stores output in separate stdout and stderr buffers.
type invocation struct {
command internal.Command
internal.Command
stdout, stderr bytes.Buffer
}

func (inv *invocation) InvokeAndWait(ctx context.Context) error {
inv.SetStdout(&inv.stdout)
inv.SetStderr(&inv.stderr)

activity.Record(ctx, "Invoking %v", inv)
err := inv.Run()
exitCode := inv.ExitCode()
if exitCode < 0 {
return newInvokeError(err.Error(), *inv)
}

activity.Record(ctx, "stdout: %v", inv.stdout.String())
if inv.stderr.Len() != 0 {
activity.Record(ctx, "stderr: %v", inv.stderr.String())
}
if exitCode != 0 {
return newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), *inv)
}
return nil
}

func newInvokeError(msg string, inv invocation) error {
var builder strings.Builder
builder.WriteString(msg)
fmt.Fprintf(&builder, "\nCOMMAND: %s", inv.command)
fmt.Fprintf(&builder, "\nCOMMAND: %s", inv.Command)
if inv.stdout.Len() > 0 {
fmt.Fprintf(&builder, "\nSTDOUT:\n%s", strings.Trim(inv.stdout.String(), "\n"))
}
Expand All @@ -53,23 +75,8 @@ func (s externalPluginScriptImpl) InvokeAndWait(
args ...string,
) (invocation, error) {
inv := s.NewInvocation(ctx, method, entry, args...)
inv.command.SetStdout(&inv.stdout)
inv.command.SetStderr(&inv.stderr)
activity.Record(ctx, "Invoking %v", inv.command)
err := inv.command.Run()
exitCode := inv.command.ExitCode()
if exitCode < 0 {
return inv, newInvokeError(err.Error(), inv)
}

activity.Record(ctx, "stdout: %v", inv.stdout.String())
if inv.stderr.Len() != 0 {
activity.Record(ctx, "stderr: %v", inv.stderr.String())
}
if exitCode != 0 {
return inv, newInvokeError(fmt.Sprintf("script returned a non-zero exit code of %v", exitCode), inv)
}
return inv, nil
err := inv.InvokeAndWait(ctx)
return inv, err
}

func (s externalPluginScriptImpl) NewInvocation(
Expand All @@ -79,13 +86,13 @@ func (s externalPluginScriptImpl) NewInvocation(
args ...string,
) invocation {
if method == "init" {
return invocation{command: internal.NewCommand(ctx, s.Path(), append([]string{"init"}, args...)...)}
return invocation{Command: internal.NewCommand(ctx, s.Path(), append([]string{"init"}, args...)...)}
}
if entry == nil {
msg := fmt.Sprintf("s.NewInvocation called with method '%v' and entry == nil", method)
panic(msg)
}
return invocation{command: internal.NewCommand(
return invocation{Command: internal.NewCommand(
ctx,
s.Path(),
append([]string{method, entry.id(), entry.state}, args...)...,
Expand Down

0 comments on commit 7ba3196

Please sign in to comment.