From a60e2fa1804cfd3b6627c84fa917f08f52117a17 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 24 Jan 2020 18:28:12 -0800 Subject: [PATCH] fixed tests --- .../io/confluent/ksql/util/KsqlConfig.java | 2 +- .../ksql/rest/server/HeartbeatAgent.java | 5 ++ .../ksql/rest/server/KsqlRestApplication.java | 3 +- .../ksql/rest/server/LivenessFilter.java | 14 ++--- .../server/execution/PullQueryExecutor.java | 18 +++---- .../resources/ClusterStatusResource.java | 6 +-- .../streaming/StreamedQueryResource.java | 2 +- .../HeartbeatAgentFunctionalTest.java | 26 ++++----- .../integration/HighAvailabilityTestUtil.java | 31 ++++++----- .../PullQueryRoutingFunctionalTest.java | 53 ++++++++++++------- .../execution/PullQueryExecutorTest.java | 5 +- .../streaming/PullQueryPublisherTest.java | 16 +++--- .../streaming/StreamedQueryResourceTest.java | 8 +-- .../streaming/WSQueryEndpointTest.java | 6 +-- .../ksql/rest/entity/ActiveStandbyEntity.java | 10 ++++ .../rest/entity/ClusterStatusResponse.java | 5 ++ .../ksql/rest/entity/HostStatusEntity.java | 4 +- .../streams/materialization/Locator.java | 13 ----- .../streams/materialization/ks/KsLocator.java | 38 +++++-------- .../materialization/ks/KsLocatorTest.java | 5 -- 20 files changed, 132 insertions(+), 138 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index a8223060e913..51389f3c3db8 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -165,7 +165,7 @@ public class KsqlConfig extends AbstractConfig { "Config to enable or disable transient pull queries on a specific KSQL server."; public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true; - // TODO shall we remove this config? + // Shall we remove this config? public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG = "ksql.query.pull.routing.timeout.ms"; public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java index 7c026483421c..95a0c0bd1f35 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java @@ -474,6 +474,11 @@ public long getLastStatusUpdateMs() { public boolean isHostAlive() { return hostAlive; } + + @Override + public String toString() { + return hostAlive + "," + lastStatusUpdateMs; + } } public static class HeartbeatInfo { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 222837ae3a0b..3e042d323acc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -594,7 +594,8 @@ static KsqlRestApplication buildApplication( final Optional heartbeatAgent = initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext); - final List routingFilters = ImmutableList.of(new LivenessFilter(heartbeatAgent)); + final List routingFilters = ImmutableList.of( + new LivenessFilter(heartbeatAgent)); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java index 13d1d330ac92..2e554f892c76 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java @@ -18,7 +18,7 @@ import static java.util.Objects.requireNonNull; import io.confluent.ksql.execution.streams.IRoutingFilter; -import io.confluent.ksql.rest.entity.HostStatusEntity; +import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatus; import java.util.Map; import java.util.Optional; import org.apache.kafka.streams.state.HostInfo; @@ -27,18 +27,20 @@ public class LivenessFilter implements IRoutingFilter { private final Optional heartbeatAgent; - public LivenessFilter(Optional heartbeatAgent) { + public LivenessFilter(final Optional heartbeatAgent) { this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); } @Override - public boolean filter(final HostInfo hostInfo, String storeName, int partition) { + public boolean filter(final HostInfo hostInfo, final String storeName, final int partition) { if (heartbeatAgent.isPresent()) { - Map hostStatus = heartbeatAgent.get().getHostsStatus(); - if (!hostStatus.containsKey(hostInfo.toString())) { + final Map hostStatus = heartbeatAgent.get().getHostsStatus(); + if (!hostStatus.containsKey(hostInfo)) { return true; } - return hostStatus.get(hostInfo.toString()).getHostAlive(); + System.out.println("-------------> Host " + hostInfo + + " is alive " + hostStatus.get(hostInfo).isHostAlive()); + return hostStatus.get(hostInfo).isHostAlive(); } return true; } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index b159114a159e..47770e0ba678 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -216,14 +216,12 @@ static TableRowsEntity handlePullQuery( } if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) { + System.out.println("---------> Query standby enabled"); for (KsqlNode node : filteredAndOrderedNodes) { try { return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext); } catch (Throwable t) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error routing query {} to host {} ", - statement.getStatementText(), node, t); - } + LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t); } } } else { @@ -246,22 +244,20 @@ static TableRowsEntity routeQuery( final PullQueryContext pullQueryContext ) { try { + System.out.println("---------> Route Query to host " + node); if (node.isLocal()) { - LOG.debug("Query {} executed locally at host {}.", - statement.getStatementText(), node.location()); + LOG.info("Query {} executed locally at host {}.", + statement.getStatementText(), node.location()); return queryRowsLocally( statement, executionContext, pullQueryContext); } else { - LOG.debug("Query {} routed to host {}.", - statement.getStatementText(), node.location()); + LOG.info("Query {} routed to host {}.", statement.getStatementText(), node.location()); return forwardTo(node, statement, serviceContext); } } catch (Throwable t) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error routing query " + statement.getStatementText() + " to " + node, t); - } + LOG.info("Error routing query " + statement.getStatementText() + " to " + node, t); } throw new MaterializationException( "Unable to execute pull query :" + statement.getStatementText()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java index 920f41379421..a9940e0dbf61 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java @@ -64,9 +64,9 @@ public Response checkClusterStatus() { } private ClusterStatusResponse getResponse() { - Map allHostStatus = heartbeatAgent.getHostsStatus(); + final Map allHostStatus = heartbeatAgent.getHostsStatus(); - Map response = allHostStatus + final Map response = allHostStatus .entrySet() .stream() .collect(Collectors.toMap( @@ -78,7 +78,7 @@ private ClusterStatusResponse getResponse() { return new ClusterStatusResponse(response); } - private Map getActiveStandbyInformation(HostInfo hostInfo) { + private Map getActiveStandbyInformation(final HostInfo hostInfo) { final List currentQueries = engine.getPersistentQueries(); if (currentQueries.isEmpty()) { // empty response diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index c76b9efb0704..abe53257677e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.GenericRow; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.PrintTopic; @@ -29,7 +30,6 @@ import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.TableRowsEntity; import io.confluent.ksql.rest.entity.Versions; -import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java index 896b6b67487e..fca411bac3ce 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java @@ -22,6 +22,7 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.rest.entity.ClusterStatusResponse; +import io.confluent.ksql.rest.entity.HostInfoEntity; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.serde.Format; @@ -29,7 +30,6 @@ import io.confluent.ksql.util.PageViewDataProvider; import java.util.concurrent.TimeUnit; import kafka.zookeeper.ZooKeeperClientException; -import org.apache.kafka.streams.state.HostInfo; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -45,8 +45,8 @@ public class HeartbeatAgentFunctionalTest { private static final String PAGE_VIEW_TOPIC = PAGE_VIEWS_PROVIDER.topicName(); private static final String PAGE_VIEW_STREAM = PAGE_VIEWS_PROVIDER.kstreamName(); - private static final HostInfo host0 = new HostInfo("localhost",8088); - private static final HostInfo host1 = new HostInfo("localhost",8089); + private static final HostInfoEntity host0 = new HostInfoEntity("localhost", 8088); + private static final HostInfoEntity host1 = new HostInfoEntity("localhost",8089); private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) @@ -112,8 +112,8 @@ public void shouldMarkServersAsUp() { REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: - assertThat(clusterStatusResponseUp.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); - assertThat(clusterStatusResponseUp.getClusterStatus().get(host1.toString()).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp.getClusterStatus().get(host0).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp.getClusterStatus().get(host1).getHostAlive(), is(true)); } @Test(timeout = 60000) @@ -126,8 +126,8 @@ public void shouldMarkRemoteServerAsDown() { REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // Then: - assertThat(clusterStatusResponse.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); - assertThat(clusterStatusResponse.getClusterStatus().get(host1.toString()).getHostAlive(), is(false)); + assertThat(clusterStatusResponse.getClusterStatus().get(host0).getHostAlive(), is(true)); + assertThat(clusterStatusResponse.getClusterStatus().get(host1).getHostAlive(), is(false)); } @Test(timeout = 60000) @@ -141,16 +141,16 @@ public void shouldMarkRemoteServerAsUpThenDownThenUp() { REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: - assertThat(clusterStatusResponseUp1.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); - assertThat(clusterStatusResponseUp1.getClusterStatus().get(host1.toString()).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp1.getClusterStatus().get(host0).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp1.getClusterStatus().get(host1).getHostAlive(), is(true)); // When: ClusterStatusResponse clusterStatusResponseDown = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // Then: - assertThat(clusterStatusResponseDown.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); - assertThat(clusterStatusResponseDown.getClusterStatus().get(host1.toString()).getHostAlive(), is(false)); + assertThat(clusterStatusResponseDown.getClusterStatus().get(host0).getHostAlive(), is(true)); + assertThat(clusterStatusResponseDown.getClusterStatus().get(host1).getHostAlive(), is(false)); // When : HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); @@ -158,7 +158,7 @@ public void shouldMarkRemoteServerAsUpThenDownThenUp() { REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: - assertThat(clusterStatusResponseUp2.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); - assertThat(clusterStatusResponseUp2.getClusterStatus().get(host1.toString()).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp2.getClusterStatus().get(host0).getHostAlive(), is(true)); + assertThat(clusterStatusResponseUp2.getClusterStatus().get(host1).getHostAlive(), is(true)); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java index aab6cd0430d1..dc928d33f211 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.function.BiFunction; -import org.apache.kafka.streams.state.HostInfo; class HighAvailabilityTestUtil { @@ -44,7 +43,7 @@ static void waitForClusterToBeDiscovered(final int numServers, final TestKsqlRes static boolean allServersDiscovered( final int numServers, - final Map clusterStatus) { + final Map clusterStatus) { if(clusterStatus.size() < numServers) { return false; @@ -54,7 +53,7 @@ static boolean allServersDiscovered( static void sendHeartbeartsEveryIntervalForWindowLength( final TestKsqlRestApp receiverApp, - final HostInfo sender, + final HostInfoEntity sender, final long interval, final long window) { long start = System.currentTimeMillis(); @@ -70,8 +69,8 @@ static void sendHeartbeartsEveryIntervalForWindowLength( static ClusterStatusResponse waitForRemoteServerToChangeStatus( final TestKsqlRestApp restApp, - final HostInfo remoteServer, - final BiFunction, Boolean> function) + final HostInfoEntity remoteServer, + final BiFunction, Boolean> function) { while (true) { final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp); @@ -87,13 +86,13 @@ static ClusterStatusResponse waitForRemoteServerToChangeStatus( } static boolean remoteServerIsDown( - final HostInfo remoteServer, - final Map clusterStatus) { - if (!clusterStatus.containsKey(remoteServer.toString())) { + final HostInfoEntity remoteServer, + final Map clusterStatus) { + if (!clusterStatus.containsKey(remoteServer)) { return true; } - for( Entry entry: clusterStatus.entrySet()) { - if (entry.getKey().contains(String.valueOf(remoteServer.port())) + for( Entry entry: clusterStatus.entrySet()) { + if (entry.getKey().getPort() == remoteServer.getPort() && !entry.getValue().getHostAlive()) { return true; } @@ -102,10 +101,10 @@ static boolean remoteServerIsDown( } static boolean remoteServerIsUp( - final HostInfo remoteServer, - final Map clusterStatus) { - for( Entry entry: clusterStatus.entrySet()) { - if (entry.getKey().contains(String.valueOf(remoteServer.port())) + final HostInfoEntity remoteServer, + final Map clusterStatus) { + for( Entry entry: clusterStatus.entrySet()) { + if (entry.getKey().getPort() == remoteServer.getPort() && entry.getValue().getHostAlive()) { return true; } @@ -115,11 +114,11 @@ static boolean remoteServerIsUp( static void sendHeartbeatRequest( final TestKsqlRestApp restApp, - final HostInfo host, + final HostInfoEntity hostInfoEntity, final long timestamp) { try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - restClient.makeAsyncHeartbeatRequest(new HostInfoEntity(host.host(), host.port()), timestamp); + restClient.makeAsyncHeartbeatRequest(hostInfoEntity, timestamp); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java index ff42d18504a4..d355baecd991 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -31,6 +31,8 @@ import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.ActiveStandbyEntity; import io.confluent.ksql.rest.entity.ActiveStandbyResponse; +import io.confluent.ksql.rest.entity.ClusterStatusResponse; +import io.confluent.ksql.rest.entity.HostInfoEntity; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.TestKsqlRestApp; @@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicLong; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.HostInfo; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -79,9 +80,9 @@ public class PullQueryRoutingFunctionalTest { throw new AssertionError("Failed to init TMP", e); } } - private static final HostInfo host0 = new HostInfo("localhost", 8088); - private static final HostInfo host1 = new HostInfo("localhost",8089); - private static final HostInfo host2 = new HostInfo("localhost",8087); + private static final HostInfoEntity host0 = new HostInfoEntity("localhost", 8088); + private static final HostInfoEntity host1 = new HostInfoEntity("localhost",8089); + private static final HostInfoEntity host2 = new HostInfoEntity("localhost",8087); private static final String USER_TOPIC = "user_topic"; private static final String USERS_STREAM = "users"; private static final UserDataProvider USER_PROVIDER = new UserDataProvider(); @@ -266,9 +267,10 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() { + " GROUP BY " + USER_PROVIDER.key() + ";" ); waitForTableRows(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, REST_APP_0); waitForStreamsMetadataToInitialize(); ClusterFormation clusterFormation = findClusterFormation(); - HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.router.right); + System.out.println("---------------> Cluster Formation: " + clusterFormation); HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( clusterFormation.router.right, clusterFormation.standBy.left, 100, 2000); HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( @@ -298,8 +300,10 @@ private static void makeAdminRequest(final String sql) { private void waitForStreamsMetadataToInitialize() { while (true) { - ActiveStandbyResponse response0 = makeActiveStandbyRequest(REST_APP_0); - if(response0.getPerQueryInfo().get(QUERY_ID) != null) { + ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(REST_APP_0); + if(clusterStatusResponse.getClusterStatus().get(host0).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null + && clusterStatusResponse.getClusterStatus().get(host1).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null + && clusterStatusResponse.getClusterStatus().get(host2).getPerQueryActiveStandbyEntity().get(QUERY_ID) != null) { break; } try { @@ -312,12 +316,15 @@ private void waitForStreamsMetadataToInitialize() { private static ClusterFormation findClusterFormation() { ClusterFormation clusterFormation = new ClusterFormation(); - ActiveStandbyResponse response0 = makeActiveStandbyRequest(REST_APP_0); - ActiveStandbyEntity entity0 = response0.getPerQueryInfo().get(QUERY_ID); - ActiveStandbyResponse response1 = makeActiveStandbyRequest(REST_APP_1); - ActiveStandbyEntity entity1 = response1.getPerQueryInfo().get(QUERY_ID); - ActiveStandbyResponse response2 = makeActiveStandbyRequest(REST_APP_2); - ActiveStandbyEntity entity2 = response2.getPerQueryInfo().get(QUERY_ID); + ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(REST_APP_0); + ActiveStandbyEntity entity0 = clusterStatusResponse.getClusterStatus().get(host0) + .getPerQueryActiveStandbyEntity().get(QUERY_ID); + ActiveStandbyEntity entity1 = clusterStatusResponse.getClusterStatus().get(host1) + .getPerQueryActiveStandbyEntity().get(QUERY_ID); + ActiveStandbyEntity entity2 = clusterStatusResponse.getClusterStatus().get(host2) + .getPerQueryActiveStandbyEntity().get(QUERY_ID); + + System.out.println(" --------> ClusterStatusResponse = " + clusterStatusResponse); // find active if(!entity0.getActiveStores().isEmpty() && !entity0.getActivePartitions().isEmpty()) { @@ -367,24 +374,32 @@ private static ActiveStandbyResponse makeActiveStandbyRequest(final TestKsqlRest } static class ClusterFormation { - Pair active; - Pair standBy; - Pair router; + Pair active; + Pair standBy; + Pair router; ClusterFormation() { } - public void setActive(final Pair active) { + public void setActive(final Pair active) { this.active = active; } - public void setStandBy(final Pair standBy) { + public void setStandBy(final Pair standBy) { this.standBy = standBy; } - public void setRouter(final Pair router) { + public void setRouter(final Pair router) { this.router = router; } + + public String toString() { + return new StringBuilder() + .append("Active = ").append(active.left) + .append(", Standby = ").append(standBy.left) + .append(", Router = ").append(router.left) + .toString(); + } } private void waitForTableRows() { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index c20f55fde6bb..371d59b81f90 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -33,7 +33,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; -import java.util.Optional; +import java.util.Collections; import org.eclipse.jetty.http.HttpStatus.Code; import org.junit.Rule; import org.junit.Test; @@ -73,8 +73,7 @@ public void shouldThrowExceptionIfConfigDisabled() { query, engine.getEngine(), engine.getServiceContext(), - false, - Optional.empty() + Collections.emptyList() ); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 322bc501fae6..192eb3c0376c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest.server.resources.streaming; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; @@ -37,7 +36,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Collection; -import java.util.Optional; +import java.util.Collections; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -77,11 +76,10 @@ public class PullQueryPublisherTest { @Before public void setUp() { - publisher = new PullQueryPublisher(engine, serviceContext, statement, false, - Optional.empty(), pullQueryExecutor); + publisher = new PullQueryPublisher(engine, serviceContext, statement, + Collections.emptyList(), pullQueryExecutor); - when(pullQueryExecutor.execute(any(), any(), any(), eq(false), eq(Optional.empty()))) - .thenReturn(entity); + when(pullQueryExecutor.execute(any(), any(), any(), any())).thenReturn(entity); when(entity.getSchema()).thenReturn(SCHEMA); @@ -106,7 +104,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, Optional.empty()); + verify(pullQueryExecutor).execute(statement, engine, serviceContext, Collections.emptyList()); } @Test @@ -119,7 +117,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, Optional.empty()); + verify(pullQueryExecutor).execute(statement, engine, serviceContext, Collections.emptyList()); } @Test @@ -155,7 +153,7 @@ public void shouldCallOnErrorOnFailure() { givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any(), any(), eq(false), eq(Optional.empty()))).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any(), any())).thenThrow(e); // When: subscription.request(1); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 004d5c06ab8e..f3f5b36cfe79 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -178,9 +178,7 @@ public void setup() { activenessRegistrar, Optional.of(authorizationValidator), errorsHandler, - false, - false, - Optional.empty() + Collections.emptyList() ); testResource.configure(VALID_CONFIG); @@ -207,9 +205,7 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { activenessRegistrar, Optional.of(authorizationValidator), errorsHandler, - false, - false, - Optional.empty() + Collections.emptyList() ); // Then: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index c2ff2a53fc11..3b0efeb33d09 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -209,8 +209,7 @@ public void setUp() { defaultServiceContextProvider, serverState, schemaRegistryClientSupplier, - false, - Optional.empty() + Collections.emptyList() ); } @@ -443,8 +442,7 @@ public void shouldHandlePullQuery() { eq(exec), eq(configuredStatement), any(), - eq(false), - eq(Optional.empty())); + any()); } @Test diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java index 9f3695c35c05..18d4ed36c033 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java @@ -82,4 +82,14 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(activeStores, standByStores, activePartitions, standByPartitions); } + + @Override + public String toString() { + return new StringBuilder() + .append("Active stores = ").append(activeStores) + .append(", Active partitions = ").append(activePartitions) + .append(", Standby stores = ").append(standByStores) + .append(", Standby partitions = ").append(standByPartitions) + .toString(); + } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java index cf500b54489f..d7a650b092a0 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java @@ -59,4 +59,9 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(clusterStatus); } + + @Override + public String toString() { + return "ClusterStatus = " + clusterStatus; + } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java index 8f36055e9a9b..9aff60d12f4e 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java @@ -60,7 +60,8 @@ public void setLastStatusUpdateMs(final long lastStatusUpdateMs) { this.lastStatusUpdateMs = lastStatusUpdateMs; } - public void setPerQueryActiveStandbyEntity(final Map perQueryActiveStandbyEntity) { + public void setPerQueryActiveStandbyEntity( + final Map perQueryActiveStandbyEntity) { this.perQueryActiveStandbyEntity = perQueryActiveStandbyEntity; } @@ -78,7 +79,6 @@ public boolean equals(final Object o) { return hostAlive == that.hostAlive && lastStatusUpdateMs == that.lastStatusUpdateMs && perQueryActiveStandbyEntity.equals(that.perQueryActiveStandbyEntity); - } @Override diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java index bc47136ecc09..c69e178181e6 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java @@ -53,18 +53,5 @@ interface KsqlNode { * @return The base URI of the node, including protocol, host and port. */ URI location(); - - /** - * If heartbeat is not enabled, the value is ignored. - * @return {@code true} if this is node is alive as determined by the heartbeat mechanism. - * - */ - boolean isAlive(); - - /** - * Specify liveness status of node as determined by heartbeat mechanism. - * @param alive Whether the node is alive or not - */ - void setIsAlive(boolean alive); } } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index 4b8f0255bcf6..2a8adbe57aee 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -79,7 +79,7 @@ public List locate( LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); - List hosts = new ArrayList<>(); + final List hosts = new ArrayList<>(); hosts.add(activeHost); hosts.addAll(standByHosts); @@ -87,17 +87,18 @@ public List locate( // The list is ordered by routing preference: active node is first, then standby nodes // in order of increasing lag. // If heartbeat is not enabled, all hosts are considered alive. - List filteredHosts = new ArrayList<>(); - routingFilters - .forEach(routingFilter -> hosts - .stream() - .filter(hostInfo -> routingFilter + final List filteredHosts = new ArrayList<>(); + for (IRoutingFilter routingFilter : routingFilters) { + filteredHosts.addAll( + hosts + .stream() + .filter(hostInfo -> routingFilter .filter(hostInfo, stateStoreName, metadata.getPartition())) - .map(this::asNode) - .collect(Collectors.toList()) - .addAll(filteredHosts)); + .map(this::asNode) + .collect(Collectors.toList())); + } - LOG.debug("Filtered and ordered hosts: {}", filteredHosts); + LOG.info("Filtered and ordered hosts: {}", filteredHosts); return filteredHosts; } @@ -137,12 +138,10 @@ private static final class Node implements KsqlNode { private final boolean local; private final URI location; - private boolean isAlive; private Node(final boolean local, final URI location) { this.local = local; this.location = requireNonNull(location, "location"); - this.isAlive = false; } @Override @@ -155,22 +154,12 @@ public URI location() { return location; } - public boolean isAlive() { - return isAlive; - } - - public void setIsAlive(final boolean alive) { - isAlive = alive; - } - @Override public String toString() { return new StringBuilder() .append(String.format("local = %s ,", local)) .append(String.format("location = %s ,", location)) - .append(String.format("alive = %s .", isAlive)) .toString(); - } @Override @@ -185,13 +174,12 @@ public boolean equals(final Object o) { final Node that = (Node) o; return local == that.local - && location.equals(that.location) - && isAlive == that.isAlive; + && location.equals(that.location); } @Override public int hashCode() { - return Objects.hash(local, location, isAlive); + return Objects.hash(local, location); } } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java index b813c9a03272..f540db388bab 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java @@ -91,11 +91,8 @@ public void setUp() { when(standByHostInfo2.port()).thenReturn(5678); activeNode = locator.asNode(activeHostInfo); - activeNode.setIsAlive(true); standByNode1 = locator.asNode(standByHostInfo1); - standByNode1.setIsAlive(true); standByNode2 = locator.asNode(standByHostInfo2); - standByNode2.setIsAlive(true); hostsStatus = Optional.of(ImmutableMap.of( activeHostInfo, new HostStatusEntity( @@ -245,8 +242,6 @@ public void shouldReturnRemoteOwnerForDifferentPortOnLocalHost() { public void shouldReturnActiveAndStandBysWhenHeartBeatNotEnabled() { // Given: getActiveAndStandbyMetadata(); - standByNode1.setIsAlive(false); - standByNode2.setIsAlive(false); when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(true); when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true);