Skip to content

Commit

Permalink
workflow: deliver receipts sequentially
Browse files Browse the repository at this point in the history
* Send receipts only when processing succeeded.
* Send "prod" only when "hari" succeeds.
  • Loading branch information
sevein committed Jan 21, 2020
1 parent f7b4261 commit fc3eb29
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 38 deletions.
2 changes: 1 addition & 1 deletion internal/nha/activities/activities.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package activities

const (
var (
UpdateHARIActivityName = "update-hari-activity"
UpdateProductionSystemActivityName = "update-production-system-activity"
)
97 changes: 60 additions & 37 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package workflow

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -221,44 +220,22 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
return sessErr
}

// Activities that we want to run within the session regardless the
// result. E.g. receipts, clean-ups, etc...
// Passing the activity lets the activity determine if the process failed.
var futures []workflow.Future
var receiptsFailed bool
activityOpts := withActivityOptsForRequest(sessCtx)
if status == collection.StatusDone {
if disabled, _ := manager.HookAttrBool(tinfo.Hooks, "hari", "disabled"); !disabled {
futures = append(futures, workflow.ExecuteActivity(activityOpts, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
SIPID: tinfo.SIPID,
StoredAt: tinfo.StoredAt,
FullPath: tinfo.Bundle.FullPath,
PipelineName: tinfo.Event.PipelineName,
NameInfo: nameInfo,
}))
}
}
if disabled, _ := manager.HookAttrBool(tinfo.Hooks, "prod", "disabled"); !disabled {
futures = append(futures, workflow.ExecuteActivity(activityOpts, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
// Deliver receipts.
{
err := sendReceipts(sessCtx, tinfo.Hooks, &sendReceiptsParams{
SIPID: tinfo.SIPID,
StoredAt: tinfo.StoredAt,
FullPath: tinfo.Bundle.FullPath,
PipelineName: tinfo.Event.PipelineName,
Status: status,
NameInfo: nameInfo,
}))
}
for _, f := range futures {
if err := f.Get(activityOpts, nil); err != nil {
receiptsFailed = true
Status: status,
})
if err != nil {
status = collection.StatusError
workflow.CompleteSession(sessCtx)
return fmt.Errorf("error delivering receipt(s): %v", err)
}
}
// This causes the workflow to fail when hooks are errorful. In the future,
// we'd prefer to give the user to decide what to do next, e.g. start over,
// retry hooks individually, etc...
if receiptsFailed {
status = collection.StatusError
workflow.CompleteSession(sessCtx)
return errors.New("at least one hook/receipt activity failed")
}

// Clean-up is the last activity that depends on the session.
// We'll close it as soon as the activity completes.
Expand All @@ -276,8 +253,8 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
// Hide packages from Archivematica Dashboard.
{
if status == collection.StatusDone {
futures = []workflow.Future{}
activityOpts = withActivityOptsForRequest(ctx)
futures := []workflow.Future{}
activityOpts := withActivityOptsForRequest(ctx)
futures = append(futures, workflow.ExecuteActivity(activityOpts, activities.HidePackageActivityName, tinfo.TransferID, "transfer", tinfo.Event.PipelineName))
futures = append(futures, workflow.ExecuteActivity(activityOpts, activities.HidePackageActivityName, tinfo.SIPID, "ingest", tinfo.Event.PipelineName))
for _, f := range futures {
Expand All @@ -293,7 +270,7 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
if err != nil {
logger.Warn("Retention policy timer failed", zap.Error(err))
} else {
activityOpts = withActivityOptsForRequest(ctx)
activityOpts := withActivityOptsForRequest(ctx)
_ = workflow.ExecuteActivity(activityOpts, activities.DeleteOriginalActivityName, tinfo.Event).Get(activityOpts, nil)
}
}
Expand Down Expand Up @@ -399,3 +376,49 @@ func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workfl

return nil
}

type sendReceiptsParams struct {
SIPID string
StoredAt time.Time
FullPath string
PipelineName string
NameInfo nha.NameInfo
Status collection.Status
}

func sendReceipts(ctx workflow.Context, hooks map[string]map[string]interface{}, params *sendReceiptsParams) error {
if params.Status != collection.StatusDone {
return nil
}

ctx = withActivityOptsForRequest(ctx)

if disabled, _ := manager.HookAttrBool(hooks, "hari", "disabled"); !disabled {
err := workflow.ExecuteActivity(ctx, nha_activities.UpdateHARIActivityName, &nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
}).Get(ctx, nil)

if err != nil {
return fmt.Errorf("error sending hari receipt: %v", err)
}
}

if disabled, _ := manager.HookAttrBool(hooks, "prod", "disabled"); !disabled {
err := workflow.ExecuteActivity(ctx, nha_activities.UpdateProductionSystemActivityName, &nha_activities.UpdateProductionSystemActivityParams{
StoredAt: params.StoredAt,
PipelineName: params.PipelineName,
Status: params.Status,
NameInfo: params.NameInfo,
}).Get(ctx, nil)

if err != nil {
return fmt.Errorf("error sending prod receipt: %v", err)
}
}

return nil
}
185 changes: 185 additions & 0 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package workflow

import (
"errors"
"testing"
"time"

"github.com/artefactual-labs/enduro/internal/collection"
collectionfake "github.com/artefactual-labs/enduro/internal/collection/fake"
"github.com/artefactual-labs/enduro/internal/nha"
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
"github.com/artefactual-labs/enduro/internal/pipeline"
watcherfake "github.com/artefactual-labs/enduro/internal/watcher/fake"
"github.com/artefactual-labs/enduro/internal/workflow/manager"

logrtesting "github.com/go-logr/logr/testing"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/cadence"
cadenceactivity "go.uber.org/cadence/activity"
cadencetestsuite "go.uber.org/cadence/testsuite"
cadenceworkflow "go.uber.org/cadence/workflow"
"go.uber.org/zap"
)

// sendReceipts is a no-op when the status is "error".
func TestSendReceiptsNoop(t *testing.T) {
wts := cadencetestsuite.WorkflowTestSuite{}
wts.SetLogger(zap.NewNop())
env := wts.NewTestWorkflowEnvironment()

m := buildManager(t, gomock.NewController(t))

wf := func(ctx cadenceworkflow.Context, hooks map[string]map[string]interface{}, params *sendReceiptsParams) error {
return sendReceipts(ctx, hooks, params)
}
cadenceworkflow.Register(wf)

env.ExecuteWorkflow(wf, m.Hooks, &sendReceiptsParams{
Status: collection.StatusError,
})

assert.True(t, env.IsWorkflowCompleted())
assertNilWorkflowError(t, env.GetWorkflowError())
env.AssertExpectations(t)
}

// sendReceipts exits immediately after an activity error, ensuring that
// receipt delivery is halted once one delivery has failed.
func TestSendReceiptsSequentialBehavior(t *testing.T) {
wts := cadencetestsuite.WorkflowTestSuite{}
wts.SetLogger(zap.NewNop())
env := wts.NewTestWorkflowEnvironment()

m := buildManager(t, gomock.NewController(t))

wf := func(ctx cadenceworkflow.Context, hooks map[string]map[string]interface{}, params *sendReceiptsParams) error {
return sendReceipts(ctx, hooks, params)
}
cadenceworkflow.Register(wf)

nha_activities.UpdateHARIActivityName = uuid.New().String()
cadenceactivity.RegisterWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, cadenceactivity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})

