From 29d8057a8aa791c526ffd43d955a09613f51663d Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Mon, 27 Jan 2020 14:22:06 -0800 Subject: [PATCH] fixed broken test after kafka merge --- .../ksql/services/SimpleKsqlClient.java | 4 +- .../ksql/rest/server/KsqlRestApplication.java | 14 +-- .../ksql/rest/server/LivenessFilter.java | 2 - .../server/execution/PullQueryExecutor.java | 50 +++++----- .../resources/ActiveStandbyResource.java | 96 ------------------- .../HeartbeatAgentFunctionalTest.java | 6 +- .../integration/HighAvailabilityTestUtil.java | 94 ++++++++++-------- .../PullQueryRoutingFunctionalTest.java | 76 ++++----------- .../rest/server/KsqlRestApplicationTest.java | 6 +- .../resources/ActiveStandbyResourceTest.java | 93 ------------------ .../streams/materialization/ks/KsLocator.java | 12 ++- .../materialization/ks/KsStateStore.java | 7 +- .../materialization/ks/KsLocatorTest.java | 86 +++++++++-------- .../materialization/ks/KsStateStoreTest.java | 14 +-- 14 files changed, 176 insertions(+), 384 deletions(-) delete mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java delete mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java index 8cf13779f54a..bd816d76190e 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java @@ -58,8 +58,8 @@ void makeAsyncHeartbeatRequest( RestResponse makeClusterStatusRequest(URI serverEndPoint); /** - * Send a request to remote Ksql server to inquire about the state stores it is active and - * standby. + * Send a request to remote Ksql server to inquire to inquire about which state stores the + * remote server maintains as an active and standby. * @param serverEndPoint the remote destination. * @return response containing the state stores for which the remote host is active and standby. */ diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 3e042d323acc..5617de1e9271 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -53,7 +53,6 @@ import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor; import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; -import io.confluent.ksql.rest.server.resources.ActiveStandbyResource; import io.confluent.ksql.rest.server.resources.ClusterStatusResource; import io.confluent.ksql.rest.server.resources.HealthCheckResource; import io.confluent.ksql.rest.server.resources.HeartbeatResource; @@ -161,7 +160,6 @@ public final class KsqlRestApplication extends ExecutableApplication configurables; private final Consumer rocksDBConfigSetterHandler; private final Optional heartbeatAgent; - private final ActiveStandbyResource activeStandbyResource; // Cannot be set in constructor, depends on parent server start private KsqlConfig ksqlConfigWithPort; @@ -193,8 +191,7 @@ public static SourceName getCommandsStreamName() { final List preconditions, final List configurables, final Consumer rocksDBConfigSetterHandler, - final Optional heartbeatAgent, - final ActiveStandbyResource activeStandbyResource + final Optional heartbeatAgent ) { super(restConfig); @@ -219,7 +216,6 @@ public static SourceName getCommandsStreamName() { this.rocksDBConfigSetterHandler = requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler"); this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); - this.activeStandbyResource = requireNonNull(activeStandbyResource, "activeStandbyResource"); } @Override @@ -235,7 +231,6 @@ public void setupResources(final Configurable config, final KsqlRestConfig ap config.register(new HeartbeatResource(heartbeatAgent.get())); config.register(new ClusterStatusResource(ksqlEngine, heartbeatAgent.get())); } - config.register(activeStandbyResource); config.register(new KsqlExceptionMapper()); config.register(new ServerStateDynamicBinding(serverState)); } @@ -258,8 +253,6 @@ public void startAsync() { .getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG)); heartbeatAgent.get().startAgent(); } - activeStandbyResource.setLocalHostInfo((String)ksqlConfigWithPort - .getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG)); displayWelcomeMessage(); } @@ -618,8 +611,6 @@ static KsqlRestApplication buildApplication( errorHandler ); - final ActiveStandbyResource activeStandbyResource = new ActiveStandbyResource(ksqlEngine); - final List managedTopics = new LinkedList<>(); managedTopics.add(commandTopicName); if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) { @@ -671,8 +662,7 @@ static KsqlRestApplication buildApplication( preconditions, configurables, rocksDBConfigSetterHandler, - heartbeatAgent, - activeStandbyResource + heartbeatAgent ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java index 2e554f892c76..dbf39c1f5ed8 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java @@ -38,8 +38,6 @@ public boolean filter(final HostInfo hostInfo, final String storeName, final int if (!hostStatus.containsKey(hostInfo)) { return true; } - System.out.println("-------------> Host " + hostInfo - + " is alive " + hostStatus.get(hostInfo).isHostAlive()); return hostStatus.get(hostInfo).isHostAlive(); } return true; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 47770e0ba678..cc4c4f5fa3f0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -196,47 +196,48 @@ public static TableRowsEntity execute( } } - @VisibleForTesting - static TableRowsEntity handlePullQuery( + private static TableRowsEntity handlePullQuery( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext, final PullQueryContext pullQueryContext, final List routingFilters ) { - // Get active and standby nodes for this key - final Locator locator = pullQueryContext.mat.locator(); - final List filteredAndOrderedNodes = locator.locate( - pullQueryContext.rowKey, routingFilters); + try { + // Get active and standby nodes for this key + final Locator locator = pullQueryContext.mat.locator(); + final List filteredAndOrderedNodes = locator.locate( + pullQueryContext.rowKey, routingFilters); - if (filteredAndOrderedNodes.isEmpty()) { - throw new MaterializationException(String.format( - "Unable to execute pull query : %s. Streams Metadata not initialized", - statement.getStatementText())); - } + if (filteredAndOrderedNodes.isEmpty()) { + throw new MaterializationException("All nodes are dead or exceed max allowed lag."); + } - if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) { - System.out.println("---------> Query standby enabled"); - for (KsqlNode node : filteredAndOrderedNodes) { - try { - return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext); - } catch (Throwable t) { - LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t); + if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) { + // Nodes are ordered by preference: active is first if alive then standby nodes in + // increasing order of lag. + for (KsqlNode node : filteredAndOrderedNodes) { + try { + return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext); + } catch (Throwable t) { + LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t); + } } } - } else { - // Only active handles the query + // Only active handles the query. // Fail fast if active is dead: Let client handle retries. return routeQuery( filteredAndOrderedNodes.get(0), statement, executionContext, serviceContext, pullQueryContext); + + } catch (Throwable t) { + throw new MaterializationException(String.format( + "Unable to execute pull query: %s. Error: %s", statement.getStatementText(), + t.getMessage()), t); } - throw new MaterializationException( - "Unable to execute pull query :" + statement.getStatementText()); } - @VisibleForTesting - static TableRowsEntity routeQuery( + private static TableRowsEntity routeQuery( final KsqlNode node, final ConfiguredStatement statement, final KsqlExecutionContext executionContext, @@ -244,7 +245,6 @@ static TableRowsEntity routeQuery( final PullQueryContext pullQueryContext ) { try { - System.out.println("---------> Route Query to host " + node); if (node.isLocal()) { LOG.info("Query {} executed locally at host {}.", statement.getStatementText(), node.location()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java deleted file mode 100644 index 8ae4fe9e2bb2..000000000000 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2020 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.server.resources; - -import com.google.common.annotations.VisibleForTesting; -import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.rest.entity.ActiveStandbyEntity; -import io.confluent.ksql.rest.entity.ActiveStandbyResponse; -import io.confluent.ksql.rest.entity.Versions; -import io.confluent.ksql.rest.server.ServerUtil; -import io.confluent.ksql.util.PersistentQueryMetadata; -import io.confluent.ksql.util.QueryMetadata; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.StreamsMetadata; - -/** - * Endpoint for registering heartbeats received from remote servers. The heartbeats are used - * to determine the status of the remote servers, i.e. whether they are alive or dead. - */ - -@Path("/activeStandby") -@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) -public class ActiveStandbyResource { - - private final KsqlEngine engine; - private HostInfo localHostInfo; - - public ActiveStandbyResource(final KsqlEngine engine) { - this.engine = engine; - } - - public void setLocalHostInfo(final String applicationServerId) { - this.localHostInfo = ServerUtil.parseHostInfo(applicationServerId); - } - - @GET - public Response getActiveStandbyInformation() { - return Response.ok(getResponse()).build(); - } - - @VisibleForTesting - ActiveStandbyResponse getResponse() { - final List currentQueries = engine.getPersistentQueries(); - if (currentQueries.isEmpty()) { - // empty response - return new ActiveStandbyResponse(Collections.emptyMap()); - } - - final Map perQueryMap = new HashMap<>(); - for (PersistentQueryMetadata persistentMetadata: currentQueries) { - for (StreamsMetadata streamsMetadata : ((QueryMetadata)persistentMetadata).getAllMetadata()) { - if (streamsMetadata == null || !streamsMetadata.hostInfo().equals(localHostInfo)) { - continue; - } - final ActiveStandbyEntity entity = new ActiveStandbyEntity( - streamsMetadata.stateStoreNames(), - streamsMetadata.topicPartitions() - .stream() - .map(TopicPartition::toString) - .collect(Collectors.toSet()), - streamsMetadata.standbyStateStoreNames(), - streamsMetadata.standbyTopicPartitions() - .stream() - .map(TopicPartition::toString) - .collect(Collectors.toSet())); - perQueryMap.put(((QueryMetadata)persistentMetadata).getQueryApplicationId(), entity); - } - - } - return new ActiveStandbyResponse(perQueryMap); - } -} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java index fca411bac3ce..100651eaab60 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java @@ -107,7 +107,7 @@ public void shouldMarkServersAsUp() { REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // When: - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000); final ClusterStatusResponse clusterStatusResponseUp = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); @@ -134,7 +134,7 @@ public void shouldMarkRemoteServerAsDown() { public void shouldMarkRemoteServerAsUpThenDownThenUp() { // Given: HighAvailabilityTestUtil.waitForClusterToBeDiscovered(2, REST_APP_0); - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000); // When: final ClusterStatusResponse clusterStatusResponseUp1 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( @@ -153,7 +153,7 @@ public void shouldMarkRemoteServerAsUpThenDownThenUp() { assertThat(clusterStatusResponseDown.getClusterStatus().get(host1).getHostAlive(), is(false)); // When : - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000); ClusterStatusResponse clusterStatusResponseUp2 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java index dc928d33f211..dca7d585c877 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java @@ -21,46 +21,37 @@ import io.confluent.ksql.rest.entity.HostInfoEntity; import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.rest.server.TestKsqlRestApp; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.function.BiFunction; +import java.util.stream.Collectors; class HighAvailabilityTestUtil { - static void waitForClusterToBeDiscovered(final int numServers, final TestKsqlRestApp restApp) { - while (true) { - final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp); - if(allServersDiscovered(numServers, clusterStatusResponse.getClusterStatus())) { - break; - } - try { - Thread.sleep(200); - } catch (final Exception e) { - // Meh - } - } - } + static ClusterStatusResponse sendClusterStatusRequest(final TestKsqlRestApp restApp) { - static boolean allServersDiscovered( - final int numServers, - final Map clusterStatus) { + try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - if(clusterStatus.size() < numServers) { - return false; + final RestResponse res = restClient.makeClusterStatusRequest(); + + if (res.isErroneous()) { + throw new AssertionError("Erroneous result: " + res.getErrorMessage()); + } + + return res.getResponse(); } - return true; } - static void sendHeartbeartsEveryIntervalForWindowLength( + static void sendHeartbeartsForWindowLength( final TestKsqlRestApp receiverApp, final HostInfoEntity sender, - final long interval, final long window) { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < window) { sendHeartbeatRequest(receiverApp, sender, System.currentTimeMillis()); try { - Thread.sleep(interval); + Thread.sleep(200); } catch (final Exception e) { // Meh } @@ -85,6 +76,41 @@ static ClusterStatusResponse waitForRemoteServerToChangeStatus( } } + static void waitForClusterToBeDiscovered(final int numServers, final TestKsqlRestApp restApp) { + while (true) { + final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp); + if(allServersDiscovered(numServers, clusterStatusResponse.getClusterStatus())) { + break; + } + try { + Thread.sleep(200); + } catch (final Exception e) { + // Meh + } + } + } + + static void waitForStreamsMetadataToInitialize( + final TestKsqlRestApp restApp, List hosts, String queryId) { + + while (true) { + ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(restApp); + List initialized = hosts.stream().filter( + hostInfo -> clusterStatusResponse + .getClusterStatus() + .get(hostInfo) + .getPerQueryActiveStandbyEntity() + .get(queryId) != null).collect(Collectors.toList()); + if(initialized.size() == hosts.size()) + break; + } + try { + Thread.sleep(200); + } catch (final Exception e) { + // Meh + } + } + static boolean remoteServerIsDown( final HostInfoEntity remoteServer, final Map clusterStatus) { @@ -112,7 +138,14 @@ static boolean remoteServerIsUp( return false; } - static void sendHeartbeatRequest( + private static boolean allServersDiscovered( + final int numServers, + final Map clusterStatus) { + + return clusterStatus.size() >= numServers; + } + + private static void sendHeartbeatRequest( final TestKsqlRestApp restApp, final HostInfoEntity hostInfoEntity, final long timestamp) { @@ -121,19 +154,4 @@ static void sendHeartbeatRequest( restClient.makeAsyncHeartbeatRequest(hostInfoEntity, timestamp); } } - - static ClusterStatusResponse sendClusterStatusRequest(final TestKsqlRestApp restApp) { - - try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - - final RestResponse res = restClient.makeClusterStatusRequest(); - - if (res.isErroneous()) { - throw new AssertionError("Erroneous result: " + res.getErrorMessage()); - } - - return res.getResponse(); - } - } - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java index d355baecd991..ea6896c6eea2 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -27,10 +27,7 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; -import io.confluent.ksql.rest.client.KsqlRestClient; -import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.ActiveStandbyEntity; -import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.HostInfoEntity; import io.confluent.ksql.rest.entity.StreamedRow; @@ -56,8 +53,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; @@ -88,10 +87,10 @@ public class PullQueryRoutingFunctionalTest { private static final UserDataProvider USER_PROVIDER = new UserDataProvider(); private static final Format VALUE_FORMAT = Format.JSON; private static final int HEADER = 1; - + private static String output; private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); - private static final int BASE_TIME = 1_000_000; + private static final String QUERY_ID = "_confluent-ksql-default_query_CTAS_ID_0_0"; private static final PhysicalSchema AGGREGATE_SCHEMA = PhysicalSchema.from( LogicalSchema.builder() @@ -100,8 +99,6 @@ public class PullQueryRoutingFunctionalTest { SerdeOption.none() ); - private static final String QUERY_ID = "_confluent-ksql-default_query_CTAS_ID_0_0"; - private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) @@ -152,7 +149,8 @@ public class PullQueryRoutingFunctionalTest { .around(REST_APP_1) .around(REST_APP_2); - private static String output; + @Rule + public final ExpectedException expectedException = ExpectedException.none(); @BeforeClass public static void setUpClass() { @@ -186,6 +184,7 @@ public void setUp() { REST_APP_1.start(); REST_APP_2.start(); output = KsqlIdentifierTestUtil.uniqueIdentifierName(); + } @After @@ -195,7 +194,6 @@ public void cleanUp() { REST_APP_2.stop(); } - @Test(timeout = 60000) public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { // Given: @@ -207,11 +205,12 @@ public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { + " GROUP BY " + USER_PROVIDER.key() + ";" ); waitForTableRows(); - waitForStreamsMetadataToInitialize(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, REST_APP_0); + HighAvailabilityTestUtil.waitForStreamsMetadataToInitialize( + REST_APP_0, ImmutableList.of(host0, host1, host2), QUERY_ID); ClusterFormation clusterFormation = findClusterFormation(); - HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.standBy.right); - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( - clusterFormation.standBy.right, clusterFormation.active.left, 100, 2000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength( + clusterFormation.standBy.right, clusterFormation.active.left, 2000); HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( clusterFormation.standBy.right, clusterFormation.active.left, HighAvailabilityTestUtil::remoteServerIsUp); @@ -224,7 +223,6 @@ public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); } - @Test(timeout = 60000) public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { // Given: @@ -236,11 +234,12 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { + " GROUP BY " + USER_PROVIDER.key() + ";" ); waitForTableRows(); - waitForStreamsMetadataToInitialize(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, REST_APP_0); + HighAvailabilityTestUtil.waitForStreamsMetadataToInitialize( + REST_APP_0, ImmutableList.of(host0, host1, host2), QUERY_ID); ClusterFormation clusterFormation = findClusterFormation(); - HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.router.right); - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( - clusterFormation.router.right, clusterFormation.active.left, 100, 2000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength( + clusterFormation.router.right, clusterFormation.active.left, 2000); HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( clusterFormation.router.right, clusterFormation.active.left, @@ -255,7 +254,6 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); } - @Test(timeout = 60000) public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() { // Given: @@ -268,11 +266,11 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() { ); waitForTableRows(); HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, REST_APP_0); - waitForStreamsMetadataToInitialize(); + HighAvailabilityTestUtil.waitForStreamsMetadataToInitialize( + REST_APP_0, ImmutableList.of(host0, host1, host2), QUERY_ID); ClusterFormation clusterFormation = findClusterFormation(); - System.out.println("---------------> Cluster Formation: " + clusterFormation); - HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( - clusterFormation.router.right, clusterFormation.standBy.left, 100, 2000); + HighAvailabilityTestUtil.sendHeartbeartsForWindowLength( + clusterFormation.router.right, clusterFormation.standBy.left, 2000); HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( clusterFormation.router.right, clusterFormation.standBy.left, @@ -298,22 +296,6 @@ private static void makeAdminRequest(final String sql) { RestIntegrationTestUtil.makeKsqlRequest(REST_APP_0, sql, Optional.empty()); } - private void waitForStreamsMetadataToInitialize() { - while (true) { - ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(REST_APP_0); - if(clusterStatusResponse.getClusterStatus().get(host0).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null - && clusterStatusResponse.getClusterStatus().get(host1).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null - && clusterStatusResponse.getClusterStatus().get(host2).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null) { - break; - } - try { - Thread.sleep(200); - } catch (final Exception e) { - // Meh - } - } - } - private static ClusterFormation findClusterFormation() { ClusterFormation clusterFormation = new ClusterFormation(); ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(REST_APP_0); @@ -324,8 +306,6 @@ private static ClusterFormation findClusterFormation() { ActiveStandbyEntity entity2 = clusterStatusResponse.getClusterStatus().get(host2) .getPerQueryActiveStandbyEntity().get(QUERY_ID); - System.out.println(" --------> ClusterStatusResponse = " + clusterStatusResponse); - // find active if(!entity0.getActiveStores().isEmpty() && !entity0.getActivePartitions().isEmpty()) { clusterFormation.setActive(Pair.of(host0, REST_APP_0)); @@ -359,20 +339,6 @@ else if(entity1.getStandByStores().isEmpty() && entity1.getActiveStores().isEmpt return clusterFormation; } - private static ActiveStandbyResponse makeActiveStandbyRequest(final TestKsqlRestApp restApp) { - - try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - - final RestResponse res = restClient.makeActiveStandbyRequest(); - - if (res.isErroneous()) { - throw new AssertionError("Erroneous result: " + res.getErrorMessage()); - } - - return res.getResponse(); - } - } - static class ClusterFormation { Pair active; Pair standBy; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 4b582e1b88db..0f4b66d5e785 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -43,7 +43,6 @@ import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; -import io.confluent.ksql.rest.server.resources.ActiveStandbyResource; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.resources.RootDocument; import io.confluent.ksql.rest.server.resources.StatusResource; @@ -129,8 +128,6 @@ public class KsqlRestApplicationTest { private Consumer rocksDBConfigSetterHandler; @Mock private HeartbeatAgent heartbeatAgent; - @Mock - private ActiveStandbyResource activeStandbyResource; @Mock private SchemaRegistryClient schemaRegistryClient; @@ -437,8 +434,7 @@ private void givenAppWithRestConfig(final Map restConfigMap) { ImmutableList.of(precondition1, precondition2), ImmutableList.of(ksqlResource, streamedQueryResource), rocksDBConfigSetterHandler, - Optional.of(heartbeatAgent), - activeStandbyResource + Optional.of(heartbeatAgent) ); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java deleted file mode 100644 index b054ae82740e..000000000000 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2020 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"; you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.server.resources; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.rest.entity.ActiveStandbyResponse; -import io.confluent.ksql.rest.server.ServerUtil; -import io.confluent.ksql.util.PersistentQueryMetadata; -import io.confluent.ksql.util.QueryMetadata; -import java.util.List; -import javax.ws.rs.core.Response; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.StreamsMetadata; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class ActiveStandbyResourceTest { - - @Mock - private KsqlEngine ksqlEngine; - @Mock - private PersistentQueryMetadata query; - @Mock - private StreamsMetadata streamsMetadata; - - private static final String APPLICATION_SERVER_ID = "http://localhost:8088"; - private HostInfo hostInfo; - private List allMetadata; - private ActiveStandbyResource activeStandbyResource; - - @Before - public void setUp() { - hostInfo = ServerUtil.parseHostInfo(APPLICATION_SERVER_ID); - activeStandbyResource = new ActiveStandbyResource(ksqlEngine); - activeStandbyResource.setLocalHostInfo(APPLICATION_SERVER_ID); - allMetadata = ImmutableList.of(streamsMetadata); - } - - @Test - public void shouldReturnResponse() { - // When: - final Response response = activeStandbyResource.getActiveStandbyInformation(); - - // Then: - assertThat(response.getStatus(), is(200)); - assertThat(response.getEntity(), instanceOf(ActiveStandbyResponse.class)); - } - - @Test - public void shouldReturnActiveAndStandByStores() { - // Given: - when(ksqlEngine.getPersistentQueries()).thenReturn(ImmutableList.of(query)); - when(query.getAllMetadata()).thenReturn(allMetadata); - when(((QueryMetadata)query).getQueryApplicationId()).thenReturn("test"); - when(streamsMetadata.hostInfo()).thenReturn(hostInfo); - when(streamsMetadata.stateStoreNames()).thenReturn(ImmutableSet.of("activeStore1", "activeStore2")); - when(streamsMetadata.standbyStateStoreNames()).thenReturn(ImmutableSet.of("standByStore1", "standByStore2")); - - // When: - final ActiveStandbyResponse response = activeStandbyResource.getResponse(); - - // Then: - assertThat(response.getPerQueryInfo().get("test").getActiveStores().containsAll( - ImmutableSet.of("activeStore1", "activeStore2")), is(true)); - assertThat(response.getPerQueryInfo().get("test").getStandByStores().containsAll( - ImmutableSet.of("standByStore1", "standByStore2")), is(true)); - } - -} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index 2a8adbe57aee..defb0c15aa51 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -21,10 +21,10 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.execution.streams.materialization.Locator; +import io.confluent.ksql.execution.streams.materialization.MaterializationException; import java.net.URI; import java.net.URL; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -68,16 +68,18 @@ public List locate( final KeyQueryMetadata metadata = kafkaStreams .queryMetadataForKey(stateStoreName, key, keySerializer); - // FAil fast if Streams not ready. Let client handle it + // Fail fast if Streams not ready. Let client handle it if (metadata == KeyQueryMetadata.NOT_AVAILABLE) { - LOG.debug("Streams Metadata not available"); - return Collections.emptyList(); + LOG.debug("KeyQueryMetadata not available for state store {} and key {}", + stateStoreName, key); + throw new MaterializationException(String.format( + "KeyQueryMetadata not available for state store %s and key %s", stateStoreName, key)); } final HostInfo activeHost = metadata.getActiveHost(); final Set standByHosts = metadata.getStandbyHosts(); - LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); + LOG.info("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); final List hosts = new ArrayList<>(); hosts.add(activeHost); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java index 7134d092b2f3..fa7db40e2ff9 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java @@ -71,7 +71,12 @@ T store(final QueryableStoreType queryableStoreType) { awaitRunning(); try { - return kafkaStreams.store(stateStoreName, queryableStoreType); + if (ksqlConfig.getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) { + // True flag allows queries on standby and replica state stores + return kafkaStreams.store(stateStoreName, queryableStoreType, true); + } + // False flag allows queries only on active state store + return kafkaStreams.store(stateStoreName, queryableStoreType, false); } catch (final Exception e) { final State state = kafkaStreams.state(); if (state != State.RUNNING) { diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java index f540db388bab..3e82e1312d96 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java @@ -19,15 +19,19 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.testing.NullPointerTester; import com.google.common.testing.NullPointerTester.Visibility; import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; +import io.confluent.ksql.execution.streams.materialization.MaterializationException; import io.confluent.ksql.rest.entity.HostStatusEntity; import java.net.MalformedURLException; import java.net.URI; @@ -43,7 +47,9 @@ import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.state.HostInfo; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -62,25 +68,32 @@ public class KsLocatorTest { @Mock private Serializer keySerializer; @Mock + private IRoutingFilter livenessFilter; + @Mock private HostInfo activeHostInfo; @Mock private HostInfo standByHostInfo1; @Mock private HostInfo standByHostInfo2; - @Mock - private IRoutingFilter livenessFilter; private KsLocator locator; private KsqlNode activeNode; private KsqlNode standByNode1; private KsqlNode standByNode2; - private Optional> hostsStatus; + + private Map hostsStatus; private List routingFilters; + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Before public void setUp() { locator = new KsLocator(STORE_NAME, kafkaStreams, keySerializer, LOCAL_HOST_URL); + //activeHostInfo = new HostInfo("remoteHost", 2345); + //standByHostInfo1 = new HostInfo("standBy1", 1234); + //standByHostInfo2 = new HostInfo("standBy2", 5678); when(activeHostInfo.host()).thenReturn("remoteHost"); when(activeHostInfo.port()).thenReturn(2345); @@ -94,7 +107,7 @@ public void setUp() { standByNode1 = locator.asNode(standByHostInfo1); standByNode2 = locator.asNode(standByHostInfo2); - hostsStatus = Optional.of(ImmutableMap.of( + hostsStatus = ImmutableMap.of( activeHostInfo, new HostStatusEntity( true, 0L, @@ -107,9 +120,9 @@ standByHostInfo2, new HostStatusEntity( true, 0L, Collections.emptyMap()) - )); + ); - routingFilters.add(livenessFilter); + routingFilters = ImmutableList.of(livenessFilter); } @Test @@ -122,35 +135,26 @@ public void shouldThrowNPEs() { } @Test - @SuppressWarnings("deprecation") - public void shouldRequestMetadata() { + public void shouldThrowIfMetadataNotAvailable() { // Given: getEmtpyMetadata(); - // When: - locator.locate(SOME_KEY, routingFilters); + // Expect: + expectedException.expect(MaterializationException.class); + expectedException.expectMessage( + "KeyQueryMetadata not available for state store someStoreName and key Struct{}"); - // Then: - verify(kafkaStreams).queryMetadataForKey(STORE_NAME, SOME_KEY, keySerializer); - } - - @Test - public void shouldReturnEmptyIfOwnerNotKnown() { - // Given: - getEmtpyMetadata(); // When: final List result = locator.locate(SOME_KEY, routingFilters); - - // Then: - assertThat(result.isEmpty(), is(true)); } + @Test public void shouldReturnOwnerIfKnown() { // Given: getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -169,7 +173,7 @@ public void shouldReturnLocalOwnerIfSameAsSuppliedLocalHost() { when(activeHostInfo.host()).thenReturn(LOCAL_HOST_URL.getHost()); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -184,7 +188,7 @@ public void shouldReturnLocalOwnerIfExplicitlyLocalHostOnSamePortAsSuppliedLocal when(activeHostInfo.host()).thenReturn("LocalHOST"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -199,7 +203,7 @@ public void shouldReturnRemoteOwnerForDifferentHost() { when(activeHostInfo.host()).thenReturn("different"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -214,7 +218,7 @@ public void shouldReturnRemoteOwnerForDifferentPort() { when(activeHostInfo.host()).thenReturn(LOCAL_HOST_URL.getHost()); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort() + 1); getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -229,7 +233,7 @@ public void shouldReturnRemoteOwnerForDifferentPortOnLocalHost() { when(activeHostInfo.host()).thenReturn("LOCALhost"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort() + 1); getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -242,27 +246,27 @@ public void shouldReturnRemoteOwnerForDifferentPortOnLocalHost() { public void shouldReturnActiveAndStandBysWhenHeartBeatNotEnabled() { // Given: getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); - when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(true); - when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(true); + when(livenessFilter.filter(eq(standByHostInfo1), anyString(), anyInt())).thenReturn(true); + when(livenessFilter.filter(eq(standByHostInfo2), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.size(), is(3)); - assertThat(result.stream().findFirst(), is(activeNode)); - assertThat(result, containsInAnyOrder(standByNode1, standByNode2)); + assertThat(result.stream().findFirst().get(), is(activeNode)); + assertThat(result, containsInAnyOrder(activeNode, standByNode1, standByNode2)); } @Test public void shouldReturnStandBysWhenActiveDown() { // Given: getActiveAndStandbyMetadata(); - hostsStatus.get().get(activeHostInfo.toString()).setHostAlive(false); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(false); - when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(true); - when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); + hostsStatus.get(activeHostInfo).setHostAlive(false); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(false); + when(livenessFilter.filter(eq(standByHostInfo1), anyString(), anyInt())).thenReturn(true); + when(livenessFilter.filter(eq(standByHostInfo2), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); @@ -276,16 +280,16 @@ public void shouldReturnStandBysWhenActiveDown() { public void shouldReturnOneStandByWhenActiveAndOtherStandByDown() { // Given: getActiveAndStandbyMetadata(); - when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(false); - when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(false); - when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); + when(livenessFilter.filter(eq(activeHostInfo), anyString(), anyInt())).thenReturn(false); + when(livenessFilter.filter(eq(standByHostInfo1), anyString(), anyInt())).thenReturn(false); + when(livenessFilter.filter(eq(standByHostInfo2), anyString(), anyInt())).thenReturn(true); // When: final List result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.size(), is(1)); - assertThat(result.stream().findFirst(), is(standByNode2)); + assertThat(result.stream().findFirst().get(), is(standByNode2)); } @SuppressWarnings("unchecked") diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java index d9ef6f5970f6..cc2fa08d8df2 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java @@ -19,6 +19,8 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -142,7 +144,7 @@ public void shouldThrowIfNotRunningAfterFailedToGetStore() { .thenReturn(State.RUNNING) .thenReturn(State.NOT_RUNNING); - when(kafkaStreams.store(any(), any())).thenThrow(new IllegalStateException()); + when(kafkaStreams.store(any(), any(), anyBoolean())).thenThrow(new IllegalStateException()); // When: expectedException.expect(NotRunningException.class); @@ -163,7 +165,7 @@ public void shouldGetStoreOnceRunning() { // Then: final InOrder inOrder = Mockito.inOrder(kafkaStreams); inOrder.verify(kafkaStreams, atLeast(1)).state(); - inOrder.verify(kafkaStreams).store(any(), any()); + inOrder.verify(kafkaStreams).store(any(), any(), anyBoolean()); } @Test @@ -176,13 +178,13 @@ public void shouldRequestStore() { store.store(storeType); // Then: - verify(kafkaStreams).store(STORE_NAME, storeType); + verify(kafkaStreams).store(eq(STORE_NAME), eq(storeType), anyBoolean()); } @Test public void shouldThrowIfStoreNotAvailableWhenRequested() { // Given: - when(kafkaStreams.store(any(), any())).thenThrow(new InvalidStateStoreException("boom")); + when(kafkaStreams.store(any(), any(), anyBoolean())).thenThrow(new InvalidStateStoreException("boom")); // Then: expectedException.expect(MaterializationException.class); @@ -197,7 +199,7 @@ public void shouldThrowIfStoreNotAvailableWhenRequested() { public void shouldReturnSessionStore() { // Given: final ReadOnlySessionStore sessionStore = mock(ReadOnlySessionStore.class); - when(kafkaStreams.store(any(), any())).thenReturn(sessionStore); + when(kafkaStreams.store(any(), any(), anyBoolean())).thenReturn(sessionStore); // When: final ReadOnlySessionStore result = store @@ -211,7 +213,7 @@ public void shouldReturnSessionStore() { public void shouldReturnWindowStore() { // Given: final ReadOnlyWindowStore windowStore = mock(ReadOnlyWindowStore.class); - when(kafkaStreams.store(any(), any())).thenReturn(windowStore); + when(kafkaStreams.store(any(), any(), anyBoolean())).thenReturn(windowStore); // When: final ReadOnlyWindowStore result = store