From ed0697dccfb59e4fcd1aa8a4c48801458457acf4 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 2 Dec 2024 19:11:13 +0100 Subject: [PATCH] [FLINK-36451][runtime] Makes hasLeadership and confirmLeadership work asynchronously --- .../runner/DefaultDispatcherRunner.java | 2 +- .../nonha/embedded/EmbeddedLeaderService.java | 18 +-- .../JobMasterServiceLeadershipRunner.java | 63 +++++----- .../leaderelection/DefaultLeaderElection.java | 11 +- .../DefaultLeaderElectionService.java | 109 ++++++++++-------- .../leaderelection/LeaderElection.java | 5 +- .../StandaloneLeaderElection.java | 11 +- .../runner/DefaultDispatcherRunnerTest.java | 2 +- .../embedded/EmbeddedHaServicesTest.java | 14 +-- .../DefaultLeaderElectionServiceTest.java | 102 +++++++++++----- .../DefaultLeaderElectionTest.java | 37 +++--- .../leaderelection/LeaderElectionTest.java | 20 +++- .../StandaloneLeaderElectionTest.java | 17 ++- .../leaderelection/TestingContender.java | 9 +- .../leaderelection/TestingLeaderElection.java | 11 +- .../DocumentingDispatcherRestEndpoint.java | 10 +- 16 files changed, 276 insertions(+), 165 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java index ce6579a67b956..9326a1ff41ab0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java @@ -176,7 +176,7 @@ private void forwardConfirmLeaderSessionFuture( FutureUtils.assertNoException( newDispatcherLeaderProcess .getLeaderAddressFuture() - .thenAccept( + .thenCompose( leaderAddress -> leaderElection.confirmLeadership( leaderSessionID, leaderAddress))); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java index 189189c2bc286..0d2085f44e623 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java @@ -244,14 +244,14 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) { } /** Callback from leader contenders when they confirm a leader grant. */ - private void confirmLeader( + private CompletableFuture confirmLeader( final EmbeddedLeaderElection embeddedLeaderElection, final UUID leaderSessionId, final String leaderAddress) { synchronized (lock) { // if the leader election was shut down in the meantime, ignore this confirmation if (!embeddedLeaderElection.running || shutdown) { - return; + return FutureUtils.completedVoidFuture(); } try { @@ -269,7 +269,7 @@ private void confirmLeader( currentLeaderProposed = null; // notify all listeners - notifyAllListeners(leaderAddress, leaderSessionId); + return notifyAllListeners(leaderAddress, leaderSessionId); } else { LOG.debug( "Received confirmation of leadership for a stale leadership grant. Ignoring."); @@ -278,6 +278,8 @@ private void confirmLeader( fatalError(t); } } + + return FutureUtils.completedVoidFuture(); } private CompletableFuture notifyAllListeners(String address, UUID leaderSessionId) { @@ -465,15 +467,17 @@ public void close() { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { + public CompletableFuture confirmLeadership( + UUID leaderSessionID, String leaderAddress) { checkNotNull(leaderSessionID); checkNotNull(leaderAddress); - confirmLeader(this, leaderSessionID, leaderAddress); + return confirmLeader(this, leaderSessionID, leaderAddress); } @Override - public boolean hasLeadership(UUID leaderSessionId) { - return isLeader && leaderSessionId.equals(currentLeaderSessionId); + public CompletableFuture hasLeadership(UUID leaderSessionId) { + return CompletableFuture.completedFuture( + isLeader && leaderSessionId.equals(currentLeaderSessionId)); } void shutdown(Exception cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index 160830beaa456..5a479708ae29a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -257,26 +257,27 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) { unused -> jobResultStore .hasJobResultEntryAsync(getJobID()) - .thenAccept( + .thenCompose( hasJobResult -> { if (hasJobResult) { - handleJobAlreadyDoneIfValidLeader( + return handleJobAlreadyDoneIfValidLeader( leaderSessionId); } else { - createNewJobMasterServiceProcessIfValidLeader( + return createNewJobMasterServiceProcessIfValidLeader( leaderSessionId); } })); handleAsyncOperationError(sequentialOperation, "Could not start the job manager."); } - private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { - runIfValidLeader( + private CompletableFuture handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { + return runIfValidLeader( leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job"); } - private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) { - runIfValidLeader( + private CompletableFuture createNewJobMasterServiceProcessIfValidLeader( + UUID leaderSessionId) { + return runIfValidLeader( leaderSessionId, () -> ThrowingRunnable.unchecked( @@ -336,15 +337,18 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) { private void confirmLeadership( UUID leaderSessionId, CompletableFuture leaderAddressFuture) { FutureUtils.assertNoException( - leaderAddressFuture.thenAccept( + leaderAddressFuture.thenCompose( address -> - runIfStateRunning( - () -> { - LOG.debug("Confirm leadership {}.", leaderSessionId); - leaderElection.confirmLeadership( - leaderSessionId, address); - }, - "confirming leadership"))); + callIfRunning( + () -> { + LOG.debug( + "Confirm leadership {}.", + leaderSessionId); + return leaderElection.confirmLeadership( + leaderSessionId, address); + }, + "confirming leadership") + .orElse(FutureUtils.completedVoidFuture()))); } private void forwardResultFuture( @@ -478,20 +482,32 @@ private boolean isRunning() { return state == State.RUNNING; } - private void runIfValidLeader( + private CompletableFuture runIfValidLeader( UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) { synchronized (lock) { - if (isValidLeader(expectedLeaderId)) { - action.run(); + if (isRunning() && leaderElection != null) { + return leaderElection + .hasLeadership(expectedLeaderId) + .thenAccept( + hasLeadership -> { + synchronized (lock) { + if (isRunning() && hasLeadership) { + action.run(); + } else { + noLeaderFallback.run(); + } + } + }); } else { noLeaderFallback.run(); + return FutureUtils.completedVoidFuture(); } } } - private void runIfValidLeader( + private CompletableFuture runIfValidLeader( UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) { - runIfValidLeader( + return runIfValidLeader( expectedLeaderId, action, () -> @@ -499,13 +515,6 @@ private void runIfValidLeader( noLeaderFallbackCommandDescription, expectedLeaderId)); } - @GuardedBy("lock") - private boolean isValidLeader(UUID expectedLeaderId) { - return isRunning() - && leaderElection != null - && leaderElection.hasLeadership(expectedLeaderId); - } - private void forwardIfValidLeader( UUID expectedLeaderId, CompletableFuture source, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java index 06d6b62fb65f3..b74566aab12a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java @@ -21,6 +21,7 @@ import org.apache.flink.util.Preconditions; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link @@ -43,12 +44,12 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { - parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress); + public CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress) { + return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress); } @Override - public boolean hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadership(UUID leaderSessionId) { return parentService.hasLeadership(componentId, leaderSessionId); } @@ -81,7 +82,7 @@ abstract static class ParentService { * the {@link LeaderContender} that is associated with the {@code componentId}. The * information is only propagated to the HA backend if the leadership is still acquired. */ - abstract void confirmLeadership( + abstract CompletableFuture confirmLeadership( String componentId, UUID leaderSessionID, String leaderAddress); /** @@ -91,6 +92,6 @@ abstract void confirmLeadership( * @return {@code true} if the service has leadership with the passed {@code * leaderSessionID} acquired; {@code false} otherwise. */ - abstract boolean hasLeadership(String componentId, UUID leaderSessionID); + abstract CompletableFuture hasLeadership(String componentId, UUID leaderSessionID); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 83acee3aa1804..376fbb2b057a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -313,7 +313,7 @@ public void close() throws Exception { } @Override - protected void confirmLeadership( + protected CompletableFuture confirmLeadership( String componentId, UUID leaderSessionID, String leaderAddress) { Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId)); LOG.debug( @@ -324,59 +324,72 @@ protected void confirmLeadership( checkNotNull(leaderSessionID); - synchronized (lock) { - if (hasLeadership(componentId, leaderSessionID)) { - Preconditions.checkState( - leaderElectionDriver != null, - "The leadership check should only return true if a driver is instantiated."); - Preconditions.checkState( - !confirmedLeaderInformation.hasLeaderInformation(componentId), - "No confirmation should have happened, yet."); - - final LeaderInformation newConfirmedLeaderInformation = - LeaderInformation.known(leaderSessionID, leaderAddress); - confirmedLeaderInformation = - LeaderInformationRegister.merge( - confirmedLeaderInformation, - componentId, - newConfirmedLeaderInformation); - leaderElectionDriver.publishLeaderInformation( - componentId, newConfirmedLeaderInformation); - } else { - if (!leaderSessionID.equals(this.issuedLeaderSessionID)) { - LOG.debug( - "Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).", - leaderSessionID, - componentId, - issuedLeaderSessionID); - } else { - LOG.warn( - "The leader session ID {} for component '{}' was confirmed even though the corresponding " - + "service was not elected as the leader or has been stopped already.", - componentId, - leaderSessionID); - } - } - } + return CompletableFuture.runAsync( + () -> { + synchronized (lock) { + if (hasLeadershipInternal(componentId, leaderSessionID)) { + Preconditions.checkState( + leaderElectionDriver != null, + "The leadership check should only return true if a driver is instantiated."); + Preconditions.checkState( + !confirmedLeaderInformation.hasLeaderInformation(componentId), + "No confirmation should have happened, yet."); + + final LeaderInformation newConfirmedLeaderInformation = + LeaderInformation.known(leaderSessionID, leaderAddress); + confirmedLeaderInformation = + LeaderInformationRegister.merge( + confirmedLeaderInformation, + componentId, + newConfirmedLeaderInformation); + leaderElectionDriver.publishLeaderInformation( + componentId, newConfirmedLeaderInformation); + } else { + if (!leaderSessionID.equals(this.issuedLeaderSessionID)) { + LOG.debug( + "Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).", + leaderSessionID, + componentId, + issuedLeaderSessionID); + } else { + LOG.warn( + "The leader session ID {} for component '{}' was confirmed even though the corresponding " + + "service was not elected as the leader or has been stopped already.", + componentId, + leaderSessionID); + } + } + } + }, + leadershipOperationExecutor); } @Override - protected boolean hasLeadership(String componentId, UUID leaderSessionId) { - synchronized (lock) { - if (leaderElectionDriver != null) { - if (leaderContenderRegistry.containsKey(componentId)) { - return leaderElectionDriver.hasLeadership() - && leaderSessionId.equals(issuedLeaderSessionID); - } else { - LOG.debug( - "hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.", - componentId); - return false; - } + protected CompletableFuture hasLeadership(String componentId, UUID leaderSessionId) { + return CompletableFuture.supplyAsync( + () -> { + synchronized (lock) { + return hasLeadershipInternal(componentId, leaderSessionId); + } + }, + leadershipOperationExecutor); + } + + @GuardedBy("lock") + private boolean hasLeadershipInternal(String componentId, UUID leaderSessionId) { + if (leaderElectionDriver != null) { + if (leaderContenderRegistry.containsKey(componentId)) { + return leaderElectionDriver.hasLeadership() + && leaderSessionId.equals(issuedLeaderSessionID); } else { - LOG.debug("hasLeadership is called after the service is closed, returning false."); + LOG.debug( + "hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.", + componentId); return false; } + } else { + LOG.debug("hasLeadership is called after the service is closed, returning false."); + return false; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java index 0dc4ff5562197..5067c708c019e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link @@ -42,7 +43,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionID The new leader session ID * @param leaderAddress The address of the new leader */ - void confirmLeadership(UUID leaderSessionID, String leaderAddress); + CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress); /** * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the @@ -51,7 +52,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionId identifying the current leader * @return true if the associated {@link LeaderContender} is the leader, otherwise false */ - boolean hasLeadership(UUID leaderSessionId); + CompletableFuture hasLeadership(UUID leaderSessionId); /** * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java index aa408a24563e2..26d6bc2b0cc8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code StandaloneLeaderElection} implements {@link LeaderElection} for non-HA cases. This @@ -57,12 +59,15 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {} + public CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress) { + return FutureUtils.completedVoidFuture(); + } @Override - public boolean hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadership(UUID leaderSessionId) { synchronized (lock) { - return this.leaderContender != null && this.sessionID.equals(leaderSessionId); + return CompletableFuture.completedFuture( + this.leaderContender != null && this.sessionID.equals(leaderSessionId)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java index d4ddb1daedb32..5cd6d0183c574 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java @@ -273,7 +273,7 @@ public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Excep // complete the confirmation future after losing the leadership contenderConfirmationFuture.complete("leader address"); - assertThat(leaderElection.hasLeadership(leaderSessionId), is(false)); + assertThat(leaderElection.hasLeadership(leaderSessionId).get(), is(false)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java index deefa2bf2daf2..9155cf71e3933 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java @@ -140,7 +140,7 @@ private void runLeaderRetrievalTest( final UUID leaderId = leaderContender.getLeaderSessionFuture().get(); - leaderElection.confirmLeadership(leaderId, ADDRESS); + leaderElection.confirmLeadership(leaderId, ADDRESS).get(); final LeaderInformation leaderInformation = leaderRetrievalListener.getLeaderInformationFuture().get(); @@ -172,20 +172,20 @@ public void testConcurrentLeadershipOperations() throws Exception { final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(true)); embeddedHaServices.getDispatcherLeaderService().revokeLeadership().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId), is(false)); + assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(false)); embeddedHaServices.getDispatcherLeaderService().grantLeadership(); final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true)); - leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS); - leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS); + leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS).get(); + leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS).get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true)); leaderContender.tryRethrowException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index 6776e32fe0473..f5e0a4d6f6efb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -713,14 +713,20 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception applyToBothContenderContexts( ctx -> { - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) - .isFalse(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, UUID.randomUUID()); + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -746,14 +752,20 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { assertThat(ctx.contender.getLeaderSessionID()) .isEqualTo(expectedSessionID); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) - .isTrue(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, UUID.randomUUID()); + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() + .isEqualTo(true); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -774,20 +786,27 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep applyToBothContenderContexts( ctx -> { - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadership( + ctx.componentId, UUID.randomUUID()); + + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() .as( "No operation should be handled anymore after the HA backend " + "indicated leadership loss even if the onRevokeLeadership wasn't " + "processed, yet, because some other process could have picked up " + "the leadership in the meantime already based on the HA " + "backend's decision.") - .isFalse(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + .isEqualTo(false); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -806,14 +825,16 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti applyToBothContenderContexts( ctx -> { - assertThat( + assertThatFuture( leaderElectionService.hasLeadership( ctx.componentId, expectedSessionID)) - .isFalse(); - assertThat( + .eventuallySucceeds() + .isEqualTo(false); + assertThatFuture( leaderElectionService.hasLeadership( ctx.componentId, UUID.randomUUID())) - .isFalse(); + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -833,10 +854,11 @@ void testHasLeadershipAfterLeaderElectionClose() throws Exception { ctx -> { ctx.leaderElection.close(); - assertThat( + assertThatFuture( leaderElectionService.hasLeadership( ctx.componentId, expectedSessionID)) - .isFalse(); + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -1253,9 +1275,15 @@ private void testNonBlockingCall( */ @Test void testNestedDeadlockInLeadershipConfirmation() throws Exception { + final AtomicReference leaderInformationStorage = + new AtomicReference<>(LeaderInformationRegister.empty()); try (final DefaultLeaderElectionService testInstance = new DefaultLeaderElectionService( - TestingLeaderElectionDriver.newNoOpBuilder()::build)) { + TestingLeaderElectionDriver.newBuilder( + new AtomicBoolean(false), + leaderInformationStorage, + new AtomicBoolean(false)) + ::build)) { final String componentId = "test-component"; final LeaderElection leaderElection = testInstance.createLeaderElection(componentId); @@ -1283,14 +1311,24 @@ void testNestedDeadlockInLeadershipConfirmation() throws Exception { grantReceivedLatch.await(); final CompletableFuture revocationFuture; + final CompletableFuture confirmLeadershipFuture; synchronized (leaderContender.getLock()) { revocationFuture = CompletableFuture.runAsync(testInstance::onRevokeLeadership); contenderLockAcquireLatch.await(); - leaderElection.confirmLeadership(leaderSessionId, "random-address"); + confirmLeadershipFuture = + leaderElection.confirmLeadership(leaderSessionId, "random-address"); } assertThatFuture(revocationFuture).eventuallySucceeds(); + assertThatFuture(confirmLeadershipFuture).eventuallySucceeds(); + assertThat(contenderLeadership).isFalse(); + assertThat(leaderInformationStorage.get().forComponentId(componentId).isEmpty()) + .as( + "The LeaderInformation is empty because the leadership confirmation succeeded the " + + "leadership revocation which resulted in no leader information being written out to " + + "the HA backend.") + .isTrue(); // not closing the LeaderElection instance would leave the DefaultLeaderElectionService // in an inconsistent state causing an error when closing the service diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java index 575f01396ae3a..2c23c903527f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java @@ -18,8 +18,9 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.BiConsumerWithException; -import org.apache.flink.util.function.TriConsumer; +import org.apache.flink.util.function.TriFunction; import org.junit.jupiter.api.Test; @@ -104,6 +105,8 @@ void testLeaderConfirmation() throws Exception { componentIdRef.set(componentId); leaderSessionIDRef.set(leaderSessionID); leaderAddressRef.set(address); + + return FutureUtils.completedVoidFuture(); }) .build(); try (final DefaultLeaderElection testInstance = @@ -173,14 +176,15 @@ private void testHasLeadership(boolean expectedReturnValue) throws Exception { (actualComponentId, actualLeaderSessionID) -> { componentIdRef.set(actualComponentId); leaderSessionIDRef.set(actualLeaderSessionID); - return expectedReturnValue; + return CompletableFuture.completedFuture(expectedReturnValue); }) .build(); try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService, DEFAULT_TEST_COMPONENT_ID)) { final UUID expectedLeaderSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(expectedLeaderSessionID)) + assertThatFuture(testInstance.hasLeadership(expectedLeaderSessionID)) + .eventuallySucceeds() .isEqualTo(expectedReturnValue); assertThat(componentIdRef).hasValue(DEFAULT_TEST_COMPONENT_ID); assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); @@ -192,14 +196,16 @@ private static class TestingAbstractLeaderElectionService private final BiConsumerWithException registerConsumer; private final Consumer removeConsumer; - private final TriConsumer confirmLeadershipConsumer; - private final BiFunction hasLeadershipFunction; + private final TriFunction> + confirmLeadershipConsumer; + private final BiFunction> hasLeadershipFunction; private TestingAbstractLeaderElectionService( BiConsumerWithException registerConsumer, Consumer removeConsumer, - TriConsumer confirmLeadershipConsumer, - BiFunction hasLeadershipFunction) { + TriFunction> + confirmLeadershipConsumer, + BiFunction> hasLeadershipFunction) { super(); this.registerConsumer = registerConsumer; @@ -219,13 +225,14 @@ protected void remove(String componentId) { } @Override - protected void confirmLeadership( + protected CompletableFuture confirmLeadership( String componentId, UUID leaderSessionID, String leaderAddress) { - confirmLeadershipConsumer.accept(componentId, leaderSessionID, leaderAddress); + return confirmLeadershipConsumer.apply(componentId, leaderSessionID, leaderAddress); } @Override - protected boolean hasLeadership(String componentId, UUID leaderSessionId) { + protected CompletableFuture hasLeadership( + String componentId, UUID leaderSessionId) { return hasLeadershipFunction.apply(componentId, leaderSessionId); } @@ -252,8 +259,9 @@ private static class Builder { private BiConsumerWithException registerConsumer; private Consumer removeConsumer; - private TriConsumer confirmLeadershipConsumer; - private BiFunction hasLeadershipFunction; + private TriFunction> + confirmLeadershipConsumer; + private BiFunction> hasLeadershipFunction; private Builder() {} @@ -269,13 +277,14 @@ public Builder setRemoveConsumer(Consumer removeConsumer) { } public Builder setConfirmLeadershipConsumer( - TriConsumer confirmLeadershipConsumer) { + TriFunction> + confirmLeadershipConsumer) { this.confirmLeadershipConsumer = confirmLeadershipConsumer; return this; } public Builder setHasLeadershipFunction( - BiFunction hasLeadershipFunction) { + BiFunction> hasLeadershipFunction) { this.hasLeadershipFunction = hasLeadershipFunction; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index 92c4720878df3..e91ed4b2e7880 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; /** Tests for leader election. */ @@ -89,16 +90,25 @@ void testHasLeadership() throws Exception { final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue(); - assertThat(leaderElection.hasLeadership(UUID.randomUUID())).isFalse(); + assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(true); + assertThatFuture(leaderElection.hasLeadership(UUID.randomUUID())) + .eventuallySucceeds() + .isEqualTo(false); - leaderElection.confirmLeadership(leaderSessionId, "foobar"); + assertThatFuture(leaderElection.confirmLeadership(leaderSessionId, "foobar")) + .eventuallySucceeds(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue(); + assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(true); leaderElection.close(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isFalse(); + assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(false); assertThat(manualLeaderContender.waitForLeaderSessionId()) .as("The leadership has been revoked from the contender.") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java index de8e782254940..59b6aace43ea9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; class StandaloneLeaderElectionTest { @@ -85,20 +86,28 @@ void testHasLeadershipWithContender() throws Exception { try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { testInstance.startLeaderElection(contender); - assertThat(testInstance.hasLeadership(SESSION_ID)).isTrue(); + assertThatFuture(testInstance.hasLeadership(SESSION_ID)) + .eventuallySucceeds() + .isEqualTo(true); final UUID differentSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(differentSessionID)).isFalse(); + assertThatFuture(testInstance.hasLeadership(differentSessionID)) + .eventuallySucceeds() + .isEqualTo(false); } } @Test void testHasLeadershipWithoutContender() throws Exception { try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { - assertThat(testInstance.hasLeadership(SESSION_ID)).isFalse(); + assertThatFuture(testInstance.hasLeadership(SESSION_ID)) + .eventuallySucceeds() + .isEqualTo(false); final UUID differentSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(differentSessionID)).isFalse(); + assertThatFuture(testInstance.hasLeadership(differentSessionID)) + .eventuallySucceeds() + .isEqualTo(false); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java index d566289c711a0..3352c1d256158 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java @@ -50,9 +50,12 @@ public void grantLeadership(UUID leaderSessionID) { this.leaderSessionID = leaderSessionID; - leaderElection.confirmLeadership(leaderSessionID, address); - - leaderEventQueue.offer(LeaderInformation.known(leaderSessionID, address)); + leaderElection + .confirmLeadership(leaderSessionID, address) + .thenRun( + () -> + leaderEventQueue.offer( + LeaderInformation.known(leaderSessionID, address))); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java index cd59be897327f..bc147365a2303 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; @@ -65,17 +66,21 @@ public synchronized void startLeaderElection(LeaderContender contender) throws E } @Override - public synchronized void confirmLeadership(UUID leaderSessionID, String leaderAddress) { + public synchronized CompletableFuture confirmLeadership( + UUID leaderSessionID, String leaderAddress) { if (leaderSessionID.equals(this.issuedLeaderSessionId) && confirmationFuture != null && !confirmationFuture.isDone()) { confirmationFuture.complete(LeaderInformation.known(leaderSessionID, leaderAddress)); } + + return FutureUtils.completedVoidFuture(); } @Override - public synchronized boolean hasLeadership(UUID leaderSessionId) { - return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId); + public synchronized CompletableFuture hasLeadership(UUID leaderSessionId) { + return CompletableFuture.completedFuture( + hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId)); } private boolean hasLeadership() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index c42a2a9b4047a..59288ba37e934 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -93,11 +94,14 @@ private enum NoOpLeaderElection implements LeaderElection { public void startLeaderElection(LeaderContender contender) {} @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {} + public CompletableFuture confirmLeadership( + UUID leaderSessionID, String leaderAddress) { + return FutureUtils.completedVoidFuture(); + } @Override - public boolean hasLeadership(UUID leaderSessionId) { - return false; + public CompletableFuture hasLeadership(UUID leaderSessionId) { + return CompletableFuture.completedFuture(false); } @Override