-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-2134] update job status to SKIPPED for all the dependent jobs of a cancelled job #4049
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4049 +/- ##
============================================
+ Coverage 38.79% 41.11% +2.31%
- Complexity 1599 2201 +602
============================================
Files 388 480 +92
Lines 15998 20360 +4362
Branches 1585 2355 +770
============================================
+ Hits 6207 8371 +2164
- Misses 9293 11097 +1804
- Partials 498 892 +394 ☔ View full report in Codecov by Sentry. |
fix merge conflicts
8d5589d
to
4bb8ae5
Compare
4bb8ae5
to
43d53e5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice improvement. it generally looks good, but let's align on whether SKIPPED is only job-level or also flow-level. once we decide, I'll take one more pass through the ReevaluateDagProcTest
to read that more closely
@@ -49,4 +49,9 @@ enum ExecutionStatus { | |||
* Flow cancelled. | |||
*/ | |||
CANCELLED | |||
|
|||
/** | |||
* Flow or job is skipped |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how would a flow be skipped? wouldn't the flow instead be CANCELLED or FAILED? after that (fewer than all of) that flow's jobs may be SKIPPED (fewer, because at least one would be CANCELLED or FAILED)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it just comes down to how we define things. imo, when a flow execution is skipped when there is already an execution for the same flow is running, status SKIPPED sounds more appropriate.
@@ -164,7 +164,8 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) | |||
DagNode<JobExecutionPlan> node = nodesToExpand.poll(); | |||
ExecutionStatus executionStatus = getExecutionStatus(node); | |||
boolean addFlag = true; | |||
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) { | |||
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME || | |||
executionStatus == SKIPPED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm unclear here: is "skipping" able to be reversed, so the node can later be ready? (I'm equating getNext
to identifying the set of "ready" nodes.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no , skipped cannot not be reversed. this diff should not be here, ill change it. i think it might be appropriate in some draft version of this PR, but not anymore
@@ -116,7 +116,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by | |||
|
|||
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = ImmutableList | |||
.of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY, | |||
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.COMPLETE, | |||
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, ExecutionStatus.SKIPPED, ExecutionStatus.COMPLETE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is CANCELLED last, and SKIPPED prior to COMPLETE? what of the similar idea that news of job COMPLETE might arrive after we'd already attempted cancellation or skipping?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these four are the terminal statuses and once a job reaches here, further GTEs can be ignored.
we also do not expect to see two of them for the same job.
yes, there can be some combinations of events among these four, that may arrive due to race condition, but i think, in that case, it is ok for GaaS to just adhere to any of the ordering. I do not want to change the correct ordering, so I added SKIPPED before other terminal statuses, to basically support the same idea of yours - let it show complete if it is cancelled/skipped earlier.
...ervice/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
Outdated
Show resolved
Hide resolved
...ervice/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
Outdated
Show resolved
Hide resolved
findDependentJobs(dag, node, dependentJobs); | ||
for (Dag.DagNode<JobExecutionPlan> dependentJob : dependentJobs) { | ||
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), dependentJob.getValue()); | ||
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SKIPPED).stop(jobMetadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment about hard-coding to this static
...ervice/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
Outdated
Show resolved
Hide resolved
@@ -159,9 +164,12 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da | |||
dag.setMessage("Flow failed because job " + jobName + " failed"); | |||
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED); | |||
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode); | |||
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering... is this a kind of 'ping-pong'?
a. a job fails, which emits a GTE
b. the KJSM sees the GTE and then creates a DagActionType.REEVALUATE
c. this ReevaluateDagProc emits a SKIPPED GTE for all dependent jobs
d. the KJSM sees those GTEs and creates a DagActionType.REEVALUATE
for each of those
I'm wondering whether step d.) is necessary, given we setting SKIPPED should be a bulk operation on ALL dependent jobs. does the KJSM really need to create a DagAction
for reevaluating those?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes (d) needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.
break; | ||
case CANCELLED: | ||
case SKIPPED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is job-level SKIPPED
, due to the "ping-pong" I just described?
or is arising from a flow-level execution-status of SKIPPED
. if the latter, who sets that? I thought it would be only job-level
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, yes this needs to be removed. in a draft version, i was emitting skipped events only for the child jobs not for all the dependent jobs.
fix merge conflicts
add tests
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
created SKIPPED execution status
used it for the jobs that cannot be run because it's parent job is cancelled
Tests
updated tests in ReevaluateDagProcTest
Commits