From 6dcda117d04b8c2f88cbf4553b54a6d324f967ee Mon Sep 17 00:00:00 2001 From: David Moravek Date: Fri, 31 Jan 2025 12:18:48 +0000 Subject: [PATCH] [FLINK-37241][runtime] Remove Mockito usage from StateBackendTestBase. --- .../runtime/state/StateBackendTestBase.java | 94 ++++++++++++++----- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 86389a00f2c37..9ebc3d52c9a52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; @@ -124,11 +125,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assumptions.assumeThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; /** * Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced by various @@ -4545,7 +4541,7 @@ void testQueryableStateRegistration() throws Exception { try { KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange(); - KvStateRegistryListener listener = mock(KvStateRegistryListener.class); + final TestingKvStateRegistryListener listener = new TestingKvStateRegistryListener(); registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener); ValueStateDescriptor desc = @@ -4556,13 +4552,13 @@ void testQueryableStateRegistration() throws Exception { VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); // Verify registered - verify(listener, times(1)) - .notifyKvStateRegistered( - eq(env.getJobID()), - eq(env.getJobVertexId()), - eq(expectedKeyGroupRange), - eq("banana"), - any(KvStateID.class)); + assertThat( + listener.isRegistered( + env.getJobID(), + env.getJobVertexId(), + expectedKeyGroupRange, + "banana")) + .isTrue(); KeyedStateHandle snapshot = runSnapshot( @@ -4575,13 +4571,15 @@ void testQueryableStateRegistration() throws Exception { backend.dispose(); - verify(listener, times(1)) - .notifyKvStateUnregistered( - eq(env.getJobID()), - eq(env.getJobVertexId()), - eq(expectedKeyGroupRange), - eq("banana")); + assertThat( + listener.isRegistered( + env.getJobID(), + env.getJobVertexId(), + expectedKeyGroupRange, + "banana")) + .isFalse(); backend.dispose(); + // Initialize again backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); if (snapshot != null) { @@ -4592,13 +4590,13 @@ void testQueryableStateRegistration() throws Exception { VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); // Verify registered again - verify(listener, times(2)) - .notifyKvStateRegistered( - eq(env.getJobID()), - eq(env.getJobVertexId()), - eq(expectedKeyGroupRange), - eq("banana"), - any(KvStateID.class)); + assertThat( + listener.isRegistered( + env.getJobID(), + env.getJobVertexId(), + expectedKeyGroupRange, + "banana")) + .isTrue(); } finally { IOUtils.closeQuietly(backend); backend.dispose(); @@ -5699,6 +5697,50 @@ public static final class MutableLong { long value; } + private static class TestingKvStateRegistryListener implements KvStateRegistryListener { + + private final Map registeredStates = new HashMap<>(); + + private String createKey( + JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { + return String.format( + "%s-%s-%s-%s", + jobId, jobVertexId, keyGroupRange.prettyPrintInterval(), registrationName); + } + + private boolean isRegistered( + JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { + return registeredStates.containsKey( + createKey(jobId, jobVertexId, keyGroupRange, registrationName)); + } + + @Override + public void notifyKvStateRegistered( + JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName, + KvStateID kvStateId) { + registeredStates.put( + createKey(jobId, jobVertexId, keyGroupRange, registrationName), kvStateId); + } + + @Override + public void notifyKvStateUnregistered( + JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { + registeredStates.remove(createKey(jobId, jobVertexId, keyGroupRange, registrationName)); + } + } + private MockEnvironment buildMockEnv() throws Exception { MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).build();