Skip to content

Commit

Permalink
Add flow execution id to flow metadata for multi-active case (#3733)
Browse files Browse the repository at this point in the history
Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Aug 8, 2023
1 parent 01036cc commit 0d3b3b5
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
return;
}
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());

// If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later
Expand Down

0 comments on commit 0d3b3b5

Please sign in to comment.