Skip to content

Commit

Permalink
Make the Python interpreter persistent for repeated use
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Jul 11, 2024
1 parent 5df4225 commit cc66397
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 206 deletions.
198 changes: 41 additions & 157 deletions bagit.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package bagit

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"

"github.com/artefactual-labs/bagit-gython/internal/dist/data"
Expand All @@ -20,10 +16,11 @@ import (
// BagIt is an abstraction to work with BagIt packages that embeds Python and
// the bagit-python.
type BagIt struct {
tmpDir string // Top-level container for embedded files.
ep *python.EmbeddedPython // Python files.
lib *embed_util.EmbeddedFiles // bagit-python library files.
runner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner).
tmpDir string // Top-level container for embedded files.
embedPython *python.EmbeddedPython // Python files.
embedBagit *embed_util.EmbeddedFiles // bagit-python library files.
embedRunner *embed_util.EmbeddedFiles // bagit-python wrapper files (runner).
runner *pyRunner
}

// NewBagIt creates and initializes a new BagIt instance. This constructor is
Expand All @@ -39,54 +36,40 @@ func NewBagIt() (*BagIt, error) {
return nil, fmt.Errorf("make tmpDir: %v", err)
}

ep, err := python.NewEmbeddedPythonWithTmpDir(filepath.Join(b.tmpDir, "python"), true)
ep, err := python.NewEmbeddedPythonWithTmpDir(
filepath.Join(b.tmpDir, "python"),
true,
)
if err != nil {
return nil, fmt.Errorf("embed python: %v", err)
}
b.ep = ep
b.embedPython = ep

b.lib, err = embed_util.NewEmbeddedFilesWithTmpDir(data.Data, filepath.Join(b.tmpDir, "bagit-lib"), true)
b.embedBagit, err = embed_util.NewEmbeddedFilesWithTmpDir(
data.Data,
filepath.Join(b.tmpDir, "bagit-lib"),
true,
)
if err != nil {
return nil, fmt.Errorf("embed bagit: %v", err)
}
b.ep.AddPythonPath(b.lib.GetExtractedPath())
b.embedPython.AddPythonPath(b.embedBagit.GetExtractedPath())

b.runner, err = embed_util.NewEmbeddedFilesWithTmpDir(runner.Source, filepath.Join(b.tmpDir, "bagit-runner"), true)
b.embedRunner, err = embed_util.NewEmbeddedFilesWithTmpDir(
runner.Source,
filepath.Join(b.tmpDir, "bagit-runner"),
true,
)
if err != nil {
return nil, fmt.Errorf("embed runner: %v", err)
}

return b, nil
}

// create a Python intepreter running the bagit-python wrapper.
func (b *BagIt) create() (*runnerInstance, error) {
i := &runnerInstance{}

cmd, err := b.ep.PythonCmd(filepath.Join(b.runner.GetExtractedPath(), "main.py"))
if err != nil {
return nil, fmt.Errorf("create command: %v", err)
}
i.cmd = cmd

stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("create stdin pipe: %v", err)
}
i.stdin = stdin

stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("create stdout pipe: %v", err)
}
i.stdout = stdout

err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("cmd: %v", err)
}
b.runner = createRunner(
b.embedPython,
filepath.Join(b.embedRunner.GetExtractedPath(), "main.py"),
)

return i, nil
return b, nil
}

