fixed tests
vpapavas committed Jan 25, 2020
1 parent 80f4945 commit a60e2fa
Showing 20 changed files with 132 additions and 138 deletions.
@@ -165,7 +165,7 @@ public class KsqlConfig extends AbstractConfig {
 "Config to enable or disable transient pull queries on a specific KSQL server.";
 public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;

-// TODO shall we remove this config?
+// Shall we remove this config?
 public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG =
 "ksql.query.pull.routing.timeout.ms";
 public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L;
@@ -474,6 +474,11 @@ public long getLastStatusUpdateMs() {
 public boolean isHostAlive() {
 return hostAlive;
 }
+
+@Override
+public String toString() {
+return hostAlive + "," + lastStatusUpdateMs;
+}
 }

 public static class HeartbeatInfo {
@@ -594,7 +594,8 @@ static KsqlRestApplication buildApplication(
 final Optional<HeartbeatAgent> heartbeatAgent =
 initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext);

-final List<IRoutingFilter> routingFilters = ImmutableList.of(new LivenessFilter(heartbeatAgent));
+final List<IRoutingFilter> routingFilters = ImmutableList.of(
+new LivenessFilter(heartbeatAgent));

 final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
 ksqlEngine,
@@ -18,7 +18,7 @@
 import static java.util.Objects.requireNonNull;

 import io.confluent.ksql.execution.streams.IRoutingFilter;
-import io.confluent.ksql.rest.entity.HostStatusEntity;
+import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatus;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.kafka.streams.state.HostInfo;
@@ -27,18 +27,20 @@ public class LivenessFilter implements IRoutingFilter {

 private final Optional<HeartbeatAgent> heartbeatAgent;

-public LivenessFilter(Optional<HeartbeatAgent> heartbeatAgent) {
+public LivenessFilter(final Optional<HeartbeatAgent> heartbeatAgent) {
 this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
 }

 @Override
-public boolean filter(final HostInfo hostInfo, String storeName, int partition) {
+public boolean filter(final HostInfo hostInfo, final String storeName, final int partition) {
 if (heartbeatAgent.isPresent()) {
-Map<String, HostStatusEntity> hostStatus = heartbeatAgent.get().getHostsStatus();
-if (!hostStatus.containsKey(hostInfo.toString())) {
+final Map<HostInfo, HostStatus> hostStatus = heartbeatAgent.get().getHostsStatus();
+if (!hostStatus.containsKey(hostInfo)) {
 return true;
 }
-return hostStatus.get(hostInfo.toString()).getHostAlive();
+System.out.println("-------------> Host " + hostInfo
++ " is alive " + hostStatus.get(hostInfo).isHostAlive());
+return hostStatus.get(hostInfo).isHostAlive();
 }
 return true;
 }
@@ -216,14 +216,12 @@ static TableRowsEntity handlePullQuery(
 }

 if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) {
+System.out.println("---------> Query standby enabled");
 for (KsqlNode node : filteredAndOrderedNodes) {
 try {
 return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext);
 } catch (Throwable t) {
-if (LOG.isDebugEnabled()) {
-LOG.debug("Error routing query {} to host {} ",
-statement.getStatementText(), node, t);
-}
+LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t);
 }
 }
 } else {
@@ -246,22 +244,20 @@ static TableRowsEntity routeQuery(
 final PullQueryContext pullQueryContext
 ) {
 try {
+System.out.println("---------> Route Query to host " + node);
 if (node.isLocal()) {
-LOG.debug("Query {} executed locally at host {}.",
-statement.getStatementText(), node.location());
+LOG.info("Query {} executed locally at host {}.",
+statement.getStatementText(), node.location());
 return queryRowsLocally(
 statement,
 executionContext,
 pullQueryContext);
 } else {
-LOG.debug("Query {} routed to host {}.",
-statement.getStatementText(), node.location());
+LOG.info("Query {} routed to host {}.", statement.getStatementText(), node.location());
 return forwardTo(node, statement, serviceContext);
 }
 } catch (Throwable t) {
-if (LOG.isDebugEnabled()) {
-LOG.debug("Error routing query " + statement.getStatementText() + " to " + node, t);
-}
+LOG.info("Error routing query " + statement.getStatementText() + " to " + node, t);
 }
 throw new MaterializationException(
 "Unable to execute pull query :" + statement.getStatementText());
@@ -64,9 +64,9 @@ public Response checkClusterStatus() {
 }

 private ClusterStatusResponse getResponse() {
-Map<HostInfo, HostStatus> allHostStatus = heartbeatAgent.getHostsStatus();
+final Map<HostInfo, HostStatus> allHostStatus = heartbeatAgent.getHostsStatus();

-Map<HostInfoEntity, HostStatusEntity> response = allHostStatus
+final Map<HostInfoEntity, HostStatusEntity> response = allHostStatus
 .entrySet()
 .stream()
 .collect(Collectors.toMap(
@@ -78,7 +78,7 @@ private ClusterStatusResponse getResponse() {
 return new ClusterStatusResponse(response);
 }

-private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(HostInfo hostInfo) {
+private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(final HostInfo hostInfo) {
 final List<PersistentQueryMetadata> currentQueries = engine.getPersistentQueries();
 if (currentQueries.isEmpty()) {
 // empty response
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.engine.KsqlEngine;
+import io.confluent.ksql.execution.streams.IRoutingFilter;
 import io.confluent.ksql.json.JsonMapper;
 import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
 import io.confluent.ksql.parser.tree.PrintTopic;
@@ -29,7 +30,6 @@
 import io.confluent.ksql.rest.entity.StreamedRow;
 import io.confluent.ksql.rest.entity.TableRowsEntity;
 import io.confluent.ksql.rest.entity.Versions;
-import io.confluent.ksql.execution.streams.IRoutingFilter;
 import io.confluent.ksql.rest.server.StatementParser;
 import io.confluent.ksql.rest.server.computation.CommandQueue;
 import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
@@ -22,14 +22,14 @@
 import io.confluent.ksql.integration.IntegrationTestHarness;
 import io.confluent.ksql.integration.Retry;
 import io.confluent.ksql.rest.entity.ClusterStatusResponse;
+import io.confluent.ksql.rest.entity.HostInfoEntity;
 import io.confluent.ksql.rest.server.KsqlRestConfig;
 import io.confluent.ksql.rest.server.TestKsqlRestApp;
 import io.confluent.ksql.serde.Format;
 import io.confluent.ksql.test.util.secure.ClientTrustStore;
 import io.confluent.ksql.util.PageViewDataProvider;
 import java.util.concurrent.TimeUnit;
 import kafka.zookeeper.ZooKeeperClientException;
-import org.apache.kafka.streams.state.HostInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -45,8 +45,8 @@ public class HeartbeatAgentFunctionalTest {
 private static final String PAGE_VIEW_TOPIC = PAGE_VIEWS_PROVIDER.topicName();
 private static final String PAGE_VIEW_STREAM = PAGE_VIEWS_PROVIDER.kstreamName();

-private static final HostInfo host0 = new HostInfo("localhost",8088);
-private static final HostInfo host1 = new HostInfo("localhost",8089);
+private static final HostInfoEntity host0 = new HostInfoEntity("localhost", 8088);
+private static final HostInfoEntity host1 = new HostInfoEntity("localhost",8089);
 private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
 private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp
 .builder(TEST_HARNESS::kafkaBootstrapServers)
@@ -112,8 +112,8 @@ public void shouldMarkServersAsUp() {
 REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp);

 // Then:
-assertThat(clusterStatusResponseUp.getClusterStatus().get(host0.toString()).getHostAlive(), is(true));
-assertThat(clusterStatusResponseUp.getClusterStatus().get(host1.toString()).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp.getClusterStatus().get(host0).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp.getClusterStatus().get(host1).getHostAlive(), is(true));
 }

 @Test(timeout = 60000)
@@ -126,8 +126,8 @@ public void shouldMarkRemoteServerAsDown() {
 REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown);

 // Then:
-assertThat(clusterStatusResponse.getClusterStatus().get(host0.toString()).getHostAlive(), is(true));
-assertThat(clusterStatusResponse.getClusterStatus().get(host1.toString()).getHostAlive(), is(false));
+assertThat(clusterStatusResponse.getClusterStatus().get(host0).getHostAlive(), is(true));
+assertThat(clusterStatusResponse.getClusterStatus().get(host1).getHostAlive(), is(false));
 }

 @Test(timeout = 60000)
@@ -141,24 +141,24 @@ public void shouldMarkRemoteServerAsUpThenDownThenUp() {
 REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp);

 // Then:
-assertThat(clusterStatusResponseUp1.getClusterStatus().get(host0.toString()).getHostAlive(), is(true));
-assertThat(clusterStatusResponseUp1.getClusterStatus().get(host1.toString()).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp1.getClusterStatus().get(host0).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp1.getClusterStatus().get(host1).getHostAlive(), is(true));

 // When:
 ClusterStatusResponse clusterStatusResponseDown = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus(
 REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown);

 // Then:
-assertThat(clusterStatusResponseDown.getClusterStatus().get(host0.toString()).getHostAlive(), is(true));
-assertThat(clusterStatusResponseDown.getClusterStatus().get(host1.toString()).getHostAlive(), is(false));
+assertThat(clusterStatusResponseDown.getClusterStatus().get(host0).getHostAlive(), is(true));
+assertThat(clusterStatusResponseDown.getClusterStatus().get(host1).getHostAlive(), is(false));

 // When :
 HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000);
 ClusterStatusResponse clusterStatusResponseUp2 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus(
 REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp);

 // Then:
-assertThat(clusterStatusResponseUp2.getClusterStatus().get(host0.toString()).getHostAlive(), is(true));
-assertThat(clusterStatusResponseUp2.getClusterStatus().get(host1.toString()).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp2.getClusterStatus().get(host0).getHostAlive(), is(true));
+assertThat(clusterStatusResponseUp2.getClusterStatus().get(host1).getHostAlive(), is(true));
 }
 }
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.state.HostInfo;

 class HighAvailabilityTestUtil {

@@ -44,7 +43,7 @@ static void waitForClusterToBeDiscovered(final int numServers, final TestKsqlRes

 static boolean allServersDiscovered(
 final int numServers,
-final Map<String, HostStatusEntity> clusterStatus) {
+final Map<HostInfoEntity, HostStatusEntity> clusterStatus) {

 if(clusterStatus.size() < numServers) {
 return false;
@@ -54,7 +53,7 @@ static boolean allServersDiscovered(

 static void sendHeartbeartsEveryIntervalForWindowLength(
 final TestKsqlRestApp receiverApp,
-final HostInfo sender,
+final HostInfoEntity sender,
 final long interval,
 final long window) {
 long start = System.currentTimeMillis();
@@ -70,8 +69,8 @@ static void sendHeartbeartsEveryIntervalForWindowLength(

 static ClusterStatusResponse waitForRemoteServerToChangeStatus(
 final TestKsqlRestApp restApp,
-final HostInfo remoteServer,
-final BiFunction<HostInfo, Map<String, HostStatusEntity>, Boolean> function)
+final HostInfoEntity remoteServer,
+final BiFunction<HostInfoEntity, Map<HostInfoEntity, HostStatusEntity>, Boolean> function)
 {
 while (true) {
 final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp);
@@ -87,13 +86,13 @@ static ClusterStatusResponse waitForRemoteServerToChangeStatus(
 }

 static boolean remoteServerIsDown(
-final HostInfo remoteServer,
-final Map<String, HostStatusEntity> clusterStatus) {
-if (!clusterStatus.containsKey(remoteServer.toString())) {
+final HostInfoEntity remoteServer,
+final Map<HostInfoEntity, HostStatusEntity> clusterStatus) {
+if (!clusterStatus.containsKey(remoteServer)) {
 return true;
 }
-for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) {
-if (entry.getKey().contains(String.valueOf(remoteServer.port()))
+for( Entry<HostInfoEntity, HostStatusEntity> entry: clusterStatus.entrySet()) {
+if (entry.getKey().getPort() == remoteServer.getPort()
 && !entry.getValue().getHostAlive()) {
 return true;
 }
@@ -102,10 +101,10 @@ static boolean remoteServerIsDown(
 }

 static boolean remoteServerIsUp(
-final HostInfo remoteServer,
-final Map<String, HostStatusEntity> clusterStatus) {
-for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) {
-if (entry.getKey().contains(String.valueOf(remoteServer.port()))
+final HostInfoEntity remoteServer,
+final Map<HostInfoEntity, HostStatusEntity> clusterStatus) {
+for( Entry<HostInfoEntity, HostStatusEntity> entry: clusterStatus.entrySet()) {
+if (entry.getKey().getPort() == remoteServer.getPort()
 && entry.getValue().getHostAlive()) {
 return true;
 }
@@ -115,11 +114,11 @@ static boolean remoteServerIsUp(

 static void sendHeartbeatRequest(
 final TestKsqlRestApp restApp,
-final HostInfo host,
+final HostInfoEntity hostInfoEntity,
 final long timestamp) {

 try (final KsqlRestClient restClient = restApp.buildKsqlClient()) {
-restClient.makeAsyncHeartbeatRequest(new HostInfoEntity(host.host(), host.port()), timestamp);
+restClient.makeAsyncHeartbeatRequest(hostInfoEntity, timestamp);
 }
 }
