Skip to content

Commit

Permalink
[FLINK-37258][Runtime] Return Ordered Job list on Dispatcher#request…
Browse files Browse the repository at this point in the history
…MultipleJobDetails
  • Loading branch information
vahmed-hamdy authored and XComp committed Feb 10, 2025
1 parent 9535f75 commit 94007ff
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,19 @@ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Duration

completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));

return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
Collection<JobDetails> orderedDeduplicatedJobs =
deduplicatedJobs.values().stream()
.sorted(
(jd1, jd2) ->
jd1.getStartTime() == jd2.getStartTime()
? jd1.getJobId()
.compareTo(jd2.getJobId())
: Long.compare(
jd2.getStartTime(),
jd1.getStartTime()))
.collect(Collectors.toList());

return new MultipleJobsDetails(orderedDeduplicatedJobs);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
Expand All @@ -140,6 +142,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -1158,6 +1162,49 @@ public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() thro
JobStatus.FINISHED, dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get());
}

/**
 * Checks that two {@code RUNNING} jobs are reported in descending order of their start time,
 * i.e. the job that started later is listed first.
 */
@Test
public void testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByStartTimeInDecOrder()
        throws Exception {
    final JobID laterStartedJobId = new JobID();
    final JobGraph laterStartedJobGraph = JobGraphTestUtils.streamingJobGraph();
    laterStartedJobGraph.setJobID(laterStartedJobId);

    // The first job has the smaller start time (0 < 10) but the larger CREATED/RUNNING
    // timestamps, so the expected order below can only result from comparing start times.
    final JobManagerRunner earlierStartedRunner =
            runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, jobId, 0L, 100L, 110L);
    final JobManagerRunner laterStartedRunner =
            runningJobManagerRunnerWithJobStatus(
                    JobStatus.RUNNING, laterStartedJobId, 10L, 11L, 12L);
    final JobManagerRunnerFactory runnerFactory =
            new QueuedJobManagerRunnerFactory(earlierStartedRunner, laterStartedRunner);

    final DispatcherGateway gateway =
            createDispatcherAndStartJobs(
                    runnerFactory, Arrays.asList(jobGraph, laterStartedJobGraph));

    final MultipleJobsDetails reportedJobs = gateway.requestMultipleJobDetails(TIMEOUT).get();

    assertOnlyContainsRunningJobsWithOrder(
            reportedJobs, Arrays.asList(laterStartedJobId, jobId));
}

/**
 * Checks that two {@code RUNNING} jobs sharing the same start time are reported in ascending
 * order of their job IDs.
 */
@Test
public void
        testRequestMultipleJobDetails_returnsJobsOfSameStateOrderedByJobIdWhenSameStartTime()
                throws Exception {
    final JobID otherJobId = new JobID();
    final JobGraph otherJobGraph = JobGraphTestUtils.streamingJobGraph();
    otherJobGraph.setJobID(otherJobId);

    // Both runners use the identical start time so that the job ID is the only tie-breaker.
    final JobManagerRunner firstRunner =
            runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, jobId, 10L);
    final JobManagerRunner secondRunner =
            runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING, otherJobId, 10L);
    final JobManagerRunnerFactory runnerFactory =
            new QueuedJobManagerRunnerFactory(firstRunner, secondRunner);

    final DispatcherGateway gateway =
            createDispatcherAndStartJobs(runnerFactory, Arrays.asList(jobGraph, otherJobGraph));

    final MultipleJobsDetails reportedJobs = gateway.requestMultipleJobDetails(TIMEOUT).get();

    // The job IDs are random, so the expected order is derived from their natural ordering.
    final List<JobID> idsInNaturalOrder =
            Stream.of(jobId, otherJobId).sorted().collect(Collectors.toList());

    assertOnlyContainsRunningJobsWithOrder(reportedJobs, idsInNaturalOrder);
}

@Test
public void testRequestMultipleJobDetails_isSerializable() throws Exception {
final JobManagerRunnerFactory blockingJobMaster =
Expand Down Expand Up @@ -1225,7 +1272,26 @@ public void testOverridingJobVertexParallelisms() throws Exception {

/**
 * Creates a runner for this test's default {@code jobId} in the given non-terminal state,
 * using {@code 0L} as the start time.
 *
 * @param currentJobStatus state the job should report
 * @return the runner produced by the overload that takes a job ID and a start time
 */
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
        final JobStatus currentJobStatus) {
    final long defaultStartTime = 0L;
    return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, defaultStartTime);
}

/**
 * Creates a runner for the given job and state, reusing {@code startTime} for the
 * transition-to-CREATED and transition-to-RUNNING timestamps as well.
 *
 * @param currentJobStatus state the job should report
 * @param jobId ID of the job the runner belongs to
 * @param startTime timestamp used as start time and for both transition timestamps
 * @return the runner produced by the overload that takes all three timestamps
 */
