Skip to content

Commit

Permalink
Few cosmetic tweaks
Browse files Browse the repository at this point in the history
Just some small cosmetic tweaks.

1. In the WorkItem interface, change the required method to `IsWorkItem() bool` which better identifies objects that are work items. Renamed `Description() string` to `String() string` so it implements the more idiomatic `fmt.Stringer`.
2. Fixed a warning due to `log.Fatalf` not supporting `%w`
  • Loading branch information
ItalyPaleAle committed Feb 2, 2024
1 parent bac0292 commit 2ad1c8e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 19 deletions.
4 changes: 2 additions & 2 deletions backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, wi WorkItem) er
func (ap *activityProcessor) CompleteWorkItem(ctx context.Context, wi WorkItem) error {
awi := wi.(*ActivityWorkItem)
if awi.Result == nil {
return fmt.Errorf("can't complete work item '%s' with nil result", wi.Description())
return fmt.Errorf("can't complete work item '%s' with nil result", wi)
}
if awi.Result.GetTaskCompleted() == nil && awi.Result.GetTaskFailed() == nil {
return fmt.Errorf("can't complete work item '%s', which isn't TaskCompleted or TaskFailed", wi.Description())
return fmt.Errorf("can't complete work item '%s', which isn't TaskCompleted or TaskFailed", wi)
}

return ap.be.CompleteActivityWorkItem(ctx, awi)
Expand Down
9 changes: 5 additions & 4 deletions backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,19 @@ func (w *worker) ProcessNext(ctx context.Context) (bool, error) {
}()

wi, err := w.processor.FetchWorkItem(ctx)
if err == ErrNoWorkItems || wi == nil {
switch {
case errors.Is(err, ErrNoWorkItems) || wi == nil:
if !w.waiting {
w.logger.Debugf("%v: waiting for new work items...", w.Name())
w.waiting = true
}
return false, nil
} else if err != nil {
case err != nil:
if !errors.Is(err, ctx.Err()) {
w.logger.Errorf("%v: failed to fetch work item: %v", w.Name(), err)
}
return false, err
} else {
default:
// process the work-item in the background
w.waiting = false
processing = true
Expand All @@ -205,7 +206,7 @@ func (w *worker) processWorkItem(ctx context.Context, wi WorkItem) {
defer w.dispatchSemaphore.Release(1)
defer w.pending.Done()

w.logger.Debugf("%v: processing work item: %s", w.Name(), wi.Description())
w.logger.Debugf("%v: processing work item: %s", w.Name(), wi)

if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
if errors.Is(err, ctx.Err()) {
Expand Down
35 changes: 23 additions & 12 deletions backend/workitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
var ErrNoWorkItems = errors.New("no work items were found")

type WorkItem interface {
Description() string
fmt.Stringer
IsWorkItem() bool
}

type OrchestrationWorkItem struct {
Expand All @@ -23,19 +24,24 @@ type OrchestrationWorkItem struct {
Properties map[string]interface{}
}

func (wi *OrchestrationWorkItem) Description() string {
return fmt.Sprintf("%v (%d event(s))", wi.InstanceID, len(wi.NewEvents))
// String implements core.WorkItem and fmt.Stringer
func (wi OrchestrationWorkItem) String() string {
return fmt.Sprintf("%s (%d event(s))", wi.InstanceID, len(wi.NewEvents))
}

// IsWorkItem implements core.WorkItem
func (wi OrchestrationWorkItem) IsWorkItem() bool {
return true
}

func (wi *OrchestrationWorkItem) GetAbandonDelay() time.Duration {
if wi.RetryCount == 0 {
switch {
case wi.RetryCount == 0:
return time.Duration(0) // no delay
} else {
if wi.RetryCount > 100 {
return 5 * time.Minute // max delay
} else {
return time.Duration(wi.RetryCount) * time.Second // linear backoff
}
case wi.RetryCount > 100:
return 5 * time.Minute // max delay
default:
return time.Duration(wi.RetryCount) * time.Second // linear backoff
}
}

Expand All @@ -48,9 +54,14 @@ type ActivityWorkItem struct {
Properties map[string]interface{}
}

// Description implements core.WorkItem
func (wi *ActivityWorkItem) Description() string {
// String implements core.WorkItem and fmt.Stringer
func (wi ActivityWorkItem) String() string {
name := wi.NewEvent.GetTaskScheduled().GetName()
taskID := wi.NewEvent.EventId
return fmt.Sprintf("%s/%s#%d", wi.InstanceID, name, taskID)
}

// IsWorkItem implements core.WorkItem
func (wi ActivityWorkItem) IsWorkItem() bool {
return true
}
2 changes: 1 addition & 1 deletion samples/distributedtracing/distributedtracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
// Tracing can be configured independently of the orchestration code.
tp, err := ConfigureZipkinTracing()
if err != nil {
log.Fatalf("Failed to create tracer: %w", err)
log.Fatalf("Failed to create tracer: %v", err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
Expand Down

0 comments on commit 2ad1c8e

Please sign in to comment.