nha_activities.UpdateProductionSystemActivityName = uuid.New().String()
cadenceactivity.RegisterWithOptions(nha_activities.NewUpdateProductionSystemActivity(m).Execute, cadenceactivity.RegisterOptions{Name: nha_activities.UpdateProductionSystemActivityName})

params := sendReceiptsParams{
SIPID: "91e3ed2f-b798-4f4e-9133-74193f0d6a4f",
StoredAt: time.Now().UTC(),
FullPath: "/",
PipelineName: "pipeline",
NameInfo: nha.NameInfo{},
Status: collection.StatusDone,
}

// Make HARI fail so the workflow returns immediately.
env.OnActivity(
nha_activities.UpdateHARIActivityName,
mock.Anything,
&nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
},
).Return(errors.New("failed")).Once()

env.ExecuteWorkflow(wf, m.Hooks, &params)

assert.True(t, env.IsWorkflowCompleted())
assert.Error(t, env.GetWorkflowError())
env.AssertExpectations(t)
}

func TestSendReceipts(t *testing.T) {
wts := cadencetestsuite.WorkflowTestSuite{}
wts.SetLogger(zap.NewNop())
env := wts.NewTestWorkflowEnvironment()

m := buildManager(t, gomock.NewController(t))

wf := func(ctx cadenceworkflow.Context, hooks map[string]map[string]interface{}, params *sendReceiptsParams) error {
return sendReceipts(ctx, hooks, params)
}
cadenceworkflow.Register(wf)

nha_activities.UpdateHARIActivityName = uuid.New().String()
cadenceactivity.RegisterWithOptions(nha_activities.NewUpdateHARIActivity(m).Execute, cadenceactivity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})

nha_activities.UpdateProductionSystemActivityName = uuid.New().String()
cadenceactivity.RegisterWithOptions(nha_activities.NewUpdateProductionSystemActivity(m).Execute, cadenceactivity.RegisterOptions{Name: nha_activities.UpdateProductionSystemActivityName})

params := sendReceiptsParams{
SIPID: "91e3ed2f-b798-4f4e-9133-74193f0d6a4f",
StoredAt: time.Now().UTC(),
FullPath: "/",
PipelineName: "pipeline",
NameInfo: nha.NameInfo{},
Status: collection.StatusDone,
}

env.OnActivity(
nha_activities.UpdateHARIActivityName,
mock.Anything,
&nha_activities.UpdateHARIActivityParams{
SIPID: params.SIPID,
StoredAt: params.StoredAt,
FullPath: params.FullPath,
PipelineName: params.PipelineName,
NameInfo: params.NameInfo,
},
).Return(nil).Once()

env.OnActivity(
nha_activities.UpdateProductionSystemActivityName,
mock.Anything,
&nha_activities.UpdateProductionSystemActivityParams{
StoredAt: params.StoredAt,
PipelineName: params.PipelineName,
Status: params.Status,
NameInfo: params.NameInfo,
},
).Return(nil).Once()

env.ExecuteWorkflow(wf, m.Hooks, &params)

assert.True(t, env.IsWorkflowCompleted())
assertNilWorkflowError(t, env.GetWorkflowError())
env.AssertExpectations(t)
}

func buildManager(t *testing.T, ctrl *gomock.Controller) *manager.Manager {
t.Helper()

return manager.NewManager(
logrtesting.NullLogger{},
collectionfake.NewMockService(ctrl),
watcherfake.NewMockService(ctrl),
&pipeline.Registry{},
map[string]map[string]interface{}{
"prod": {"disabled": "false"},
"hari": {"disabled": "false"},
},
)
}
func assertNilWorkflowError(t *testing.T, err error) {
t.Helper()

if err == nil {
return
}

if perr, ok := err.(*cadence.CustomError); ok {
var details string
perr.Details(&details)
t.Fatal(details)
} else {
t.Fatal(err.Error())
}

}

0 comments on commit fc3eb29

Please sign in to comment.