From 470e4738163bf0aef031f2e15db3456d4e0fdbe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:03:02 +0000 Subject: [PATCH 01/15] Rename BagIt fields --- bagit.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/bagit.go b/bagit.go index 4a4cc65..faaa7cc 100644 --- a/bagit.go +++ b/bagit.go @@ -25,10 +25,10 @@ var ErrInvalid = errors.New("invalid") // BagIt is an abstraction to work with BagIt packages that embeds Python and // the bagit-python. type BagIt struct { - tmpDir string // Top-level container for embedded files. - ep *python.EmbeddedPython // Python files. - lib *embed_util.EmbeddedFiles // bagit-python library files. - runner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner). + tmpDir string // Top-level container for embedded files. + embedPython *python.EmbeddedPython // Python files. + embedBagit *embed_util.EmbeddedFiles // bagit-python library files. + embedRunner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner). } // NewBagIt creates and initializes a new BagIt instance. This constructor is @@ -48,15 +48,15 @@ func NewBagIt() (*BagIt, error) { if err != nil { return nil, fmt.Errorf("embed python: %v", err) } - b.ep = ep + b.embedPython = ep - b.lib, err = embed_util.NewEmbeddedFilesWithTmpDir(data.Data, filepath.Join(b.tmpDir, "bagit-lib"), true) + b.embedBagit, err = embed_util.NewEmbeddedFilesWithTmpDir(data.Data, filepath.Join(b.tmpDir, "bagit-lib"), true) if err != nil { return nil, fmt.Errorf("embed bagit: %v", err) } - b.ep.AddPythonPath(b.lib.GetExtractedPath()) + b.embedPython.AddPythonPath(b.embedBagit.GetExtractedPath()) - b.runner, err = embed_util.NewEmbeddedFilesWithTmpDir(runner.Source, filepath.Join(b.tmpDir, "bagit-runner"), true) + b.embedRunner, err = embed_util.NewEmbeddedFilesWithTmpDir(runner.Source, filepath.Join(b.tmpDir, "bagit-runner"), true) if err != nil { return nil, fmt.Errorf("embed runner: %v", err) } @@ -68,7 +68,7 @@ func NewBagIt() (*BagIt, error) { func (b *BagIt) create() (*runnerInstance, error) { i := &runnerInstance{} - cmd, err := b.ep.PythonCmd(filepath.Join(b.runner.GetExtractedPath(), "main.py")) + cmd, err := b.embedPython.PythonCmd(filepath.Join(b.embedRunner.GetExtractedPath(), "main.py")) if err != nil { return nil, fmt.Errorf("create command: %v", err) } @@ -210,15 +210,15 @@ func (b *BagIt) Make(path string) error { func (b *BagIt) Cleanup() error { var e error - if err := b.runner.Cleanup(); err != nil { + if err := b.embedRunner.Cleanup(); err != nil { e = errors.Join(e, fmt.Errorf("clean up runner: %v", err)) } - if err := b.lib.Cleanup(); err != nil { + if err := b.embedBagit.Cleanup(); err != nil { e = errors.Join(e, fmt.Errorf("clean up bagit: %v", err)) } - if err := b.ep.Cleanup(); err != nil { + if err := b.embedPython.Cleanup(); err != nil { e = errors.Join(e, fmt.Errorf("clean up python: %v", err)) } From eb154d13f81f83efeab7339a1cfd768bbddb4d31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:05:29 +0000 Subject: [PATCH 02/15] Create runner.go file --- bagit.go | 82 ------------------------------------------------- runner.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 82 deletions(-) create mode 100644 runner.go diff --git a/bagit.go b/bagit.go index faaa7cc..3d0d04c 100644 --- a/bagit.go +++ b/bagit.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "os" - "os/exec" "path/filepath" "github.com/artefactual-labs/bagit-gython/internal/dist/data" @@ -64,36 +63,6 @@ func NewBagIt() (*BagIt, error) { return b, nil } -// create a Python intepreter running the bagit-python wrapper. -func (b *BagIt) create() (*runnerInstance, error) { - i := &runnerInstance{} - - cmd, err := b.embedPython.PythonCmd(filepath.Join(b.embedRunner.GetExtractedPath(), "main.py")) - if err != nil { - return nil, fmt.Errorf("create command: %v", err) - } - i.cmd = cmd - - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("create stdin pipe: %v", err) - } - i.stdin = stdin - - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("create stdout pipe: %v", err) - } - i.stdout = stdout - - err = cmd.Start() - if err != nil { - return nil, fmt.Errorf("cmd: %v", err) - } - - return i, nil -} - type validateRequest struct { Path string `json:"path"` } @@ -228,54 +197,3 @@ func (b *BagIt) Cleanup() error { return e } - -type args struct { - Cmd string `json:"cmd"` - Opts any `json:"opts"` -} - -// runnerInstance is an instance of a Python interpreter executing the -// bagit-python runner. -type runnerInstance struct { - cmd *exec.Cmd - stdin io.WriteCloser - stdout io.ReadCloser -} - -// send a command to the runner. -func (i *runnerInstance) send(args args) error { - blob, err := json.Marshal(args) - if err != nil { - return fmt.Errorf("encode args: %v", err) - } - blob = append(blob, '\n') - - _, err = i.stdin.Write(blob) - if err != nil { - return fmt.Errorf("write blob: %v", err) - } - - return nil -} - -func (i *runnerInstance) stop() error { - var e error - - if err := i.stdin.Close(); err != nil { - e = errors.Join(e, err) - } - - if err := i.stdout.Close(); err != nil { - e = errors.Join(e, err) - } - - if err := i.cmd.Process.Kill(); err != nil { - e = errors.Join(e, err) - } - - if _, err := i.cmd.Process.Wait(); err != nil { - e = errors.Join(e, err) - } - - return e -} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..5cf3860 --- /dev/null +++ b/runner.go @@ -0,0 +1,91 @@ +package bagit + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "path/filepath" +) + +// create a Python intepreter running the bagit-python wrapper. +func (b *BagIt) create() (*runnerInstance, error) { + i := &runnerInstance{} + + cmd, err := b.embedPython.PythonCmd(filepath.Join(b.embedRunner.GetExtractedPath(), "main.py")) + if err != nil { + return nil, fmt.Errorf("create command: %v", err) + } + i.cmd = cmd + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("create stdin pipe: %v", err) + } + i.stdin = stdin + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("create stdout pipe: %v", err) + } + i.stdout = stdout + + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("cmd: %v", err) + } + + return i, nil +} + +type args struct { + Cmd string `json:"cmd"` + Opts any `json:"opts"` +} + +// runnerInstance is an instance of a Python interpreter executing the +// bagit-python runner. +type runnerInstance struct { + cmd *exec.Cmd + stdin io.WriteCloser + stdout io.ReadCloser +} + +// send a command to the runner. +func (i *runnerInstance) send(args args) error { + blob, err := json.Marshal(args) + if err != nil { + return fmt.Errorf("encode args: %v", err) + } + blob = append(blob, '\n') + + _, err = i.stdin.Write(blob) + if err != nil { + return fmt.Errorf("write blob: %v", err) + } + + return nil +} + +func (i *runnerInstance) stop() error { + var e error + + if err := i.stdin.Close(); err != nil { + e = errors.Join(e, err) + } + + if err := i.stdout.Close(); err != nil { + e = errors.Join(e, err) + } + + if err := i.cmd.Process.Kill(); err != nil { + e = errors.Join(e, err) + } + + if _, err := i.cmd.Process.Wait(); err != nil { + e = errors.Join(e, err) + } + + return e +} From 0cfd20e1169869a2d51e5b1bb8f2ff5bf3a9a702 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:20:56 +0000 Subject: [PATCH 03/15] Persist interpreter --- bagit.go | 78 ++++++++----------------------- bagit_test.go | 20 +++++++- runner.go | 127 +++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 136 insertions(+), 89 deletions(-) diff --git a/bagit.go b/bagit.go index 3d0d04c..eac236c 100644 --- a/bagit.go +++ b/bagit.go @@ -1,12 +1,9 @@ package bagit import ( - "bufio" - "bytes" "encoding/json" "errors" "fmt" - "io" "os" "path/filepath" @@ -28,6 +25,7 @@ type BagIt struct { embedPython *python.EmbeddedPython // Python files. embedBagit *embed_util.EmbeddedFiles // bagit-python library files. embedRunner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner). + runner *runnerInstance } // NewBagIt creates and initializes a new BagIt instance. This constructor is @@ -43,11 +41,10 @@ func NewBagIt() (*BagIt, error) { return nil, fmt.Errorf("make tmpDir: %v", err) } - ep, err := python.NewEmbeddedPythonWithTmpDir(filepath.Join(b.tmpDir, "python"), true) + b.embedPython, err = python.NewEmbeddedPythonWithTmpDir(filepath.Join(b.tmpDir, "python"), true) if err != nil { return nil, fmt.Errorf("embed python: %v", err) } - b.embedPython = ep b.embedBagit, err = embed_util.NewEmbeddedFilesWithTmpDir(data.Data, filepath.Join(b.tmpDir, "bagit-lib"), true) if err != nil { @@ -60,6 +57,11 @@ func NewBagIt() (*BagIt, error) { return nil, fmt.Errorf("embed runner: %v", err) } + b.runner = createRunner( + b.embedPython, + filepath.Join(b.embedRunner.GetExtractedPath(), "main.py"), + ) + return b, nil } @@ -73,41 +75,18 @@ type validateResponse struct { } func (b *BagIt) Validate(path string) error { - i, err := b.create() - if err != nil { - return fmt.Errorf("run python: %v", err) - } - defer i.stop() - - reader := bufio.NewReader(i.stdout) - - if err := i.send(args{ + blob, err := b.runner.send(args{ Cmd: "validate", Opts: &validateRequest{ Path: path, }, - }); err != nil { + }) + if err != nil { return err } - line := bytes.NewBuffer(nil) - for { - l, p, err := reader.ReadLine() - if err != nil && err != io.EOF { - return fmt.Errorf("read line: %v", err) - } - line.Write(l) - if !p { - break - } - } - - if line.Len() < 1 { - return fmt.Errorf("response not received") - } - r := validateResponse{} - err = json.Unmarshal(line.Bytes(), &r) + err = json.Unmarshal(blob, &r) if err != nil { return fmt.Errorf("decode response: %v", err) } @@ -131,41 +110,18 @@ type makeResponse struct { } func (b *BagIt) Make(path string) error { - i, err := b.create() - if err != nil { - return fmt.Errorf("run python: %v", err) - } - defer i.stop() - - reader := bufio.NewReader(i.stdout) - - if err := i.send(args{ + blob, err := b.runner.send(args{ Cmd: "make", Opts: &makeRequest{ Path: path, }, - }); err != nil { + }) + if err != nil { return err } - line := bytes.NewBuffer(nil) - for { - l, p, err := reader.ReadLine() - if err != nil && err != io.EOF { - return fmt.Errorf("read line: %v", err) - } - line.Write(l) - if !p { - break - } - } - - if line.Len() < 1 { - return fmt.Errorf("response not received") - } - r := makeResponse{} - err = json.Unmarshal(line.Bytes(), &r) + err = json.Unmarshal(blob, &r) if err != nil { return fmt.Errorf("decode response: %v", err) } @@ -179,6 +135,10 @@ func (b *BagIt) Make(path string) error { func (b *BagIt) Cleanup() error { var e error + if err := b.runner.stop(); err != nil { + e = errors.Join(e, fmt.Errorf("stop runner: %v", err)) + } + if err := b.embedRunner.Cleanup(); err != nil { e = errors.Join(e, fmt.Errorf("clean up runner: %v", err)) } diff --git a/bagit_test.go b/bagit_test.go index d4de325..cfd08da 100644 --- a/bagit_test.go +++ b/bagit_test.go @@ -46,7 +46,7 @@ func TestValidateBag(t *testing.T) { assert.NilError(t, err) }) - t.Run("Validates bag concurrently", func(t *testing.T) { + t.Run("Returns ErrBusy if the resource is busy", func(t *testing.T) { t.Parallel() b := setUp(t) @@ -54,12 +54,28 @@ func TestValidateBag(t *testing.T) { // This test should pass because each call to Validate() creates its own // distinct Python interpreter instance. var g errgroup.Group - for i := 0; i < 10; i++ { + for i := 0; i < 3; i++ { g.Go(func() error { return b.Validate("internal/testdata/valid-bag") }) } + err := g.Wait() + assert.ErrorIs(t, err, bagit.ErrBusy) + }) + + t.Run("Parallel execution", func(t *testing.T) { + t.Parallel() + + // *bagit.BagIt is not shareable, each goroutine must create its own. + var g errgroup.Group + for i := 0; i < 3; i++ { + g.Go(func() error { + b := setUp(t) + return b.Validate("internal/testdata/valid-bag") + }) + } + err := g.Wait() assert.NilError(t, err) }) diff --git a/runner.go b/runner.go index 5cf3860..ceda8df 100644 --- a/runner.go +++ b/runner.go @@ -1,42 +1,79 @@ package bagit import ( + "bufio" + "bytes" "encoding/json" "errors" "fmt" "io" "os/exec" - "path/filepath" + "sync" + + "github.com/kluctl/go-embed-python/python" ) -// create a Python intepreter running the bagit-python wrapper. -func (b *BagIt) create() (*runnerInstance, error) { - i := &runnerInstance{} +var ErrBusy = errors.New("runner is busy") + +// runnerInstance is an instance of a Python interpreter executing the +// bagit-python runner. +type runnerInstance struct { + py *python.EmbeddedPython + entryPoint string + cmd *exec.Cmd + stdin io.WriteCloser + stdout io.ReadCloser + stdoutReader *bufio.Reader + mu sync.Mutex +} + +func createRunner(py *python.EmbeddedPython, entryPoint string) *runnerInstance { + return &runnerInstance{ + py: py, + entryPoint: entryPoint, + } +} + +// exited determines whether the process has exited. +func (r *runnerInstance) exited() bool { + if r.cmd == nil || r.cmd.ProcessState == nil { + return true + } + + return r.cmd.ProcessState.Exited() +} + +// ensure that the process is running. +func (r *runnerInstance) ensure() error { + if !r.exited() { + return nil + } - cmd, err := b.embedPython.PythonCmd(filepath.Join(b.embedRunner.GetExtractedPath(), "main.py")) + var err error + r.cmd, err = r.py.PythonCmd(r.entryPoint) if err != nil { - return nil, fmt.Errorf("create command: %v", err) + return fmt.Errorf("start runner: %v", err) } - i.cmd = cmd - stdin, err := cmd.StdinPipe() + // r.cmd.Stderr = os.Stderr + + r.stdin, err = r.cmd.StdinPipe() if err != nil { - return nil, fmt.Errorf("create stdin pipe: %v", err) + return fmt.Errorf("create stdin pipe: %v", err) } - i.stdin = stdin - stdout, err := cmd.StdoutPipe() + r.stdout, err = r.cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("create stdout pipe: %v", err) + return fmt.Errorf("create stdout pipe: %v", err) } - i.stdout = stdout + r.stdoutReader = bufio.NewReader(r.stdout) - err = cmd.Start() + err = r.cmd.Start() if err != nil { - return nil, fmt.Errorf("cmd: %v", err) + return fmt.Errorf("start cmd: %v", err) } - return i, nil + return nil } type args struct { @@ -44,33 +81,67 @@ type args struct { Opts any `json:"opts"` } -// runnerInstance is an instance of a Python interpreter executing the -// bagit-python runner. -type runnerInstance struct { - cmd *exec.Cmd - stdin io.WriteCloser - stdout io.ReadCloser -} - // send a command to the runner. -func (i *runnerInstance) send(args args) error { +func (i *runnerInstance) send(args args) ([]byte, error) { + if ok := i.mu.TryLock(); !ok { + return nil, ErrBusy + } + defer i.mu.Unlock() + + if err := i.ensure(); err != nil { + return nil, err + } + blob, err := json.Marshal(args) if err != nil { - return fmt.Errorf("encode args: %v", err) + return nil, fmt.Errorf("encode args: %v", err) } blob = append(blob, '\n') _, err = i.stdin.Write(blob) if err != nil { - return fmt.Errorf("write blob: %v", err) + return nil, fmt.Errorf("write blob: %v", err) } - return nil + line := bytes.NewBuffer(nil) + for { + l, p, err := i.stdoutReader.ReadLine() + if err != nil && err != io.EOF { + return nil, fmt.Errorf("read line: %v", err) + } + line.Write(l) + if !p { + break + } + } + if line.Len() < 1 { + return nil, fmt.Errorf("response not received") + } + + return line.Bytes(), nil +} + +// quit requests the runner to exit gracefully. +func (r *runnerInstance) quit() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.exited() { + return nil + } + + _, err := r.stdin.Write([]byte(`{"name": "exit"}`)) + + return err } func (i *runnerInstance) stop() error { var e error + if err := i.quit(); err != nil { + e = errors.Join(e, err) + } + if err := i.stdin.Close(); err != nil { e = errors.Join(e, err) } From 8ad8d0297053c88447c10b4c6a1fe9c9a424d104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:29:48 +0000 Subject: [PATCH 04/15] Rename runnerInstance as pyRunner --- bagit.go | 2 +- runner.go | 31 ++++++++++++++++--------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/bagit.go b/bagit.go index eac236c..369adf4 100644 --- a/bagit.go +++ b/bagit.go @@ -25,7 +25,7 @@ type BagIt struct { embedPython *python.EmbeddedPython // Python files. embedBagit *embed_util.EmbeddedFiles // bagit-python library files. embedRunner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner). - runner *runnerInstance + runner *pyRunner } // NewBagIt creates and initializes a new BagIt instance. This constructor is diff --git a/runner.go b/runner.go index ceda8df..5c21458 100644 --- a/runner.go +++ b/runner.go @@ -15,9 +15,10 @@ import ( var ErrBusy = errors.New("runner is busy") -// runnerInstance is an instance of a Python interpreter executing the -// bagit-python runner. -type runnerInstance struct { +// pyRunner manages the execution of the Python script wrapping bagit-python. +// It ensures that only one command is executed at a time and provides +// mechanisms to send commands and receive responses. +type pyRunner struct { py *python.EmbeddedPython entryPoint string cmd *exec.Cmd @@ -27,15 +28,15 @@ type runnerInstance struct { mu sync.Mutex } -func createRunner(py *python.EmbeddedPython, entryPoint string) *runnerInstance { - return &runnerInstance{ +func createRunner(py *python.EmbeddedPython, entryPoint string) *pyRunner { + return &pyRunner{ py: py, entryPoint: entryPoint, } } // exited determines whether the process has exited. -func (r *runnerInstance) exited() bool { +func (r *pyRunner) exited() bool { if r.cmd == nil || r.cmd.ProcessState == nil { return true } @@ -44,7 +45,7 @@ func (r *runnerInstance) exited() bool { } // ensure that the process is running. -func (r *runnerInstance) ensure() error { +func (r *pyRunner) ensure() error { if !r.exited() { return nil } @@ -82,13 +83,13 @@ type args struct { } // send a command to the runner. -func (i *runnerInstance) send(args args) ([]byte, error) { - if ok := i.mu.TryLock(); !ok { +func (r *pyRunner) send(args args) ([]byte, error) { + if ok := r.mu.TryLock(); !ok { return nil, ErrBusy } - defer i.mu.Unlock() + defer r.mu.Unlock() - if err := i.ensure(); err != nil { + if err := r.ensure(); err != nil { return nil, err } @@ -98,14 +99,14 @@ func (i *runnerInstance) send(args args) ([]byte, error) { } blob = append(blob, '\n') - _, err = i.stdin.Write(blob) + _, err = r.stdin.Write(blob) if err != nil { return nil, fmt.Errorf("write blob: %v", err) } line := bytes.NewBuffer(nil) for { - l, p, err := i.stdoutReader.ReadLine() + l, p, err := r.stdoutReader.ReadLine() if err != nil && err != io.EOF { return nil, fmt.Errorf("read line: %v", err) } @@ -122,7 +123,7 @@ func (i *runnerInstance) send(args args) ([]byte, error) { } // quit requests the runner to exit gracefully. -func (r *runnerInstance) quit() error { +func (r *pyRunner) quit() error { r.mu.Lock() defer r.mu.Unlock() @@ -135,7 +136,7 @@ func (r *runnerInstance) quit() error { return err } -func (i *runnerInstance) stop() error { +func (i *pyRunner) stop() error { var e error if err := i.quit(); err != nil { From b136b5e7561bea0d8b8c9de4cf24a035c4c571b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Sat, 13 Jul 2024 05:48:34 +0000 Subject: [PATCH 05/15] Monitor command with a goroutine --- runner.go | 51 +++++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/runner.go b/runner.go index 5c21458..8761929 100644 --- a/runner.go +++ b/runner.go @@ -9,6 +9,7 @@ import ( "io" "os/exec" "sync" + "sync/atomic" "github.com/kluctl/go-embed-python/python" ) @@ -19,13 +20,15 @@ var ErrBusy = errors.New("runner is busy") // It ensures that only one command is executed at a time and provides // mechanisms to send commands and receive responses. type pyRunner struct { - py *python.EmbeddedPython - entryPoint string - cmd *exec.Cmd - stdin io.WriteCloser - stdout io.ReadCloser - stdoutReader *bufio.Reader - mu sync.Mutex + py *python.EmbeddedPython // Instance of EmbeddedPython. + entryPoint string // Path to the runner wrapper entry point. + cmd *exec.Cmd // Command running Python interpreter. + running atomic.Bool // Tracks whether the command is still running. + wg sync.WaitGroup // Tracks the cmd monitor goroutine. + stdin io.WriteCloser // Standard input stream. + stdout io.ReadCloser // Standard output stream. + stdoutReader *bufio.Reader // Standard output stream (buffered reader). + mu sync.Mutex // Prevents sharing the command (see ErrBusy). } func createRunner(py *python.EmbeddedPython, entryPoint string) *pyRunner { @@ -35,18 +38,9 @@ func createRunner(py *python.EmbeddedPython, entryPoint string) *pyRunner { } } -// exited determines whether the process has exited. -func (r *pyRunner) exited() bool { - if r.cmd == nil || r.cmd.ProcessState == nil { - return true - } - - return r.cmd.ProcessState.Exited() -} - // ensure that the process is running. func (r *pyRunner) ensure() error { - if !r.exited() { + if r.running.Load() { return nil } @@ -56,6 +50,7 @@ func (r *pyRunner) ensure() error { return fmt.Errorf("start runner: %v", err) } + // Useful for debugging the Python application. // r.cmd.Stderr = os.Stderr r.stdin, err = r.cmd.StdinPipe() @@ -74,6 +69,16 @@ func (r *pyRunner) ensure() error { return fmt.Errorf("start cmd: %v", err) } + r.running.Store(true) + + // Monitor the command from a dedicated goroutine. + r.wg.Add(1) + go func() { + defer r.wg.Done() + _ = r.cmd.Wait() + r.running.Store(false) + }() + return nil } @@ -124,13 +129,13 @@ func (r *pyRunner) send(args args) ([]byte, error) { // quit requests the runner to exit gracefully. func (r *pyRunner) quit() error { - r.mu.Lock() - defer r.mu.Unlock() - - if r.exited() { + if r.running.Load() { return nil } + r.mu.Lock() + defer r.mu.Unlock() + _, err := r.stdin.Write([]byte(`{"name": "exit"}`)) return err @@ -155,9 +160,7 @@ func (i *pyRunner) stop() error { e = errors.Join(e, err) } - if _, err := i.cmd.Process.Wait(); err != nil { - e = errors.Join(e, err) - } + i.wg.Wait() return e } From 0320e7f0f5b3c70b031f44be12c58b32d3fff0b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:26:08 +0000 Subject: [PATCH 06/15] Organize tests --- bagit_test.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/bagit_test.go b/bagit_test.go index cfd08da..95e00b3 100644 --- a/bagit_test.go +++ b/bagit_test.go @@ -24,19 +24,9 @@ func setUp(t *testing.T) *bagit.BagIt { return b } -func TestValidateBag(t *testing.T) { +func TestConcurrency(t *testing.T) { t.Parallel() - t.Run("Fails validation", func(t *testing.T) { - t.Parallel() - - b := setUp(t) - - err := b.Validate("/tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333") - assert.Error(t, err, "invalid: Expected bagit.txt does not exist: /tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333/bagit.txt") - assert.Assert(t, errors.Is(err, bagit.ErrInvalid)) - }) - t.Run("Validates bag", func(t *testing.T) { t.Parallel() @@ -79,6 +69,33 @@ func TestValidateBag(t *testing.T) { err := g.Wait() assert.NilError(t, err) }) +} + +func TestValidateBag(t *testing.T) { + t.Parallel() + + t.Run("Fails validation", func(t *testing.T) { + t.Parallel() + + b := setUp(t) + + err := b.Validate("/tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333") + assert.Error(t, err, "invalid: Expected bagit.txt does not exist: /tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333/bagit.txt") + assert.Assert(t, errors.Is(err, bagit.ErrInvalid)) + }) + + t.Run("Validates bag", func(t *testing.T) { + t.Parallel() + + b := setUp(t) + + err := b.Validate("internal/testdata/valid-bag") + assert.NilError(t, err) + }) +} + +func TestMakeBag(t *testing.T) { + t.Parallel() t.Run("Creates bag", func(t *testing.T) { t.Parallel() From cbfcfe2a7b668eda70484fcf59826f29d86ac9c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:31:06 +0000 Subject: [PATCH 07/15] Add docstring to ErrBusy --- runner.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runner.go b/runner.go index 8761929..f8f0aab 100644 --- a/runner.go +++ b/runner.go @@ -14,6 +14,10 @@ import ( "github.com/kluctl/go-embed-python/python" ) +// ErrBusy is returned when an operation is attempted on BagIt while it is +// already processing another command. This ensures that only one command is +// processed at a time, preventing race conditions and ensuring the integrity +// of the shared resources. var ErrBusy = errors.New("runner is busy") // pyRunner manages the execution of the Python script wrapping bagit-python. From b315cf5db828e50d95ebdff795060c8b1745c82d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:33:25 +0000 Subject: [PATCH 08/15] Change signature of send method --- bagit.go | 14 ++++---------- runner.go | 5 +++-- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/bagit.go b/bagit.go index 369adf4..5df2705 100644 --- a/bagit.go +++ b/bagit.go @@ -75,11 +75,8 @@ type validateResponse struct { } func (b *BagIt) Validate(path string) error { - blob, err := b.runner.send(args{ - Cmd: "validate", - Opts: &validateRequest{ - Path: path, - }, + blob, err := b.runner.send("validate", &validateRequest{ + Path: path, }) if err != nil { return err @@ -110,11 +107,8 @@ type makeResponse struct { } func (b *BagIt) Make(path string) error { - blob, err := b.runner.send(args{ - Cmd: "make", - Opts: &makeRequest{ - Path: path, - }, + blob, err := b.runner.send("make", &makeRequest{ + Path: path, }) if err != nil { return err diff --git a/runner.go b/runner.go index f8f0aab..3d3660f 100644 --- a/runner.go +++ b/runner.go @@ -92,7 +92,7 @@ type args struct { } // send a command to the runner. -func (r *pyRunner) send(args args) ([]byte, error) { +func (r *pyRunner) send(name string, opts any) ([]byte, error) { if ok := r.mu.TryLock(); !ok { return nil, ErrBusy } @@ -102,7 +102,8 @@ func (r *pyRunner) send(args args) ([]byte, error) { return nil, err } - blob, err := json.Marshal(args) + cmd := args{Cmd: name, Opts: opts} + blob, err := json.Marshal(cmd) if err != nil { return nil, fmt.Errorf("encode args: %v", err) } From ee5865f7c5b3f39b762fb85a9ab2aa83b55934ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:35:44 +0000 Subject: [PATCH 09/15] Rename args as cmd --- internal/runner/main.py | 28 ++++++++++++++-------------- runner.go | 10 +++++----- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/runner/main.py b/internal/runner/main.py index d999fe4..7d432a5 100644 --- a/internal/runner/main.py +++ b/internal/runner/main.py @@ -11,21 +11,21 @@ def __init__(self, req): self.req = req @property - def cmd(self): - return self.req.get("cmd") + def name(self): + return self.req.get("name") @property - def opts(self): - return self.req.get("opts") + def args(self): + return self.req.get("args") def run(self): resp = {} try: - if self.cmd == "validate": - resp = self.validate(self.opts) - elif self.cmd == "make": - resp = self.make(self.opts) + if self.name == "validate": + resp = self.validate(self.args) + elif self.name == "make": + resp = self.make(self.args) else: raise Exception("Unknown command") except BaseException as err: @@ -33,14 +33,14 @@ def run(self): return json.dumps(resp) - def validate(self, opts): - bag = Bag(opts.get("path")) + def validate(self, args): + bag = Bag(args.get("path")) bag.validate(processes=multiprocessing.cpu_count()) return {"valid": True} - def make(self, opts): - bag_dir = opts.pop("path") - bag = make_bag(bag_dir, **opts) + def make(self, args): + bag_dir = args.pop("path") + bag = make_bag(bag_dir, **args) return {"version": bag.version} @@ -51,7 +51,7 @@ def main(): break req = json.loads(cmd) - if req.get("cmd") == "exit": + if req.get("name") == "exit": break result = Runner(req).run() diff --git a/runner.go b/runner.go index 3d3660f..030e75c 100644 --- a/runner.go +++ b/runner.go @@ -86,13 +86,13 @@ func (r *pyRunner) ensure() error { return nil } -type args struct { - Cmd string `json:"cmd"` - Opts any `json:"opts"` +type cmd struct { + Name string `json:"name"` // Name of the command, e.g.: "validate", "make", etc... + Args any `json:"args"` // Payload, e.g. &validateRequest{}. } // send a command to the runner. -func (r *pyRunner) send(name string, opts any) ([]byte, error) { +func (r *pyRunner) send(name string, args any) ([]byte, error) { if ok := r.mu.TryLock(); !ok { return nil, ErrBusy } @@ -102,7 +102,7 @@ func (r *pyRunner) send(name string, opts any) ([]byte, error) { return nil, err } - cmd := args{Cmd: name, Opts: opts} + cmd := cmd{Name: name, Args: args} blob, err := json.Marshal(cmd) if err != nil { return nil, fmt.Errorf("encode args: %v", err) From 86dbb4a412e7a4ae49ff509860cf3821629ad2d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:49:02 +0000 Subject: [PATCH 10/15] Clean up writing to stdout --- internal/runner/main.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/internal/runner/main.py b/internal/runner/main.py index 7d432a5..25fede0 100644 --- a/internal/runner/main.py +++ b/internal/runner/main.py @@ -7,8 +7,9 @@ class Runner: - def __init__(self, req): + def __init__(self, req, stdout): self.req = req + self.stdout = stdout @property def name(self): @@ -29,9 +30,10 @@ def run(self): else: raise Exception("Unknown command") except BaseException as err: - resp["err"] = str(err) + self.write_error(self.stdout, err) + return - return json.dumps(resp) + self.write(self.stdout, resp) def validate(self, args): bag = Bag(args.get("path")) @@ -43,6 +45,14 @@ def make(self, args): bag = make_bag(bag_dir, **args) return {"version": bag.version} + @staticmethod + def write(stdout, resp): + print(json.dumps(resp), file=stdout, flush=True) + + @staticmethod + def write_error(stdout, err): + Runner.write(stdout, {"err": str(err), "type": err.__class__.__name__}) + def main(): while True: @@ -54,10 +64,8 @@ def main(): if req.get("name") == "exit": break - result = Runner(req).run() - - sys.stdout.write(result + "\n") - sys.stdout.flush() + runner = Runner(req, sys.stdout) + runner.run() if __name__ == "__main__": From 2ee0008abb62652e4fe54af05647bbc2dfad02b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:55:41 +0000 Subject: [PATCH 11/15] Introduce Command dataclass --- internal/runner/main.py | 50 ++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/internal/runner/main.py b/internal/runner/main.py index 25fede0..9150d00 100644 --- a/internal/runner/main.py +++ b/internal/runner/main.py @@ -1,32 +1,33 @@ import json import multiprocessing import sys +from dataclasses import dataclass, field +from typing import Any, Dict from bagit import Bag, make_bag -from bagit import BagError -class Runner: - def __init__(self, req, stdout): - self.req = req - self.stdout = stdout +@dataclass +class Command: + name: str + args: Dict[str, Any] = field(default_factory=dict) - @property - def name(self): - return self.req.get("name") - @property - def args(self): - return self.req.get("args") +class Runner: + def __init__(self, cmd, stdout): + self.cmd = cmd + self.stdout = stdout def run(self): - resp = {} + name = self.cmd.name + args = self.cmd.args + resp = {} try: - if self.name == "validate": - resp = self.validate(self.args) - elif self.name == "make": - resp = self.make(self.args) + if name == "validate": + resp = self.validate(args) + elif name == "make": + resp = self.make(args) else: raise Exception("Unknown command") except BaseException as err: @@ -56,15 +57,22 @@ def write_error(stdout, err): def main(): while True: - cmd = sys.stdin.readline() - if not cmd: + line = sys.stdin.readline() + if not line: break - req = json.loads(cmd) - if req.get("name") == "exit": + try: + payload = json.loads(line) + except ValueError as err: + Runner.write_error(sys.stdout, err) + continue + + cmd = Command(name=payload.get("name"), args=payload.get("args")) + + if cmd.name == "exit": break - runner = Runner(req, sys.stdout) + runner = Runner(cmd, sys.stdout) runner.run() From 2dd9e60fab2820bf2eb45eab66f9097390c5b42a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:57:19 +0000 Subject: [PATCH 12/15] Handle exit command from run method --- internal/runner/main.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/runner/main.py b/internal/runner/main.py index 9150d00..9980b47 100644 --- a/internal/runner/main.py +++ b/internal/runner/main.py @@ -13,6 +13,10 @@ class Command: args: Dict[str, Any] = field(default_factory=dict) +class ExitError(Exception): + pass + + class Runner: def __init__(self, cmd, stdout): self.cmd = cmd @@ -28,8 +32,12 @@ def run(self): resp = self.validate(args) elif name == "make": resp = self.make(args) + elif name == "exit": + self.exit(args) else: raise Exception("Unknown command") + except ExitError: + raise except BaseException as err: self.write_error(self.stdout, err) return @@ -46,6 +54,9 @@ def make(self, args): bag = make_bag(bag_dir, **args) return {"version": bag.version} + def exit(self, args): + raise ExitError + @staticmethod def write(stdout, resp): print(json.dumps(resp), file=stdout, flush=True) @@ -69,11 +80,11 @@ def main(): cmd = Command(name=payload.get("name"), args=payload.get("args")) - if cmd.name == "exit": - break - runner = Runner(cmd, sys.stdout) - runner.run() + try: + runner.run() + except ExitError: + return if __name__ == "__main__": From 23d514dc4b803fcd3cea069c5ad2bd315cba5811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 11 Jul 2024 18:59:49 +0000 Subject: [PATCH 13/15] Load command handlers dynamically --- internal/runner/main.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/internal/runner/main.py b/internal/runner/main.py index 9980b47..4f3cb1b 100644 --- a/internal/runner/main.py +++ b/internal/runner/main.py @@ -13,11 +13,18 @@ class Command: args: Dict[str, Any] = field(default_factory=dict) +class UnknownCommandError(Exception): + pass + + class ExitError(Exception): pass class Runner: + ALLOWED_COMMANDS = ("validate", "make", "exit") + ALLOWED_COMMANDS_LIST = ", ".join(ALLOWED_COMMANDS) + def __init__(self, cmd, stdout): self.cmd = cmd self.stdout = stdout @@ -28,14 +35,8 @@ def run(self): resp = {} try: - if name == "validate": - resp = self.validate(args) - elif name == "make": - resp = self.make(args) - elif name == "exit": - self.exit(args) - else: - raise Exception("Unknown command") + ret = self.get_handler(name)(args) + resp.update(ret) except ExitError: raise except BaseException as err: @@ -44,17 +45,27 @@ def run(self): self.write(self.stdout, resp) - def validate(self, args): + def get_handler(self, name): + if name not in self.ALLOWED_COMMANDS: + raise UnknownCommandError( + f"'{name}' is not a valid command, use: {self.ALLOWED_COMMANDS_LIST}" + ) + handler = getattr(self, f"{name}_handler") + if handler is None: + raise UnknownCommandError(f"'{name}' does not have a handler") + return handler + + def validate_handler(self, args): bag = Bag(args.get("path")) bag.validate(processes=multiprocessing.cpu_count()) return {"valid": True} - def make(self, args): + def make_handler(self, args): bag_dir = args.pop("path") bag = make_bag(bag_dir, **args) return {"version": bag.version} - def exit(self, args): + def exit_handler(self, args): raise ExitError @staticmethod From 2e99d5bad1f827602e84a874acf5dc8aa619941e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Sat, 13 Jul 2024 04:55:22 +0000 Subject: [PATCH 14/15] Move error --- bagit.go | 16 ++++++++++++---- runner.go | 6 ------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/bagit.go b/bagit.go index 5df2705..5033daf 100644 --- a/bagit.go +++ b/bagit.go @@ -13,10 +13,18 @@ import ( "github.com/kluctl/go-embed-python/python" ) -// ErrInvalid indicates that bag validation failed. If there is a validation -// error message, ErrInvalid will be wrapped so make sure to use -// `errors.Is(err, ErrInvalid)` to test equivalency. -var ErrInvalid = errors.New("invalid") +var ( + // ErrInvalid indicates that bag validation failed. If there is a validation + // error message, ErrInvalid will be wrapped so make sure to use + // `errors.Is(err, ErrInvalid)` to test equivalency. + ErrInvalid = errors.New("invalid") + + // ErrBusy is returned when an operation is attempted on BagIt while it is + // already processing another command. This ensures that only one command is + // processed at a time, preventing race conditions and ensuring the + // integrity of the shared resources. + ErrBusy = errors.New("runner is busy") +) // BagIt is an abstraction to work with BagIt packages that embeds Python and // the bagit-python. diff --git a/runner.go b/runner.go index 030e75c..d08f425 100644 --- a/runner.go +++ b/runner.go @@ -14,12 +14,6 @@ import ( "github.com/kluctl/go-embed-python/python" ) -// ErrBusy is returned when an operation is attempted on BagIt while it is -// already processing another command. This ensures that only one command is -// processed at a time, preventing race conditions and ensuring the integrity -// of the shared resources. -var ErrBusy = errors.New("runner is busy") - // pyRunner manages the execution of the Python script wrapping bagit-python. // It ensures that only one command is executed at a time and provides // mechanisms to send commands and receive responses. From d6d2170b2abf993f9c26b12a466376edc5065d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Sat, 13 Jul 2024 04:55:55 +0000 Subject: [PATCH 15/15] Add benchmark --- bagit_test.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/bagit_test.go b/bagit_test.go index 95e00b3..105c151 100644 --- a/bagit_test.go +++ b/bagit_test.go @@ -11,19 +11,29 @@ import ( "gotest.tools/v3/fs" ) -func setUp(t *testing.T) *bagit.BagIt { - t.Helper() +func setUp(tb testing.TB) *bagit.BagIt { + tb.Helper() b, err := bagit.NewBagIt() - assert.NilError(t, err) + assert.NilError(tb, err) - t.Cleanup(func() { - assert.NilError(t, b.Cleanup()) + tb.Cleanup(func() { + assert.NilError(tb, b.Cleanup()) }) return b } +func BenchmarkValidate(b *testing.B) { + bagit := setUp(b) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _ = bagit.Validate("internal/testdata/valid-bag") + } +} + func TestConcurrency(t *testing.T) { t.Parallel()