Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trigger: Remove need for specifying starting point #107

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/callback/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestCallbackWorkflow(t *testing.T) {
wf.Run(ctx)

foreignID := "andrew"
runID, err := wf.Trigger(ctx, foreignID, callback.StatusStarted)
runID, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)

workflow.TriggerCallbackOn(t, wf, foreignID, runID, callback.StatusStarted, callback.EmailConfirmationResponse{
Expand Down
1 change: 0 additions & 1 deletion _examples/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func Workflow(d Deps) *workflow.Workflow[GettingStarted, Status] {
_, err := api.Trigger(
ctx,
e.ForeignID,
StatusStarted,
workflow.WithInitialValue[GettingStarted, Status](&GettingStarted{
ReadTheDocs: "✅",
}),
Expand Down
2 changes: 1 addition & 1 deletion _examples/gettingstarted/gettingstarted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWorkflow(t *testing.T) {
wf.Run(ctx)

foreignID := "82347982374982374"
_, err := wf.Trigger(ctx, foreignID, gettingstarted.StatusStarted)
_, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)

workflow.Require(t, wf, foreignID, gettingstarted.StatusReadTheDocs, gettingstarted.GettingStarted{
Expand Down
2 changes: 1 addition & 1 deletion _examples/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestExampleWorkflow(t *testing.T) {
foreignID := "hourly-run"

go func() {
err := wf.Schedule(foreignID, schedule.StatusStarted, "@hourly")
err := wf.Schedule(foreignID, "@hourly")
require.Nil(t, err)
}()

Expand Down
2 changes: 1 addition & 1 deletion _examples/timeout/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTimeoutWorkflow(t *testing.T) {
wf.Run(ctx)

foreignID := "andrew"
runID, err := wf.Trigger(ctx, foreignID, timeout.StatusStarted)
runID, err := wf.Trigger(ctx, foreignID)
require.Nil(t, err)

workflow.AwaitTimeoutInsert(t, wf, foreignID, runID, timeout.StatusStarted)
Expand Down
2 changes: 1 addition & 1 deletion _examples/webui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func main() {
"Customer 3",
}
for _, foreignID := range seed {
_, err := w.Trigger(ctx, foreignID, StatusStart)
_, err := w.Trigger(ctx, foreignID)
if err != nil {
panic(err)
}
Expand Down
1 change: 0 additions & 1 deletion adapters/adaptertest/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func RunConnectorTest(t *testing.T, maker func(seedEvents []workflow.ConnectorEv
_, err := api.Trigger(
ctx,
e.ForeignID,
SyncStatusStarted,
workflow.WithInitialValue[User, SyncStatus](&User{
UID: e.ForeignID,
}),
Expand Down
2 changes: 1 addition & 1 deletion adapters/adaptertest/eventstreaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func RunEventStreamerTest(t *testing.T, factory func() workflow.EventStreamer) {
u := User{
CountryCode: "GB",
}
runId, err := wf.Trigger(ctx, foreignID, SyncStatusStarted, workflow.WithInitialValue[User, SyncStatus](&u))
runId, err := wf.Trigger(ctx, foreignID, workflow.WithInitialValue[User, SyncStatus](&u))
require.Nil(t, err)

workflow.AwaitTimeoutInsert(t, wf, foreignID, runId, SyncStatusEmailSet)
Expand Down
4 changes: 2 additions & 2 deletions adapters/reflexstreamer/streamfunc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestStreamFunc(t *testing.T) {
wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable)

fid := "23847923847"
_, err := wf.Trigger(ctx, fid, statusStart)
_, err := wf.Trigger(ctx, fid)
require.Nil(t, err)

workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow")
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestOnComplete(t *testing.T) {
wf, store, ctx, cancel := createTestWorkflow(t, dbc, eventsTable)

fid := "23847923847"
_, err := wf.Trigger(ctx, fid, statusStart)
_, err := wf.Trigger(ctx, fid)
require.Nil(t, err)

workflow.Require(t, wf, fid, statusEnd, "Started and Completed in a Workflow")
Expand Down
2 changes: 1 addition & 1 deletion autopause_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestRetryOfPausedRecords(t *testing.T) {
t.Cleanup(w.Stop)

fid := "12345"
_, err := w.Trigger(ctx, fid, StatusStart)
_, err := w.Trigger(ctx, fid)
require.NoError(t, err)

workflow.Require(t, w, fid, StatusEnd, "")
Expand Down
2 changes: 1 addition & 1 deletion await_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestAwait(t *testing.T) {
wf.Run(ctx)
t.Cleanup(wf.Stop)

runID, err := wf.Trigger(ctx, "1", StatusStart)
runID, err := wf.Trigger(ctx, "1")
require.Nil(t, err)

res, err := wf.Await(ctx, "1", runID, StatusEnd, workflow.WithAwaitPollingFrequency(10*time.Nanosecond))
Expand Down
11 changes: 10 additions & 1 deletion builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,18 @@ func (b *Builder[Type, Status]) Build(
}

if len(b.workflow.timeouts) > 0 && b.workflow.timeoutStore == nil {
panic("cannot configure timeouts without providing TimeoutStore for workflow")
panic("Cannot configure timeouts without providing TimeoutStore for workflow")
}

graph := b.workflow.statusGraph.Info()
if len(graph.StartingNodes) < 1 {
panic(
"Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.",
)
}

b.workflow.defaultStartingPoint = Status(graph.StartingNodes[0])

return b.workflow
}

Expand Down
15 changes: 14 additions & 1 deletion builder_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestWithClock(t *testing.T) {
now := time.Now()
clock := clock_testing.NewFakeClock(now)
b := NewBuilder[string, testStatus]("determine starting points")
b.AddStep(statusStart, nil, statusMiddle)
wf := b.Build(nil, nil, nil, WithClock(clock))

clock.Step(time.Hour)
Expand Down Expand Up @@ -131,6 +132,7 @@ func TestBuildOptions(t *testing.T) {
}

b := NewBuilder[string, testStatus]("determine starting points")
b.AddStep(statusStart, nil, statusMiddle)
w := b.Build(
nil,
nil,
Expand Down Expand Up @@ -207,6 +209,16 @@ func TestAddTimeoutPollingFrequency(t *testing.T) {
require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].pollingFrequency)
}

func TestDefaultStartingPoint(t *testing.T) {
require.PanicsWithValue(t,
"Workflow requires at least one starting point. Please provide at least one Step, Callback, or Timeout to add a starting point.",
func() {
b := NewBuilder[string, testStatus]("")
_ = b.Build(nil, nil, nil)
},
)
}

func TestAddTimeoutDontAllowParallelCount(t *testing.T) {
require.PanicsWithValue(t,
"Cannot configure parallel timeout",
Expand Down Expand Up @@ -248,6 +260,7 @@ func TestConnectorConstruction(t *testing.T) {
ErrBackOff(time.Hour*6),
)

b.AddStep(statusStart, nil, statusEnd)
w := b.Build(nil, nil, nil)

for _, config := range w.connectorConfigs {
Expand Down Expand Up @@ -374,7 +387,7 @@ func TestConfigureTimeoutWithoutTimeoutStore(t *testing.T) {

// Should panic as setting a second config of statusStart
require.PanicsWithValue(t,
"cannot configure timeouts without providing TimeoutStore for workflow",
"Cannot configure timeouts without providing TimeoutStore for workflow",
func() {
b.Build(
nil,
Expand Down
6 changes: 3 additions & 3 deletions hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestWorkflow_OnPauseHook(t *testing.T) {
})

foreignID := "andrew"
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
_, err := wf.Trigger(context.Background(), foreignID)
require.Nil(t, err)

wg.Wait()
Expand All @@ -51,7 +51,7 @@ func TestWorkflow_OnCancelHook(t *testing.T) {
})

foreignID := "andrew"
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
_, err := wf.Trigger(context.Background(), foreignID)
require.Nil(t, err)

wg.Wait()
Expand All @@ -73,7 +73,7 @@ func TestWorkflow_OnCompleteHook(t *testing.T) {
})

foreignID := "andrew"
_, err := wf.Trigger(context.Background(), foreignID, StatusStart)
_, err := wf.Trigger(context.Background(), foreignID)
require.Nil(t, err)

wg.Wait()
Expand Down
8 changes: 4 additions & 4 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestRunStateChanges(t *testing.T) {
w.Run(ctx)
t.Cleanup(w.Stop)

_, err := w.Trigger(ctx, "983467934", StatusStart)
_, err := w.Trigger(ctx, "983467934")
require.Nil(t, err)

time.Sleep(time.Millisecond * 500)
Expand Down Expand Up @@ -391,13 +391,13 @@ func TestMetricProcessSkippedEvents(t *testing.T) {
w.Run(ctx)
t.Cleanup(w.Stop)

_, err := w.Trigger(ctx, "9834679343", StatusStart)
_, err := w.Trigger(ctx, "9834679343")
require.Nil(t, err)

_, err = w.Trigger(ctx, "2349839483", StatusStart)
_, err = w.Trigger(ctx, "2349839483")
require.Nil(t, err)

_, err = w.Trigger(ctx, "7548702398", StatusStart)
_, err = w.Trigger(ctx, "7548702398")
require.Nil(t, err)

time.Sleep(time.Millisecond * 500)
Expand Down
2 changes: 1 addition & 1 deletion runstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestRunState(t *testing.T) {
t.Cleanup(w.Stop)

// Trigger workflow before it's running to assert that the initial state is workflow.RunStateInitiated
runID, err := w.Trigger(ctx, "fid", StatusStart)
runID, err := w.Trigger(ctx, "fid")
require.Nil(t, err)

time.Sleep(time.Second)
Expand Down
18 changes: 3 additions & 15 deletions schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/robfig/cron/v3"
Expand All @@ -13,24 +12,13 @@ import (

func (w *Workflow[Type, Status]) Schedule(
foreignID string,
startingStatus Status,
spec string,
opts ...ScheduleOption[Type, Status],
) error {
if !w.calledRun {
return fmt.Errorf("schedule failed: workflow is not running")
}

if !w.statusGraph.IsValid(int(startingStatus)) {
w.logger.Debug(
w.ctx,
fmt.Sprintf("ensure %v is configured for workflow: %v", startingStatus, w.Name()),
map[string]string{},
)

return fmt.Errorf("schedule failed: status provided is not configured for workflow: %s", startingStatus)
}

var options scheduleOpts[Type, Status]
for _, opt := range opts {
opt(&options)
Expand All @@ -41,8 +29,8 @@ func (w *Workflow[Type, Status]) Schedule(
return err
}

role := makeRole(w.Name(), strconv.FormatInt(int64(startingStatus), 10), foreignID, "scheduler", spec)
processName := makeRole(startingStatus.String(), foreignID, "scheduler", spec)
role := makeRole(w.Name(), foreignID, "scheduler", spec)
processName := makeRole(foreignID, "scheduler", spec)

w.launching.Add(1)
w.run(role, processName, func(ctx context.Context) error {
Expand Down Expand Up @@ -92,7 +80,7 @@ func (w *Workflow[Type, Status]) Schedule(
return nil
}

_, err = w.Trigger(ctx, foreignID, startingStatus, tOpts...)
_, err = w.Trigger(ctx, foreignID, tOpts...)
if errors.Is(err, ErrWorkflowInProgress) {
// NoReturnErr: Fallthrough to schedule next workflow as there is already one in progress. If this
// happens it is likely that we scheduled a workflow and were unable to schedule the next.
Expand Down
26 changes: 13 additions & 13 deletions schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSchedule(t *testing.T) {
t.Cleanup(wf.Stop)

go func() {
err := wf.Schedule("andrew", StatusStart, "@monthly")
err := wf.Schedule("andrew", "@monthly")
require.Nil(t, err)
}()

Expand Down Expand Up @@ -105,7 +105,7 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) {
wg.Add(1)
go func() {
wg.Done()
err := wf.Schedule("andrew", StatusStart, "@monthly")
err := wf.Schedule("andrew", "@monthly")
require.Nil(t, err)
}()

Expand All @@ -114,21 +114,21 @@ func TestWorkflow_ScheduleShutdown(t *testing.T) {
time.Sleep(200 * time.Millisecond)

require.Equal(t, map[string]workflow.State{
"start-andrew-scheduler-@monthly": workflow.StateRunning,
"start-consumer-1-of-1": workflow.StateRunning,
"outbox-consumer": workflow.StateRunning,
"delete-consumer": workflow.StateRunning,
"paused-records-retry-consumer": workflow.StateRunning,
"andrew-scheduler-@monthly": workflow.StateRunning,
"start-consumer-1-of-1": workflow.StateRunning,
"outbox-consumer": workflow.StateRunning,
"delete-consumer": workflow.StateRunning,
"paused-records-retry-consumer": workflow.StateRunning,
}, wf.States())

wf.Stop()

require.Equal(t, map[string]workflow.State{
"start-andrew-scheduler-@monthly": workflow.StateShutdown,
"start-consumer-1-of-1": workflow.StateShutdown,
"outbox-consumer": workflow.StateShutdown,
"delete-consumer": workflow.StateShutdown,
"paused-records-retry-consumer": workflow.StateShutdown,
"andrew-scheduler-@monthly": workflow.StateShutdown,
"start-consumer-1-of-1": workflow.StateShutdown,
"outbox-consumer": workflow.StateShutdown,
"delete-consumer": workflow.StateShutdown,
"paused-records-retry-consumer": workflow.StateShutdown,
}, wf.States())
}

Expand Down Expand Up @@ -169,7 +169,7 @@ func TestWorkflow_ScheduleFilter(t *testing.T) {
opt := workflow.WithScheduleFilter[MyType, status](filter)

go func() {
err := wf.Schedule("andrew", StatusStart, "@monthly", opt)
err := wf.Schedule("andrew", "@monthly", opt)
require.Nil(t, err)
}()

Expand Down
8 changes: 4 additions & 4 deletions testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestRequire(t *testing.T) {
t.Cleanup(wf.Stop)

fid := "10298309123"
_, err := wf.Trigger(ctx, fid, StatusStart)
_, err := wf.Trigger(ctx, fid)
require.Nil(t, err)

workflow.Require(t, wf, fid, StatusEnd, "Lower")
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestWaitFor(t *testing.T) {
t.Cleanup(wf.Stop)

fid := "10298309123"
_, err := wf.Trigger(ctx, fid, StatusStart)
_, err := wf.Trigger(ctx, fid)
require.Nil(t, err)

workflow.WaitFor(t, wf, fid, func(r *workflow.Run[string, status]) (bool, error) {
Expand All @@ -197,11 +197,11 @@ func (a apiImpl[Type, Status]) Name() string {
return "test"
}

func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) {
func (a apiImpl[Type, Status]) Trigger(ctx context.Context, foreignID string, opts ...workflow.TriggerOption[Type, Status]) (runID string, err error) {
return "", nil
}

func (a apiImpl[Type, Status]) Schedule(foreignID string, startingStatus Status, spec string, opts ...workflow.ScheduleOption[Type, Status]) error {
func (a apiImpl[Type, Status]) Schedule(foreignID string, spec string, opts ...workflow.ScheduleOption[Type, Status]) error {
return nil
}

Expand Down
Loading
Loading