Skip to content

Commit

Permalink
[FLINK-37241][runtime] Remove Mockito usage from StateBackendTestBase.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmvk committed Feb 13, 2025
1 parent 9689af6 commit 6dcda11
Showing 1 changed file with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
Expand Down Expand Up @@ -124,11 +125,6 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
* Tests for the {@link KeyedStateBackend} and {@link OperatorStateBackend} as produced by various
Expand Down Expand Up @@ -4545,7 +4541,7 @@ void testQueryableStateRegistration() throws Exception {
try {
KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();

KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
final TestingKvStateRegistryListener listener = new TestingKvStateRegistryListener();
registry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, listener);

ValueStateDescriptor<Integer> desc =
Expand All @@ -4556,13 +4552,13 @@ void testQueryableStateRegistration() throws Exception {
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);

// Verify registered
verify(listener, times(1))
.notifyKvStateRegistered(
eq(env.getJobID()),
eq(env.getJobVertexId()),
eq(expectedKeyGroupRange),
eq("banana"),
any(KvStateID.class));
assertThat(
listener.isRegistered(
env.getJobID(),
env.getJobVertexId(),
expectedKeyGroupRange,
"banana"))
.isTrue();

KeyedStateHandle snapshot =
runSnapshot(
Expand All @@ -4575,13 +4571,15 @@ void testQueryableStateRegistration() throws Exception {

backend.dispose();

verify(listener, times(1))
.notifyKvStateUnregistered(
eq(env.getJobID()),
eq(env.getJobVertexId()),
eq(expectedKeyGroupRange),
eq("banana"));
assertThat(
listener.isRegistered(
env.getJobID(),
env.getJobVertexId(),
expectedKeyGroupRange,
"banana"))
.isFalse();
backend.dispose();

// Initialize again
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
if (snapshot != null) {
Expand All @@ -4592,13 +4590,13 @@ void testQueryableStateRegistration() throws Exception {
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);

// Verify registered again
verify(listener, times(2))
.notifyKvStateRegistered(
eq(env.getJobID()),
eq(env.getJobVertexId()),
eq(expectedKeyGroupRange),
eq("banana"),
any(KvStateID.class));
assertThat(
listener.isRegistered(
env.getJobID(),
env.getJobVertexId(),
expectedKeyGroupRange,
"banana"))
.isTrue();
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
Expand Down Expand Up @@ -5699,6 +5697,50 @@ public static final class MutableLong {
long value;
}

private static class TestingKvStateRegistryListener implements KvStateRegistryListener {

private final Map<String, KvStateID> registeredStates = new HashMap<>();

private String createKey(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
return String.format(
"%s-%s-%s-%s",
jobId, jobVertexId, keyGroupRange.prettyPrintInterval(), registrationName);
}

private boolean isRegistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
return registeredStates.containsKey(
createKey(jobId, jobVertexId, keyGroupRange, registrationName));
}

@Override
public void notifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId) {
registeredStates.put(
createKey(jobId, jobVertexId, keyGroupRange, registrationName), kvStateId);
}

@Override
public void notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
registeredStates.remove(createKey(jobId, jobVertexId, keyGroupRange, registrationName));
}
}

private MockEnvironment buildMockEnv() throws Exception {
MockEnvironment mockEnvironment =
MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).build();
Expand Down

0 comments on commit 6dcda11

Please sign in to comment.