-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfunction.go
128 lines (106 loc) · 2.61 KB
/
function.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync"
)
// Buffer sizes handed to bufio.Scanner.Buffer when relaying command output.
const (
	// initialBufSize is the size in bytes of the buffer each scanner starts
	// with (64 KiB).
	initialBufSize = 64 * 1024
	// maxBufSize is the upper bound in bytes that a scanner buffer may grow
	// to (100 times initialBufSize).
	maxBufSize = 100 * initialBufSize
)
// Function describes an external command together with the input it is fed
// and the writers its output is relayed to.
type Function struct {
	// Command is the program to execute.
	Command string
	// Args are the arguments passed to Command.
	Args []string
	// Payload is the data written to the command's standard input.
	Payload []byte

	// Stdout and Stderr receive the command's standard output and standard
	// error, respectively, while it runs.
	Stdout, Stderr io.Writer
}
// NewFunction builds a Function that runs command with args and feeds it
// payload on standard input. Output is relayed to the current process's
// os.Stdout and os.Stderr unless the writers are replaced afterwards.
func NewFunction(command string, args []string, payload []byte) *Function {
	return &Function{
		Command: command,
		Args:    args,
		Payload: payload,
		Stdout:  os.Stdout,
		Stderr:  os.Stderr,
	}
}
// SetStdout replaces the writer that receives the command's standard output.
func (fn *Function) SetStdout(w io.Writer) {
fn.Stdout = w
}
// SetStderr replaces the writer that receives the command's standard error
// (and any scan-error diagnostics emitted by Run).
func (fn *Function) SetStderr(w io.Writer) {
fn.Stderr = w
}
// Run executes the command, feeds Payload to its standard input, and blocks
// until the command has exited and all of its output has been consumed.
//
// While the command runs, its stdout and stderr are relayed line by line to
// fn.Stdout and fn.Stderr (each relayed line is terminated with a newline).
// A nil writer discards the relayed output. The two streams are relayed from
// separate goroutines, so the writers may be written to concurrently.
//
// Run returns the complete captured stdout and stderr as strings. The error
// is nil when the command exits successfully; it reports the exit code when
// the command exits unsuccessfully, and wraps the underlying error when the
// command cannot be started or waited on.
func (fn *Function) Run() (string, string, error) {
	outW, errW := fn.Stdout, fn.Stderr
	if outW == nil {
		outW = io.Discard
	}
	if errW == nil {
		errW = io.Discard
	}

	cmd := exec.Command(fn.Command, fn.Args...)
	// Let os/exec feed the payload from its own goroutine. Writing it
	// synchronously before the output readers run deadlocks as soon as the
	// child fills its output pipe while input is still pending.
	cmd.Stdin = bytes.NewReader(fn.Payload)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return "", "", fmt.Errorf("failed to open stdout pipe: %w", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		return "", "", fmt.Errorf("failed to open stderr pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		return "", "", fmt.Errorf("failed to start command: %w", err)
	}

	// Everything read from the pipes is mirrored into these buffers so the
	// full output can be returned to the caller.
	var stdoutBuf, stderrBuf bytes.Buffer
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		relayLines(io.TeeReader(stdout, &stdoutBuf), outW, errW, "stdout")
	}()
	go func() {
		defer wg.Done()
		relayLines(io.TeeReader(stderr, &stderrBuf), errW, errW, "stderr")
	}()

	// All reads from the pipes must finish before Wait, which closes them.
	wg.Wait()
	waitErr := cmd.Wait()

	capturedOut, capturedErr := stdoutBuf.String(), stderrBuf.String()
	if waitErr == nil {
		return capturedOut, capturedErr, nil
	}

	var exitErr *exec.ExitError
	if errors.As(waitErr, &exitErr) {
		return capturedOut, capturedErr, fmt.Errorf("command failed with exit code %d", exitErr.ExitCode())
	}
	// Anything other than an ExitError is an unexpected failure.
	return capturedOut, capturedErr, fmt.Errorf("failed to execute command: %w", waitErr)
}

// relayLines copies r to dst one line at a time and reports a scanner failure
// for the stream named label on errDst.
func relayLines(r io.Reader, dst, errDst io.Writer, label string) {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, initialBufSize), maxBufSize)
	for scanner.Scan() {
		fmt.Fprintln(dst, scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintln(errDst, "scan error ("+label+"):", err)
		// The scanner gave up (for example on an over-long line). Keep
		// draining so the child is never blocked writing to a full pipe.
		// The drain's own error is ignored: the failure is already reported.
		_, _ = io.Copy(io.Discard, r)
	}
}