Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove watcher from manager #580

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions internal/nha/activities/hari_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -509,7 +508,6 @@ func createHariActivity(t *testing.T, hariConfig map[string]interface{}) *Update
manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
hooks,
)
Expand Down
2 changes: 0 additions & 2 deletions internal/nha/activities/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -165,7 +164,6 @@ func createProdActivity(t *testing.T, hookConfig map[string]interface{}) *Update
manager := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": hookConfig,
Expand Down
9 changes: 4 additions & 5 deletions internal/workflow/activities/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import (
"github.com/artefactual-labs/enduro/internal/bundler"
"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

type BundleActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewBundleActivity(m *manager.Manager) *BundleActivity {
return &BundleActivity{manager: m}
func NewBundleActivity(wsvc watcher.Service) *BundleActivity {
return &BundleActivity{wsvc: wsvc}
}

type BundleActivityParams struct {
Expand Down Expand Up @@ -74,7 +73,7 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara
}
} else if params.IsDir {
var w watcher.Watcher
w, err = a.manager.Watcher.ByName(params.WatcherName)
w, err = a.wsvc.ByName(params.WatcherName)
if err == nil {
src := filepath.Join(w.Path(), params.Key)
dst := params.TransferDir
Expand Down
16 changes: 1 addition & 15 deletions internal/workflow/activities/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@ import (
"syscall"
"testing"

"github.com/go-logr/logr"
temporalsdk_testsuite "go.temporal.io/sdk/testsuite"
"go.uber.org/mock/gomock"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"

collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/watcher"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

func TestBundleActivity(t *testing.T) {
Expand All @@ -26,17 +22,7 @@ func TestBundleActivity(t *testing.T) {
t.Run("Excludes hidden files", func(t *testing.T) {
ctrl := gomock.NewController(t)
wsvc := watcherfake.NewMockService(ctrl)
m := manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
wsvc,
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
},
)
activity := NewBundleActivity(m)
activity := NewBundleActivity(wsvc)
ts := &temporalsdk_testsuite.WorkflowTestSuite{}
env := ts.NewTestActivityEnvironment()
env.RegisterActivity(activity.Execute)
Expand Down
10 changes: 3 additions & 7 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@
"context"
"fmt"
"os"

"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// CleanUpActivity removes the contents that we've created in the TS location.
type CleanUpActivity struct {
manager *manager.Manager
}
type CleanUpActivity struct{}

func NewCleanUpActivity(m *manager.Manager) *CleanUpActivity {
return &CleanUpActivity{manager: m}
func NewCleanUpActivity() *CleanUpActivity {
return &CleanUpActivity{}

Check warning on line 13 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L13

Added line #L13 was not covered by tests
}

type CleanUpActivityParams struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/delete_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
"os"
"path/filepath"

"github.com/artefactual-labs/enduro/internal/workflow/manager"
"github.com/artefactual-labs/enduro/internal/watcher"
)

type DeleteOriginalActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDeleteOriginalActivity(m *manager.Manager) *DeleteOriginalActivity {
return &DeleteOriginalActivity{manager: m}
func NewDeleteOriginalActivity(wsvc watcher.Service) *DeleteOriginalActivity {
return &DeleteOriginalActivity{wsvc: wsvc}

Check warning on line 16 in internal/workflow/activities/delete_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/delete_original.go#L16

Added line #L16 was not covered by tests
}

func (a *DeleteOriginalActivity) Execute(ctx context.Context, watcherName, batchDir, key string) error {
if batchDir != "" {
return deleteOriginalFromBatch(batchDir, key)
}
return a.manager.Watcher.Delete(ctx, watcherName, key)
return a.wsvc.Delete(ctx, watcherName, key)

Check warning on line 23 in internal/workflow/activities/delete_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/delete_original.go#L23

Added line #L23 was not covered by tests
}

func deleteOriginalFromBatch(batchDir, key string) error {
Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/activities/dispose_original.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
"path/filepath"

"github.com/artefactual-labs/enduro/internal/fsutil"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
"github.com/artefactual-labs/enduro/internal/watcher"
)

type DisposeOriginalActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDisposeOriginalActivity(m *manager.Manager) *DisposeOriginalActivity {
return &DisposeOriginalActivity{manager: m}
func NewDisposeOriginalActivity(wsvc watcher.Service) *DisposeOriginalActivity {
return &DisposeOriginalActivity{wsvc: wsvc}

Check warning on line 16 in internal/workflow/activities/dispose_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/dispose_original.go#L16

Added line #L16 was not covered by tests
}

func (a *DisposeOriginalActivity) Execute(ctx context.Context, watcherName, completedDir, batchDir, key string) error {
if batchDir != "" {
return disposeOriginalFromBatch(completedDir, batchDir, key)
}
return a.manager.Watcher.Dispose(ctx, watcherName, key)
return a.wsvc.Dispose(ctx, watcherName, key)

Check warning on line 23 in internal/workflow/activities/dispose_original.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/dispose_original.go#L23

Added line #L23 was not covered by tests
}

func disposeOriginalFromBatch(completedDir, batchDir, key string) error {
Expand Down
8 changes: 5 additions & 3 deletions internal/workflow/activities/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
"fmt"

"github.com/artefactual-labs/enduro/internal/temporal"
"github.com/artefactual-labs/enduro/internal/watcher"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

// DownloadActivity downloads the blob into the pipeline processing directory.
type DownloadActivity struct {
manager *manager.Manager
wsvc watcher.Service
}

func NewDownloadActivity(m *manager.Manager) *DownloadActivity {
return &DownloadActivity{manager: m}
func NewDownloadActivity(m *manager.Manager, wsvc watcher.Service) *DownloadActivity {
return &DownloadActivity{manager: m, wsvc: wsvc}

Check warning on line 19 in internal/workflow/activities/download.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/download.go#L19

Added line #L19 was not covered by tests
}

func (a *DownloadActivity) Execute(ctx context.Context, pipelineName, watcherName, key string) (string, error) {
Expand All @@ -29,7 +31,7 @@
}
defer file.Close()

if err := a.manager.Watcher.Download(ctx, file, watcherName, key); err != nil {
if err := a.wsvc.Download(ctx, file, watcherName, key); err != nil {

Check warning on line 34 in internal/workflow/activities/download.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/download.go#L34

Added line #L34 was not covered by tests
return "", temporal.NewNonRetryableError(fmt.Errorf("error downloading blob: %v", err))
}

Expand Down
5 changes: 1 addition & 4 deletions internal/workflow/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,21 @@ import (

"github.com/artefactual-labs/enduro/internal/collection"
"github.com/artefactual-labs/enduro/internal/pipeline"
"github.com/artefactual-labs/enduro/internal/watcher"
)

// Manager carries workflow and activity dependencies.
type Manager struct {
Logger logr.Logger
Collection collection.Service
Watcher watcher.Service
Pipelines *pipeline.Registry
Hooks map[string]map[string]interface{}
}

// NewManager returns a pointer to a new Manager.
func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
func NewManager(logger logr.Logger, colsvc collection.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
return &Manager{
Logger: logger,
Collection: colsvc,
Watcher: wsvc,
Pipelines: pipelines,
Hooks: hooks,
}
Expand Down
2 changes: 0 additions & 2 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"
)

Expand Down Expand Up @@ -126,7 +125,6 @@ func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {
return manager.NewManager(
logr.Discard(),
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
Expand Down
12 changes: 6 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func main() {
// TODO: this is a temporary workaround for dependency injection until we
// figure out what's the depdencency tree is going to look like after POC.
// The share-everything pattern should be avoided.
m := manager.NewManager(logger, colsvc, wsvc, pipelineRegistry, config.Hooks)
m := manager.NewManager(logger, colsvc, pipelineRegistry, config.Hooks)

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
Expand All @@ -264,16 +264,16 @@ func main() {

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(m).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewDownloadActivity(m, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
w.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
w.RegisterActivityWithOptions(activities.NewTransferActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(m).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})

w.RegisterActivityWithOptions(workflow.NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: workflow.AsyncCompletionActivityName})
Expand Down
Loading