type validateRequest struct {
Expand All @@ -99,41 +82,15 @@ type validateResponse struct {
}

func (b *BagIt) Validate(path string) error {
i, err := b.create()
blob, err := b.runner.Command("validate", &validateRequest{
Path: path,
})
if err != nil {
return fmt.Errorf("run python: %v", err)
}
defer i.stop()

reader := bufio.NewReader(i.stdout)

if err := i.send(args{
Cmd: "validate",
Opts: &validateRequest{
Path: path,
},
}); err != nil {
return err
}

line := bytes.NewBuffer(nil)
for {
l, p, err := reader.ReadLine()
if err != nil && err != io.EOF {
return fmt.Errorf("read line: %v", err)
}
line.Write(l)
if !p {
break
}
}

if line.Len() < 1 {
return fmt.Errorf("response not received")
}

r := validateResponse{}
err = json.Unmarshal(line.Bytes(), &r)
err = json.Unmarshal(blob, &r)
if err != nil {
return fmt.Errorf("decode response: %v", err)
}
Expand All @@ -157,41 +114,15 @@ type makeResponse struct {
}

func (b *BagIt) Make(path string) error {
i, err := b.create()
blob, err := b.runner.Command("make", &makeRequest{
Path: path,
})
if err != nil {
return fmt.Errorf("run python: %v", err)
}
defer i.stop()

reader := bufio.NewReader(i.stdout)

if err := i.send(args{
Cmd: "make",
Opts: &makeRequest{
Path: path,
},
}); err != nil {
return err
}

line := bytes.NewBuffer(nil)
for {
l, p, err := reader.ReadLine()
if err != nil && err != io.EOF {
return fmt.Errorf("read line: %v", err)
}
line.Write(l)
if !p {
break
}
}

if line.Len() < 1 {
return fmt.Errorf("response not received")
}

r := makeResponse{}
err = json.Unmarshal(line.Bytes(), &r)
err = json.Unmarshal(blob, &r)
if err != nil {
return fmt.Errorf("decode response: %v", err)
}
Expand All @@ -205,15 +136,19 @@ func (b *BagIt) Make(path string) error {
func (b *BagIt) Cleanup() error {
var e error

if err := b.runner.Cleanup(); err != nil {
if err := b.runner.Shutdown(); err != nil {
e = errors.Join(e, fmt.Errorf("shutdown runner: %v", err))
}

if err := b.embedRunner.Cleanup(); err != nil {
e = errors.Join(e, fmt.Errorf("clean up runner: %v", err))
}

if err := b.lib.Cleanup(); err != nil {
if err := b.embedBagit.Cleanup(); err != nil {
e = errors.Join(e, fmt.Errorf("clean up bagit: %v", err))
}

if err := b.ep.Cleanup(); err != nil {
if err := b.embedPython.Cleanup(); err != nil {
e = errors.Join(e, fmt.Errorf("clean up python: %v", err))
}

Expand All @@ -223,54 +158,3 @@ func (b *BagIt) Cleanup() error {

return e
}

type args struct {
Cmd string `json:"cmd"`
Opts any `json:"opts"`
}

// runnerInstance is an instance of a Python interpreter executing the
// bagit-python runner.
type runnerInstance struct {
cmd *exec.Cmd
stdin io.WriteCloser
stdout io.ReadCloser
}

// send a command to the runner.
func (i *runnerInstance) send(args args) error {
blob, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("encode args: %v", err)
}
blob = append(blob, '\n')

_, err = i.stdin.Write(blob)
if err != nil {
return fmt.Errorf("write blob: %v", err)
}

return nil
}

func (i *runnerInstance) stop() error {
var e error

if err := i.stdin.Close(); err != nil {
e = errors.Join(e, err)
}

if err := i.stdout.Close(); err != nil {
e = errors.Join(e, err)
}

if err := i.cmd.Process.Kill(); err != nil {
e = errors.Join(e, err)
}

if _, err := i.cmd.Process.Wait(); err != nil {
e = errors.Join(e, err)
}

return e
}
56 changes: 39 additions & 17 deletions bagit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,66 @@ func setUp(t *testing.T) *bagit.BagIt {
return b
}

func TestValidateBag(t *testing.T) {
func TestConcurrency(t *testing.T) {
t.Parallel()

t.Run("Fails validation", func(t *testing.T) {
t.Run("Returns ErrBusy if the resource is busy", func(t *testing.T) {
t.Parallel()

b := setUp(t)

err := b.Validate("/tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333")
assert.Error(t, err, "invalid: Expected bagit.txt does not exist: /tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333/bagit.txt")
var g errgroup.Group
for i := 0; i < 10; i++ {
g.Go(func() error {
return b.Validate("internal/testdata/valid-bag")
})
}

err := g.Wait()
assert.ErrorIs(t, err, bagit.ErrBusy)
})

t.Run("Validates bag", func(t *testing.T) {
t.Run("Parallel execution", func(t *testing.T) {
t.Parallel()

b := setUp(t)
// *bagit.BagIt is not shareable, each goroutine creates its own.
var g errgroup.Group
for i := 0; i < 3; i++ {
g.Go(func() error {
b := setUp(t)
return b.Validate("internal/testdata/valid-bag")
})
}

err := b.Validate("internal/testdata/valid-bag")
err := g.Wait()
assert.NilError(t, err)
})
}

t.Run("Validates bag concurrently", func(t *testing.T) {
func TestValidateBag(t *testing.T) {
t.Parallel()

t.Run("Fails validation", func(t *testing.T) {
t.Parallel()

b := setUp(t)

// This test should pass because each call to Validate() creates its own
// distinct Python interpreter instance.
var g errgroup.Group
for i := 0; i < 10; i++ {
g.Go(func() error {
return b.Validate("internal/testdata/valid-bag")
})
}
err := b.Validate("/tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333")
assert.Error(t, err, "invalid: Expected bagit.txt does not exist: /tmp/691b8e7f-e6b7-41dd-bc47-868e2ff69333/bagit.txt")
})

err := g.Wait()
t.Run("Validates bag", func(t *testing.T) {
t.Parallel()

b := setUp(t)

err := b.Validate("internal/testdata/valid-bag")
assert.NilError(t, err)
})
}

func TestMakeBag(t *testing.T) {
t.Parallel()

t.Run("Creates bag", func(t *testing.T) {
t.Parallel()
Expand Down
Loading

0 comments on commit cc66397

Please sign in to comment.