Skip to content

Commit

Permalink
[FLINK-36299][runtime] Makes DeclarativeSlotPool rely on the Componen…
Browse files Browse the repository at this point in the history
…tMainThreadExecutor that's used in the corresponding test rather than the test's main thread

This issue was introduced by FLINK-36168 where I added proper shutdown logic
  • Loading branch information
XComp committed Sep 23, 2024
1 parent e583898 commit 1a65f45
Showing 1 changed file with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@
import java.util.stream.IntStream;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
Expand Down Expand Up @@ -412,7 +411,7 @@ void testExecutionGraphGenerationWithAvailableResources() throws Exception {
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(
Expand Down Expand Up @@ -470,7 +469,7 @@ void testExecutionGraphGenerationSetsInitializationTimestamp() throws Exception
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(
Expand Down Expand Up @@ -695,7 +694,7 @@ void testStatusMetrics() throws Exception {
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = createConfigurationWithNoTimeouts();
configuration.set(
Expand Down Expand Up @@ -784,7 +783,7 @@ void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exceptio
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler =
new AdaptiveSchedulerBuilder(
Expand All @@ -805,7 +804,7 @@ void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Excepti
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
Expand Down Expand Up @@ -834,7 +833,7 @@ void testResourceAcquisitionTriggersJobExecution() throws Exception {
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(
Expand Down Expand Up @@ -929,7 +928,7 @@ void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
final JobGraph jobGraph = createJobGraph();
final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(
Expand Down Expand Up @@ -1074,7 +1073,7 @@ void testConsistentMaxParallelism() throws Exception {
final JobGraph jobGraph = streamingJobGraph(vertex);

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler =
new AdaptiveSchedulerBuilder(
Expand Down Expand Up @@ -1142,7 +1141,7 @@ void testRequirementIncreaseTriggersScaleUp() throws Exception {
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);

Expand Down Expand Up @@ -1177,7 +1176,7 @@ void testRequirementDecreaseTriggersScaleDown() throws Exception {
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);

Expand All @@ -1203,7 +1202,7 @@ void testRequirementLowerBoundIncreaseBelowCurrentParallelismDoesNotTriggerResca
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);

Expand Down Expand Up @@ -1239,7 +1238,7 @@ void testRequirementLowerBoundIncreaseBeyondCurrentParallelismKeepsJobRunning()
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

scheduler = createSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool);
int scaledUpParallelism = PARALLELISM * 10;
Expand Down Expand Up @@ -1304,7 +1303,7 @@ void testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure(
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final int availableSlots = 1;
JobResourceRequirements initialJobResourceRequirements =
Expand Down Expand Up @@ -1343,7 +1342,7 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots()
final JobGraph jobGraph = createJobGraph();

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), singleThreadMainThreadExecutor);

final int availableSlots = 1;
JobResourceRequirements initialJobResourceRequirements =
Expand Down Expand Up @@ -2068,7 +2067,8 @@ void testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequireme
configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout);

final DeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID(), slotIdleTimeout);
createDeclarativeSlotPool(
jobGraph.getJobID(), singleThreadMainThreadExecutor, slotIdleTimeout);
scheduler =
new AdaptiveSchedulerBuilder(
jobGraph,
Expand Down Expand Up @@ -2505,20 +2505,21 @@ private Consumer<ExecutionAttemptID> createCancelConsumer(SchedulerNG scheduler)
executionAttemptId, ExecutionState.CANCELED)));
}

private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobId) {
return createDeclarativeSlotPool(jobId, DEFAULT_TIMEOUT);
private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(
JobID jobId, ComponentMainThreadExecutor mainThreadExecutor) {
return createDeclarativeSlotPool(jobId, mainThreadExecutor, DEFAULT_TIMEOUT);
}

private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(
JobID jobId, Duration idleSlotTimeout) {
JobID jobId, ComponentMainThreadExecutor mainThreadExecutor, Duration idleSlotTimeout) {
return new DefaultDeclarativeSlotPool(
jobId,
new DefaultAllocatedSlotPool(),
ignored -> {},
idleSlotTimeout,
DEFAULT_TIMEOUT,
Duration.ZERO,
forMainThread());
mainThreadExecutor);
}

/**
Expand Down Expand Up @@ -2747,7 +2748,7 @@ Iterable<RootExceptionHistoryEntry> run() throws Exception {
completedCheckpointStore, checkpointIDCounter);

final DefaultDeclarativeSlotPool declarativeSlotPool =
createDeclarativeSlotPool(jobGraph.getJobID());
createDeclarativeSlotPool(jobGraph.getJobID(), mainThreadExecutor);

final Configuration configuration = new Configuration();
configuration.set(
Expand Down

0 comments on commit 1a65f45

Please sign in to comment.