-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.go
178 lines (148 loc) · 3.77 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package bagit
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
"sync"
"sync/atomic"
"time"
"github.com/kluctl/go-embed-python/python"
)
// pyRunner manages the execution of the Python script wrapping bagit-python.
// It ensures that only one command is executed at a time and provides
// mechanisms to send commands and receive responses.
type pyRunner struct {
py *python.EmbeddedPython // Instance of EmbeddedPython.
entryPoint string // Path to the runner wrapper entry point.
cmd *exec.Cmd // Command running Python interpreter.
running atomic.Bool // Tracks whether the command is still running.
wg sync.WaitGroup // Tracks the cmd monitor goroutine.
stdin io.WriteCloser // Standard input stream.
stdout io.ReadCloser // Standard output stream.
stdoutReader *bufio.Reader // Standard output stream (buffered reader).
mu sync.Mutex // Prevents sharing the command (see ErrBusy).
}
func createRunner(py *python.EmbeddedPython, entryPoint string) *pyRunner {
return &pyRunner{
py: py,
entryPoint: entryPoint,
}
}
// ensure that the process is running.
func (r *pyRunner) ensure() error {
if r.running.Load() {
return nil
}
var err error
r.cmd, err = r.py.PythonCmd(r.entryPoint)
if err != nil {
return fmt.Errorf("start runner: %v", err)
}
// Useful for debugging the Python application.
// r.cmd.Stderr = os.Stderr
r.stdin, err = r.cmd.StdinPipe()
if err != nil {
return fmt.Errorf("create stdin pipe: %v", err)
}
r.stdout, err = r.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("create stdout pipe: %v", err)
}
r.stdoutReader = bufio.NewReader(r.stdout)
err = r.cmd.Start()
if err != nil {
return fmt.Errorf("start cmd: %v", err)
}
r.running.Store(true)
// Monitor the command from a dedicated goroutine.
r.wg.Add(1)
go func() {
defer r.wg.Done()
_ = r.cmd.Wait()
r.running.Store(false)
}()
return nil
}
type cmd struct {
Name string `json:"name"` // Name of the command, e.g.: "validate", "make", etc...
Args any `json:"args"` // Payload, e.g. &validateRequest{}.
}
// send a command to the runner.
func (r *pyRunner) send(name string, args any) ([]byte, error) {
if ok := r.mu.TryLock(); !ok {
return nil, ErrBusy
}
defer r.mu.Unlock()
if err := r.ensure(); err != nil {
return nil, err
}
cmd := cmd{Name: name, Args: args}
blob, err := json.Marshal(cmd)
if err != nil {
return nil, fmt.Errorf("encode args: %v", err)
}
blob = append(blob, '\n')
_, err = r.stdin.Write(blob)
if err != nil {
return nil, fmt.Errorf("write blob: %v", err)
}
line := bytes.NewBuffer(nil)
for {
l, p, err := r.stdoutReader.ReadLine()
if err != nil && err != io.EOF {
return nil, fmt.Errorf("read line: %v", err)
}
line.Write(l)
if !p {
break
}
}
if line.Len() < 1 {
return nil, fmt.Errorf("response not received")
}
return line.Bytes(), nil
}
// quit requests the runner to exit gracefully.
func (r *pyRunner) quit() error {
if r.running.Load() {
return nil
}
r.mu.Lock()
defer r.mu.Unlock()
var err error
if r.stdin != nil {
_, err = r.stdin.Write([]byte(`{"name": "exit"}`))
}
return err
}
func (r *pyRunner) stop() error {
var e error
if err := r.quit(); err != nil {
e = errors.Join(e, err)
}
// Wait up to a second, otherwise force to exit immediately.
if closed := wait(&r.wg, time.Second); !closed {
if err := r.cmd.Process.Kill(); err != nil {
e = errors.Join(e, err)
}
}
r.wg.Wait()
return e
}
func wait(wg *sync.WaitGroup, timeout time.Duration) bool {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
return true
case <-time.After(timeout):
return false
}
}