Skip to content

Commit

Permalink
fix: skipped workflow finished eventwhen action is scoped and has mor…
Browse files Browse the repository at this point in the history
…e and added queue for loops handling to keep on same queue
  • Loading branch information
pksorensen committed Oct 17, 2024
1 parent b9a269f commit 0a6d6c0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
16 changes: 12 additions & 4 deletions src/WorkflowEngine.Hangfire/ForloopAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ namespace WorkflowEngine.Core.Actions
{
public interface IArrayContext
{
public string JobId { get; set; }
string JobId { get; set; }
string Queue { get; set; }
bool HasMore { get; set; }
}
public class ArrayContext : IArrayContext
{
public string JobId { get; set; }
public string Queue { get; set; }
public bool HasMore { get; set; }
}

public class ForeachAction : IActionImplementation
Expand All @@ -35,7 +39,9 @@ public async ValueTask<object> ExecuteAsync(IRunContext context, IWorkflow workf
{

var loop = workflow.Manifest.Actions.FindAction(action.Key) as ForLoopActionMetadata;


arrayContext.HasMore = true;

if (loop.ForEach is string str && str.Contains("@"))
{
var items = await expressionEngine.ParseToValueContainer(str);
Expand Down Expand Up @@ -64,6 +70,7 @@ public async ValueTask<object> ExecuteAsync(IRunContext context, IWorkflow workf
}
else if (action.Index < itemsToRunover.Count)
{

// var nextAction = new Action { Type = action.Type, Key=action.Key, ScheduledTime = DateTimeOffset.UtcNow, RunId = context.RunId, Index = action.Index+1 };

var nextactionmetadata = loop.Actions.SingleOrDefault(c => c.Value.RunAfter?.Count == 0);
Expand All @@ -73,17 +80,18 @@ public async ValueTask<object> ExecuteAsync(IRunContext context, IWorkflow workf
var nextaction = context.CopyTo(new Action { Type = nextactionmetadata.Value.Type, Key = $"{action.Key}.{nextactionmetadata.Key}", ScheduledTime = DateTimeOffset.UtcNow, Index = action.Index+1 });


var a = backgroundJobClient.ContinueJobWith<IHangfireActionExecutor>(arrayContext.JobId,
var a = backgroundJobClient.ContinueJobWith<IHangfireActionExecutor>(arrayContext.JobId, arrayContext.Queue,
(executor) => executor.ExecuteAsync(context, workflow, nextaction, null));

return new { item = itemsToRunover[action.Index] };
}
}

}
}



arrayContext.HasMore = false;

return new { };
}
Expand Down
8 changes: 7 additions & 1 deletion src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
try
{
runContextAccessor.RunContext = run;
arrayContext.JobId = context.BackgroundJob.Id;

var queue = context.BackgroundJob.Job.Queue ?? "default";
arrayContext.JobId = context.BackgroundJob.Id;
arrayContext.Queue = queue;

var result = await actionExecutor.ExecuteAsync(run, workflow, action);

Expand Down Expand Up @@ -149,6 +151,10 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
{
await outputRepository.AddEvent(run, workflow, action, WorkflowEvent.CreateFinishedEvent(context.BackgroundJob.Id, result));
throw new InvalidOperationException("Action failed: " + result.FailedReason) { Data = { ["ActionResult"] = result } };
}
else if ( workflow.Manifest.Actions.FindAction(action.Key) is IScopedActionMetadata scopedAction && arrayContext.HasMore)
{

}
else
{
Expand Down

0 comments on commit 0a6d6c0

Please sign in to comment.