private JobManagerRunner runningJobManagerRunnerWithJobStatus(
        final JobStatus currentJobStatus, final JobID jobId, long startTime) {
    final long transitionToCreated = startTime;
    final long transitionToRunning = startTime;
    return runningJobManagerRunnerWithJobStatus(
            currentJobStatus, jobId, startTime, transitionToCreated, transitionToRunning);
}

private JobManagerRunner runningJobManagerRunnerWithJobStatus(
final JobStatus currentJobStatus,
final JobID jobId,
long startTime,
long transitionToCreatedTimestamp,
long transitionToRunningTimestamp) {
Preconditions.checkArgument(!currentJobStatus.isTerminalState());
long[] stateTimeStampsForRunningJob = new long[JobStatus.values().length];
stateTimeStampsForRunningJob[JobStatus.INITIALIZING.ordinal()] = startTime;
stateTimeStampsForRunningJob[JobStatus.CREATED.ordinal()] = transitionToCreatedTimestamp;
stateTimeStampsForRunningJob[JobStatus.RUNNING.ordinal()] = transitionToRunningTimestamp;

return TestingJobManagerRunner.newBuilder()
.setJobId(jobId)
Expand All @@ -1235,6 +1301,7 @@ private JobManagerRunner runningJobManagerRunnerWithJobStatus(
new ArchivedExecutionGraphBuilder()
.setJobID(jobId)
.setState(currentJobStatus)
.setStateTimestamps(stateTimeStampsForRunningJob)
.build()))
.build();
}
Expand All @@ -1256,13 +1323,41 @@ private JobManagerRunner completedJobManagerRunnerWithJobStatus(
.build();
}

/**
 * Creates and starts the dispatcher under test with the given runner factory, signals
 * leadership on {@code jobMasterLeaderElection} and submits the given job graphs one after
 * another, waiting for each submission to complete.
 *
 * @param jobManagerRunnerFactory factory handed to the dispatcher
 * @param jobGraphs job graphs to submit, in submission order
 * @return the self gateway of the started dispatcher
 * @throws Exception if starting the dispatcher or submitting a job fails
 */
private DispatcherGateway createDispatcherAndStartJobs(
        final JobManagerRunnerFactory jobManagerRunnerFactory, final List<JobGraph> jobGraphs)
        throws Exception {
    dispatcher =
            createAndStartDispatcher(heartbeatServices, haServices, jobManagerRunnerFactory);
    final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

    jobMasterLeaderElection.isLeader(UUID.randomUUID());

    // Block on every submission so the jobs are handed over strictly in list order.
    for (final JobGraph graphToSubmit : jobGraphs) {
        gateway.submitJob(graphToSubmit, TIMEOUT).get();
    }
    return gateway;
}

/**
 * Asserts that the given details contain exactly one job and that this job has the expected
 * status.
 *
 * @param expectedJobStatus status the single job must report
 * @param multipleJobsDetails details returned by the dispatcher
 */
private static void assertOnlyContainsSingleJobWithState(
        final JobStatus expectedJobStatus, final MultipleJobsDetails multipleJobsDetails) {
    final Collection<JobDetails> reportedJobs = multipleJobsDetails.getJobs();
    assertThat(reportedJobs).hasSize(1);

    final JobDetails onlyJob = reportedJobs.iterator().next();
    assertThat(onlyJob.getStatus()).isEqualTo(expectedJobStatus);
}

/**
 * Asserts that the given details are backed by a {@link List}, contain exactly as many jobs
 * as expected, and that every job is {@code RUNNING} and appears at the position given by
 * {@code expectedOrderedJobIDs}.
 *
 * @param multipleJobsDetails details returned by the dispatcher
 * @param expectedOrderedJobIDs job IDs in the order in which they must be reported
 */
private static void assertOnlyContainsRunningJobsWithOrder(
        final MultipleJobsDetails multipleJobsDetails,
        final List<JobID> expectedOrderedJobIDs) {
    final Collection<JobDetails> reportedJobs = multipleJobsDetails.getJobs();
    assertThat(reportedJobs).isInstanceOf(List.class);
    assertThat(reportedJobs).hasSize(expectedOrderedJobIDs.size());

    // Both sides have the same size at this point, so they can be walked in lockstep.
    final Iterator<JobID> expectedIds = expectedOrderedJobIDs.iterator();
    for (final JobDetails reportedJob : reportedJobs) {
        final JobID expectedId = expectedIds.next();
        assertThat(reportedJob.getStatus()).isEqualTo(JobStatus.RUNNING);
        assertThat(reportedJob.getJobId()).isEqualTo(expectedId);
    }
}

@Test
public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws Exception {
final JobID jobId1 = new JobID();
Expand Down

0 comments on commit 94007ff

Please sign in to comment.