Skip to content

Commit

Permalink
fix: added jobid to trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
pksorensen committed Oct 17, 2023
1 parent 046996c commit 4a77e92
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 26 deletions.
35 changes: 15 additions & 20 deletions src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Hangfire;
using Hangfire;
using Hangfire.Server;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -36,7 +36,7 @@ public static string TriggerAsync<TTriggerContext>(this IBackgroundJobClient bac
where TTriggerContext : TriggerContext
{
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(
(executor) => executor.TriggerAsync(trigger));
(executor) => executor.TriggerAsync(trigger,null));

return job;

Expand Down Expand Up @@ -95,13 +95,7 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
{
var result = await actionExecutor.ExecuteAsync(run, workflow, action);

await outputRepository.AddEvent(run, workflow, action, new ActionCompletedEvent
{
// TODO: Add path to action in state
// assignees: thygesteffensen
JobId = context.BackgroundJob.Id,
ActionKey = action.Key
});
await outputRepository.AddEvent(run, workflow, action, ActionCompletedEvent.FromAction(result,action,context.BackgroundJob.Id));

if (result != null)
{
Expand All @@ -127,12 +121,12 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
}
else if (result.Status == "Failed" && result.ReThrow)
{
await outputRepository.AddEvent(run, workflow, action, new WorkflowFinishedEvent());
await outputRepository.AddEvent(run, workflow, action, WorkflowEvent.CreateFinishedEvent(context.BackgroundJob.Id,result));
throw new InvalidOperationException("Action failed: " + result.FailedReason) { Data = { ["ActionResult"] = result } };
}
else
{
await outputRepository.AddEvent(run, workflow, action, new WorkflowFinishedEvent());
await outputRepository.AddEvent(run, workflow, action, WorkflowEvent.CreateFinishedEvent(context.BackgroundJob.Id, result));
}
}

Expand All @@ -148,25 +142,26 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,
/// <summary>
/// Runs on the background process in hangfire
/// </summary>
/// <param name="context"></param>
/// <param name="triggerContext"></param>
/// <returns></returns>
public async ValueTask<object> TriggerAsync(ITriggerContext context)
public async ValueTask<object> TriggerAsync(ITriggerContext triggerContext, PerformContext context)
{
//TODO - avoid sending all workflow over hangfire,
context.Workflow.Manifest ??= await workflowAccessor.GetWorkflowManifestAsync(context.Workflow);

context.RunId = context.RunId == Guid.Empty ? Guid.NewGuid() : context.RunId;
triggerContext.Workflow.Manifest ??= await workflowAccessor.GetWorkflowManifestAsync(triggerContext.Workflow);

runContextAccessor.RunContext = context;
var action = await executor.Trigger(context);
triggerContext.RunId = triggerContext.RunId == Guid.Empty ? Guid.NewGuid() : triggerContext.RunId;
triggerContext.JobId = context.BackgroundJob.Id;

runContextAccessor.RunContext = triggerContext;
var action = await executor.Trigger(triggerContext);

if (action != null)
{
//TODO - avoid sending all workflow over hangfire, so we should wipe the workflow.manifest before scheduling and restore it after.
context.Workflow.Manifest = null;
triggerContext.Workflow.Manifest = null;

var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(context, context.Workflow, action, null));
(executor) => executor.ExecuteAsync(triggerContext, triggerContext.Workflow, action, null));
}
return action;
}
Expand Down
5 changes: 3 additions & 2 deletions src/WorkflowEngine.Hangfire/IHangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Hangfire;
using Hangfire;
using Hangfire.Server;
using System.ComponentModel;
using System.Threading.Tasks;
using WorkflowEngine.Core;
Expand All @@ -8,7 +9,7 @@ namespace WorkflowEngine
public interface IHangfireWorkflowExecutor
{
[JobDisplayName("Trigger: {0:Workflow} RunId={0:Id}")]
public ValueTask<object> TriggerAsync(ITriggerContext context);
public ValueTask<object> TriggerAsync(ITriggerContext triggercontext, PerformContext context);
}

}
4 changes: 2 additions & 2 deletions src/WorkflowEngine.Hangfire/ScheduledWorkflowTrigger.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Hangfire;
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Storage;
Expand Down Expand Up @@ -55,7 +55,7 @@ public Task<string> Trigger(string externalid, bool create, DateTimeOffset time,

var job = _backgroundJobClient.Schedule<IHangfireWorkflowExecutor>((executor) => executor.TriggerAsync(
new TriggerContext { Workflow = workflow, Trigger = trigger ,
}), time);
},null), time);

_logger.LogInformation("Created scheduled workflow job {JobID}", job);

Expand Down
4 changes: 2 additions & 2 deletions src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Hangfire;
using Hangfire;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -50,7 +50,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Type = trigger.Value.Type,
Key = trigger.Key
},
}), trigger.Value.Inputs["cronExpression"] as string);
},null), trigger.Value.Inputs["cronExpression"] as string);

if (first && trigger.Value.Inputs.ContainsKey("runAtStartup") && (bool)trigger.Value.Inputs["runAtStartup"])
jobs.Trigger(workflow.Id.ToString());
Expand Down

0 comments on commit 4a77e92

Please sign in to comment.