diff --git a/_examples/callback/callback_test.go b/_examples/callback/callback_test.go index 495b9c1..f51ff04 100644 --- a/_examples/callback/callback_test.go +++ b/_examples/callback/callback_test.go @@ -25,7 +25,7 @@ func TestCallbackWorkflow(t *testing.T) { wf.Run(ctx) foreignID := "andrew" - runID, err := wf.Trigger(ctx, foreignID, callback.StatusStarted) + runID, err := wf.Trigger(ctx, foreignID) require.Nil(t, err) workflow.TriggerCallbackOn(t, wf, foreignID, runID, callback.StatusStarted, callback.EmailConfirmationResponse{ diff --git a/_examples/connector/connector.go b/_examples/connector/connector.go index eabf2d9..a82d0b0 100644 --- a/_examples/connector/connector.go +++ b/_examples/connector/connector.go @@ -54,7 +54,6 @@ func Workflow(d Deps) *workflow.Workflow[GettingStarted, Status] { _, err := api.Trigger( ctx, e.ForeignID, - StatusStarted, workflow.WithInitialValue[GettingStarted, Status](&GettingStarted{ ReadTheDocs: "✅", }), diff --git a/_examples/gettingstarted/gettingstarted_test.go b/_examples/gettingstarted/gettingstarted_test.go index d33e240..549d44a 100644 --- a/_examples/gettingstarted/gettingstarted_test.go +++ b/_examples/gettingstarted/gettingstarted_test.go @@ -27,7 +27,7 @@ func TestWorkflow(t *testing.T) { wf.Run(ctx) foreignID := "82347982374982374" - _, err := wf.Trigger(ctx, foreignID, gettingstarted.StatusStarted) + _, err := wf.Trigger(ctx, foreignID) require.Nil(t, err) workflow.Require(t, wf, foreignID, gettingstarted.StatusReadTheDocs, gettingstarted.GettingStarted{ diff --git a/_examples/schedule/schedule_test.go b/_examples/schedule/schedule_test.go index 1a63f8c..499252d 100644 --- a/_examples/schedule/schedule_test.go +++ b/_examples/schedule/schedule_test.go @@ -36,7 +36,7 @@ func TestExampleWorkflow(t *testing.T) { foreignID := "hourly-run" go func() { - err := wf.Schedule(foreignID, schedule.StatusStarted, "@hourly") + err := wf.Schedule(foreignID, "@hourly") require.Nil(t, err) }() diff --git a/_examples/timeout/timeout_test.go b/_examples/timeout/timeout_test.go index 5b7ec72..33512f3 100644 --- a/_examples/timeout/timeout_test.go +++ b/_examples/timeout/timeout_test.go @@ -32,7 +32,7 @@ func TestTimeoutWorkflow(t *testing.T) { wf.Run(ctx) foreignID := "andrew" - runID, err := wf.Trigger(ctx, foreignID, timeout.StatusStarted) + runID, err := wf.Trigger(ctx, foreignID) require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, foreignID, runID, timeout.StatusStarted) diff --git a/_examples/webui/main.go b/_examples/webui/main.go index ad8f457..16d57f9 100644 --- a/_examples/webui/main.go +++ b/_examples/webui/main.go @@ -27,7 +27,7 @@ func main() { "Customer 3", } for _, foreignID := range seed { - _, err := w.Trigger(ctx, foreignID, StatusStart) + _, err := w.Trigger(ctx, foreignID) if err != nil { panic(err) } diff --git a/adapters/adaptertest/connector.go b/adapters/adaptertest/connector.go index 3aac095..3261192 100644 --- a/adapters/adaptertest/connector.go +++ b/adapters/adaptertest/connector.go @@ -38,7 +38,6 @@ func RunConnectorTest(t *testing.T, maker func(seedEvents []workflow.ConnectorEv _, err := api.Trigger( ctx, e.ForeignID, - SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&User{ UID: e.ForeignID, }), diff --git a/adapters/adaptertest/eventstreaming.go b/adapters/adaptertest/eventstreaming.go index ad42ccb..5417e71 100644 --- a/adapters/adaptertest/eventstreaming.go +++ b/adapters/adaptertest/eventstreaming.go @@ -158,7 +158,7 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) { u := User{ CountryCode: "GB", } - runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u)) + runId, err := wf.Trigger(ctx, foreignID, workflow.WithInitialValue[User, SyncStatus](&u)) require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet) diff --git a/adapters/reflexstreamer/streamfunc_test.go b/adapters/reflexstreamer/streamfunc_test.go index 69307f6..3d9663d 100644 --- a/adapters/reflexstreamer/streamfunc_test.go +++ b/adapters/reflexstreamer/streamfunc_test.go @@ -24,7 +24,7 @@ func TestStreamFunc(t *testing.T) { wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable) fid := "23847923847" - _, err := wf.Trigger(ctx, fid, statusStart) + _, err := wf.Trigger(ctx, fid) require.Nil(t, err) workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow") @@ -66,7 +66,7 @@ func TestOnComplete(t *testing.T) { wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable) fid := "23847923847" - _, err := wf.Trigger(ctx, fid, statusStart) + _, err := wf.Trigger(ctx, fid) require.Nil(t, err) workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow") diff --git a/autopause_test.go b/autopause_test.go index d2ff3f1..c831b41 100644 --- a/autopause_test.go +++ b/autopause_test.go @@ -46,7 +46,7 @@ func TestRetryOfPausedRecords(t *testing.T) { t.Cleanup(w.Stop) fid := "12345" - _, err := w.Trigger(ctx, fid, StatusStart) + _, err := w.Trigger(ctx, fid) require.NoError(t, err) workflow.Require(t, w, fid, StatusEnd, "") diff --git a/await_test.go b/await_test.go index 1411023..c4d1741 100644 --- a/await_test.go +++ b/await_test.go @@ -33,7 +33,7 @@ func TestAwait(t *testing.T) { wf.Run(ctx) t.Cleanup(wf.Stop) - runID, err := wf.Trigger(ctx, "1", StatusStart) + runID, err := wf.Trigger(ctx, "1") require.Nil(t, err) res, err := wf.Await(ctx, "1", runID, StatusEnd, workflow.WithAwaitPollingFrequency(10*time.Nanosecond)) diff --git a/builder.go b/builder.go index aeb33b5..e471e91 100644 --- a/builder.go +++ b/builder.go @@ -246,9 +246,18 @@ func (b *Builder[Type, Status]) Build( } if len(b.workflow.timeouts) > 0 && b.workflow.timeoutStore == nil { - panic("cannot configure timeouts without providing TimeoutStore for workflow") + panic("Cannot configure timeouts without providing TimeoutStore for workflow") } + graph := b.workflow.statusGraph.Info() + if len(graph.StartingNodes) < 1 { + panic( + "Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.", + ) + } + + b.workflow.defaultStartingPoint = Status(graph.StartingNodes[0]) + return b.workflow } diff --git a/builder_internal_test.go b/builder_internal_test.go index c823123..6086482 100644 --- a/builder_internal_test.go +++ b/builder_internal_test.go @@ -103,6 +103,7 @@ func TestWithClock(t *testing.T) { now := time.Now() clock := clock_testing.NewFakeClock(now) b := NewBuilder[string, testStatus]("determine starting points") + b.AddStep(statusStart, nil, statusMiddle) wf := b.Build(nil, nil, nil, WithClock(clock)) clock.Step(time.Hour) @@ -131,6 +132,7 @@ func TestBuildOptions(t *testing.T) { } b := NewBuilder[string, testStatus]("determine starting points") + b.AddStep(statusStart, nil, statusMiddle) w := b.Build( nil, nil, @@ -207,6 +209,16 @@ func TestAddTimeoutPollingFrequency(t *testing.T) { require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].pollingFrequency) } +func TestDefaultStartingPoint(t *testing.T) { + require.PanicsWithValue(t, + "Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.", + func() { + b := NewBuilder[string, testStatus]("") + _ = b.Build(nil, nil, nil) + }, + ) +} + func TestAddTimeoutDontAllowParallelCount(t *testing.T) { require.PanicsWithValue(t, "Cannot configure parallel timeout", @@ -248,6 +260,7 @@ func TestConnectorConstruction(t *testing.T) { ErrBackOff(time.Hour*6), ) + b.AddStep(statusStart, nil, statusEnd) w := b.Build(nil, nil, nil) for _, config := range w.connectorConfigs { @@ -374,7 +387,7 @@ func TestConfigureTimeoutWithoutTimeoutStore(t *testing.T) { // Should panic as setting a second config of statusStart require.PanicsWithValue(t, - "cannot configure timeouts without providing TimeoutStore for workflow", + "Cannot configure timeouts without providing TimeoutStore for workflow", func() { b.Build( nil, diff --git a/hook_test.go b/hook_test.go index 867e774..588d266 100644 --- a/hook_test.go +++ b/hook_test.go @@ -29,7 +29,7 @@ func TestWorkflow_OnPauseHook(t *testing.T) { }) foreignID := "andrew" - _, err := wf.Trigger(context.Background(), foreignID, StatusStart) + _, err := wf.Trigger(context.Background(), foreignID) require.Nil(t, err) wg.Wait() @@ -51,7 +51,7 @@ func TestWorkflow_OnCancelHook(t *testing.T) { }) foreignID := "andrew" - _, err := wf.Trigger(context.Background(), foreignID, StatusStart) + _, err := wf.Trigger(context.Background(), foreignID) require.Nil(t, err) wg.Wait() @@ -73,7 +73,7 @@ func TestWorkflow_OnCompleteHook(t *testing.T) { }) foreignID := "andrew" - _, err := wf.Trigger(context.Background(), foreignID, StatusStart) + _, err := wf.Trigger(context.Background(), foreignID) require.Nil(t, err) wg.Wait() diff --git a/metrics_test.go b/metrics_test.go index 231cd7e..0769c58 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -358,7 +358,7 @@ func TestRunStateChanges(t *testing.T) { w.Run(ctx) t.Cleanup(w.Stop) - _, err := w.Trigger(ctx, "983467934", StatusStart) + _, err := w.Trigger(ctx, "983467934") require.Nil(t, err) time.Sleep(time.Millisecond * 500) @@ -391,13 +391,13 @@ func TestMetricProcessSkippedEvents(t *testing.T) { w.Run(ctx) t.Cleanup(w.Stop) - _, err := w.Trigger(ctx, "9834679343", StatusStart) + _, err := w.Trigger(ctx, "9834679343") require.Nil(t, err) - _, err = w.Trigger(ctx, "2349839483", StatusStart) + _, err = w.Trigger(ctx, "2349839483") require.Nil(t, err) - _, err = w.Trigger(ctx, "7548702398", StatusStart) + _, err = w.Trigger(ctx, "7548702398") require.Nil(t, err) time.Sleep(time.Millisecond * 500) diff --git a/runstate_test.go b/runstate_test.go index bd397bc..e0ebfdd 100644 --- a/runstate_test.go +++ b/runstate_test.go @@ -68,7 +68,7 @@ func TestRunState(t *testing.T) { t.Cleanup(w.Stop) // Trigger workflow before it's running to assert that the initial state is workflow.RunStateInitiated - runID, err := w.Trigger(ctx, "fid", StatusStart) + runID, err := w.Trigger(ctx, "fid") require.Nil(t, err) time.Sleep(time.Second) diff --git a/schedule.go b/schedule.go index b519388..2eb3a24 100644 --- a/schedule.go +++ b/schedule.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "time" "github.com/robfig/cron/v3" @@ -13,7 +12,6 @@ import ( func (w *Workflow[Type, Status]) Schedule( foreignID string, - startingStatus Status, spec string, opts ...ScheduleOption[Type, Status], ) error { @@ -21,16 +19,6 @@ func (w *Workflow[Type, Status]) Schedule( return fmt.Errorf("schedule failed: workflow is not running") } - if !w.statusGraph.IsValid(int(startingStatus)) { - w.logger.Debug( - w.ctx, - fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name()), - map[string]string{}, - ) - - return fmt.Errorf("schedule failed: status provided is not configured for workflow: %s", startingStatus) - } - var options scheduleOpts[Type, Status] for _, opt := range opts { opt(&options) @@ -41,8 +29,8 @@ func (w *Workflow[Type, Status]) Schedule( return err } - role := makeRole(w.Name(), strconv.FormatInt(int64(startingStatus), 10), foreignID, "scheduler", spec) - processName := makeRole(startingStatus.String(), foreignID, "scheduler", spec) + role := makeRole(w.Name(), foreignID, "scheduler", spec) + processName := makeRole(foreignID, "scheduler", spec) w.launching.Add(1) w.run(role, processName, func(ctx context.Context) error { @@ -92,7 +80,7 @@ func (w *Workflow[Type, Status]) Schedule( return nil } - _, err = w.Trigger(ctx, foreignID, startingStatus, tOpts...) + _, err = w.Trigger(ctx, foreignID, tOpts...) if errors.Is(err, ErrWorkflowInProgress) { // NoReturnErr: Fallthrough to schedule next workflow as there is already one in progress. If this // happens it is likely that we scheduled a workflow and were unable to schedule the next. diff --git a/schedule_test.go b/schedule_test.go index 4587aea..50df4a8 100644 --- a/schedule_test.go +++ b/schedule_test.go @@ -46,7 +46,7 @@ func TestSchedule(t *testing.T) { t.Cleanup(wf.Stop) go func() { - err := wf.Schedule("andrew", StatusStart, "@monthly") + err := wf.Schedule("andrew", "@monthly") require.Nil(t, err) }() @@ -105,7 +105,7 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) { wg.Add(1) go func() { wg.Done() - err := wf.Schedule("andrew", StatusStart, "@monthly") + err := wf.Schedule("andrew", "@monthly") require.Nil(t, err) }() @@ -114,21 +114,21 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) { time.Sleep(200 * time.Millisecond) require.Equal(t, map[string]workflow.State{ - "start-andrew-scheduler-@monthly": workflow.StateRunning, - "start-consumer-1-of-1": workflow.StateRunning, - "outbox-consumer": workflow.StateRunning, - "delete-consumer": workflow.StateRunning, - "paused-records-retry-consumer": workflow.StateRunning, + "andrew-scheduler-@monthly": workflow.StateRunning, + "start-consumer-1-of-1": workflow.StateRunning, + "outbox-consumer": workflow.StateRunning, + "delete-consumer": workflow.StateRunning, + "paused-records-retry-consumer": workflow.StateRunning, }, wf.States()) wf.Stop() require.Equal(t, map[string]workflow.State{ - "start-andrew-scheduler-@monthly": workflow.StateShutdown, - "start-consumer-1-of-1": workflow.StateShutdown, - "outbox-consumer": workflow.StateShutdown, - "delete-consumer": workflow.StateShutdown, - "paused-records-retry-consumer": workflow.StateShutdown, + "andrew-scheduler-@monthly": workflow.StateShutdown, + "start-consumer-1-of-1": workflow.StateShutdown, + "outbox-consumer": workflow.StateShutdown, + "delete-consumer": workflow.StateShutdown, + "paused-records-retry-consumer": workflow.StateShutdown, }, wf.States()) } @@ -169,7 +169,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) { opt := workflow.WithScheduleFilter[MyType, status](filter) go func() { - err := wf.Schedule("andrew", StatusStart, "@monthly", opt) + err := wf.Schedule("andrew", "@monthly", opt) require.Nil(t, err) }() diff --git a/testing_test.go b/testing_test.go index fada468..64ec109 100644 --- a/testing_test.go +++ b/testing_test.go @@ -110,7 +110,7 @@ func TestRequire(t *testing.T) { t.Cleanup(wf.Stop) fid := "10298309123" - _, err := wf.Trigger(ctx, fid, StatusStart) + _, err := wf.Trigger(ctx, fid) require.Nil(t, err) workflow.Require(t, wf, fid, StatusEnd, "Lower") @@ -181,7 +181,7 @@ func TestWaitFor(t *testing.T) { t.Cleanup(wf.Stop) fid := "10298309123" - _, err := wf.Trigger(ctx, fid, StatusStart) + _, err := wf.Trigger(ctx, fid) require.Nil(t, err) workflow.WaitFor(t, wf, fid, func(r *workflow.Run[string, status]) (bool, error) { @@ -197,11 +197,11 @@ func (a apiImpl[Type, Status]) Name() string { return "test" } -func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) { +func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) { return "", nil } -func (a apiImpl[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, opts ...workflow.ScheduleOption[Type, Status]) error { +func (a apiImpl[Type, Status]) Schedule(foreignID string, spec string, opts ...workflow.ScheduleOption[Type, Status]) error { return nil } diff --git a/trigger.go b/trigger.go index 37029d6..99b225c 100644 --- a/trigger.go +++ b/trigger.go @@ -11,10 +11,9 @@ import ( func (w *Workflow[Type, Status]) Trigger( ctx context.Context, foreignID string, - startingStatus Status, opts ...TriggerOption[Type, Status], ) (runID string, err error) { - return trigger(ctx, w, w.recordStore.Latest, foreignID, startingStatus, opts...) + return trigger(ctx, w, w.recordStore.Latest, foreignID, opts...) } func trigger[Type any, Status StatusType]( @@ -22,13 +21,22 @@ func trigger[Type any, Status StatusType]( w *Workflow[Type, Status], lookup latestLookup, foreignID string, - startingStatus Status, opts ...TriggerOption[Type, Status], ) (runID string, err error) { if !w.calledRun { return "", fmt.Errorf("trigger failed: workflow is not running") } + var o triggerOpts[Type, Status] + for _, fn := range opts { + fn(&o) + } + + startingStatus := w.defaultStartingPoint + if o.startingPoint != Status(0) { + startingStatus = o.startingPoint + } + if !w.statusGraph.IsValid(int(startingStatus)) { w.logger.Debug( w.ctx, @@ -39,11 +47,6 @@ func trigger[Type any, Status StatusType]( return "", fmt.Errorf("trigger failed: status provided is not configured for workflow: %s", startingStatus) } - var o triggerOpts[Type, Status] - for _, fn := range opts { - fn(&o) - } - var t Type if o.initialValue != nil { t = *o.initialValue @@ -93,11 +96,18 @@ func trigger[Type any, Status StatusType]( } type triggerOpts[Type any, Status StatusType] struct { - initialValue *Type + startingPoint Status + initialValue *Type } type TriggerOption[Type any, Status StatusType] func(o *triggerOpts[Type, Status]) +func WithStartingPoint[Type any, Status StatusType](startingStatus Status) TriggerOption[Type, Status] { + return func(o *triggerOpts[Type, Status]) { + o.startingPoint = startingStatus + } +} + func WithInitialValue[Type any, Status StatusType](t *Type) TriggerOption[Type, Status] { return func(o *triggerOpts[Type, Status]) { o.initialValue = t diff --git a/trigger_internal_test.go b/trigger_internal_test.go index 413c8f3..e4ca8fc 100644 --- a/trigger_internal_test.go +++ b/trigger_internal_test.go @@ -18,7 +18,7 @@ func Test_trigger(t *testing.T) { t.Run("Expected non-nil error when Trigger called before Run()", func(t *testing.T) { ctx := context.Background() - _, err := trigger(ctx, w, nil, "1", statusStart) + _, err := trigger(ctx, w, nil, "1") require.Equal(t, "trigger failed: workflow is not running", err.Error()) }) @@ -27,7 +27,7 @@ func Test_trigger(t *testing.T) { ctx := context.Background() w.calledRun = true - _, err := trigger(ctx, w, nil, "1", statusEnd) + _, err := trigger(ctx, w, nil, "1", WithStartingPoint[string, testStatus](statusEnd)) require.Equal(t, fmt.Sprintf("trigger failed: status provided is not configured for workflow: %s", statusEnd), err.Error()) }) @@ -42,7 +42,7 @@ func Test_trigger(t *testing.T) { RunState: RunStateRunning, Status: int(statusMiddle), }, nil - }, "1", statusStart) + }, "1") require.True(t, errors.Is(err, ErrWorkflowInProgress)) }) } diff --git a/workflow.go b/workflow.go index 36ec0fd..0e6a3f0 100644 --- a/workflow.go +++ b/workflow.go @@ -19,10 +19,11 @@ type API[Type any, Status StatusType] interface { // Name returns the name of the implemented workflow. Name() string - // Trigger will kickstart a workflow for the provided foreignID starting from the provided starting status. There - // is no limitation as to where you start the workflow from. For workflows that have data preceding the initial - // trigger that needs to be used in the workflow, using WithInitialValue will allow you to provide pre-populated - // fields of Type that can be accessed by the consumers. + // Trigger will kickstart a workflow Run for the provided foreignID starting from the default entrypoint to + // the workflow which is the first "from" status added via the builder + // (e.g. builder.AddStep(FromStatus, func{}, ToStatus). There is no limitation as to where you start the workflow + // from and can do so via the WithStartAt trigger option. WithInitialValue should be used when you need data to be + // present in the workflow Run before it starts. This can be used to reduce the need for duplicating reads. // // foreignID should not be random and should be deterministic for the thing that you are running the workflow for. // This especially helps when connecting other workflows as the foreignID is the only way to connect the streams. The @@ -31,14 +32,13 @@ type API[Type any, Status StatusType] interface { Trigger( ctx context.Context, foreignID string, - startingStatus Status, opts ...TriggerOption[Type, Status], ) (runID string, err error) // Schedule takes a cron spec and will call Trigger at the specified intervals. Schedule is a blocking call and all // schedule errors will be retried indefinitely. The same options are available for Schedule as they are // for Trigger. - Schedule(foreignID string, startingStatus Status, spec string, opts ...ScheduleOption[Type, Status]) error + Schedule(foreignID string, spec string, opts ...ScheduleOption[Type, Status]) error // Await is a blocking call that returns the typed Run when the workflow of the specified run ID reaches the // specified status. @@ -93,9 +93,11 @@ type Workflow[Type any, Status StatusType] struct { // and block until this transition is complete. launching sync.WaitGroup - statusGraph *graph.Graph + // defaultStartingPoint defines that status that the workflow run will start on when Trigger is called. + defaultStartingPoint Status + statusGraph *graph.Graph // errorCounter keeps a central in-mem state of errors from consumers and timeouts in order to implement - // PauseAfterErrCount. The tracking of errors is done in a way where errors need to be unique per process + // PauseAfterstatusGraphErrCount. The tracking of errors is done in a way where errors need to be unique per process // (consumer / timeout). errorCounter errorcounter.ErrorCounter } diff --git a/workflow_test.go b/workflow_test.go index 8bff02e..921a9b7 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -123,7 +123,7 @@ func TestWorkflowAcceptanceTest(t *testing.T) { UserID: expectedUserID, } - runID, err := wf.Trigger(ctx, fid, StatusInitiated, workflow.WithInitialValue[MyType, status](&mt)) + runID, err := wf.Trigger(ctx, fid, workflow.WithInitialValue[MyType, status](&mt)) require.Nil(t, err) // Once in the correct status, trigger third party callbacks @@ -228,7 +228,7 @@ func benchmarkWorkflow(b *testing.B, numberOfSteps int) { UserID: expectedUserID, } for range b.N { - _, err := wf.Trigger(ctx, fid, 0, workflow.WithInitialValue[MyType, status](&mt)) + _, err := wf.Trigger(ctx, fid, workflow.WithInitialValue[MyType, status](&mt)) if err != nil { b.Fatal(err) } @@ -276,7 +276,7 @@ func TestTimeout(t *testing.T) { wf.Run(ctx) t.Cleanup(wf.Stop) - runID, err := wf.Trigger(ctx, "example", StatusInitiated) + runID, err := wf.Trigger(ctx, "example") require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, "example", runID, StatusProfileCreated) @@ -399,10 +399,10 @@ func TestWorkflow_ErrWorkflowNotRunning(t *testing.T) { cancel() }) - _, err := wf.Trigger(ctx, "andrew", StatusStart) + _, err := wf.Trigger(ctx, "andrew") require.Equal(t, "trigger failed: workflow is not running", err.Error()) - err = wf.Schedule("andrew", StatusStart, "@monthly") + err = wf.Schedule("andrew", "@monthly") require.Equal(t, "schedule failed: workflow is not running", err.Error()) } @@ -434,7 +434,7 @@ func TestWorkflow_TestingRequire(t *testing.T) { t.Cleanup(wf.Stop) foreignID := "andrew" - _, err := wf.Trigger(ctx, foreignID, StatusStart) + _, err := wf.Trigger(ctx, foreignID) require.Nil(t, err) expected := MyType{ @@ -487,7 +487,7 @@ func TestTimeTimerFunc(t *testing.T) { wf.Run(ctx) t.Cleanup(wf.Stop) - runID, err := wf.Trigger(ctx, "Andrew Wormald", StatusStart) + runID, err := wf.Trigger(ctx, "Andrew Wormald") require.Nil(t, err) workflow.AwaitTimeoutInsert(t, wf, "Andrew Wormald", runID, StatusStart) @@ -525,7 +525,7 @@ func TestConnector(t *testing.T) { "my-test-connector", connector, func(ctx context.Context, api workflow.API[typeX, status], e *workflow.ConnectorEvent) error { - _, err := api.Trigger(ctx, e.ForeignID, StatusStart, workflow.WithInitialValue[typeX, status](&typeX{ + _, err := api.Trigger(ctx, e.ForeignID, workflow.WithInitialValue[typeX, status](&typeX{ Val: "trigger set value", })) if err != nil { @@ -599,7 +599,7 @@ func TestStepConsumerLag(t *testing.T) { t.Cleanup(wf.Stop) foreignID := "1" - _, err := wf.Trigger(ctx, foreignID, StatusStart, workflow.WithInitialValue[TimeWatcher, status](&TimeWatcher{ + _, err := wf.Trigger(ctx, foreignID, workflow.WithInitialValue[TimeWatcher, status](&TimeWatcher{ StartTime: clock.Now(), })) require.Nil(t, err) @@ -621,7 +621,9 @@ func TestStepConsumerLag(t *testing.T) { } func TestName(t *testing.T) { - wf := workflow.NewBuilder[string, status]("test name").Build(nil, nil, nil) + b := workflow.NewBuilder[string, status]("test name") + b.AddStep(StatusStart, nil, StatusEnd) + wf := b.Build(nil, nil, nil) require.Equal(t, "test name", wf.Name()) }