Skip to content

Commit

Permalink
fixed broken test after kafka merge
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Jan 27, 2020
1 parent a60e2fa commit 29d8057
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ void makeAsyncHeartbeatRequest(
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);

/**
* Send a request to remote Ksql server to inquire about the state stores it is active and
* standby.
* Send a request to remote Ksql server to inquire to inquire about which state stores the
* remote server maintains as an active and standby.
* @param serverEndPoint the remote destination.
* @return response containing the state stores for which the remote host is active and standby.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor;
import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder;
import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter;
import io.confluent.ksql.rest.server.resources.ActiveStandbyResource;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
import io.confluent.ksql.rest.server.resources.HeartbeatResource;
Expand Down Expand Up @@ -161,7 +160,6 @@ public final class KsqlRestApplication extends ExecutableApplication<KsqlRestCon
private final List<KsqlConfigurable> configurables;
private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;
private final Optional<HeartbeatAgent> heartbeatAgent;
private final ActiveStandbyResource activeStandbyResource;

// Cannot be set in constructor, depends on parent server start
private KsqlConfig ksqlConfigWithPort;
Expand Down Expand Up @@ -193,8 +191,7 @@ public static SourceName getCommandsStreamName() {
final List<KsqlServerPrecondition> preconditions,
final List<KsqlConfigurable> configurables,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final Optional<HeartbeatAgent> heartbeatAgent,
final ActiveStandbyResource activeStandbyResource
final Optional<HeartbeatAgent> heartbeatAgent
) {
super(restConfig);

Expand All @@ -219,7 +216,6 @@ public static SourceName getCommandsStreamName() {
this.rocksDBConfigSetterHandler =
requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.activeStandbyResource = requireNonNull(activeStandbyResource, "activeStandbyResource");
}

@Override
Expand All @@ -235,7 +231,6 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap
config.register(new HeartbeatResource(heartbeatAgent.get()));
config.register(new ClusterStatusResource(ksqlEngine, heartbeatAgent.get()));
}
config.register(activeStandbyResource);
config.register(new KsqlExceptionMapper());
config.register(new ServerStateDynamicBinding(serverState));
}
Expand All @@ -258,8 +253,6 @@ public void startAsync() {
.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG));
heartbeatAgent.get().startAgent();
}
activeStandbyResource.setLocalHostInfo((String)ksqlConfigWithPort
.getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG));
displayWelcomeMessage();
}

Expand Down Expand Up @@ -618,8 +611,6 @@ static KsqlRestApplication buildApplication(
errorHandler
);

final ActiveStandbyResource activeStandbyResource = new ActiveStandbyResource(ksqlEngine);

final List<String> managedTopics = new LinkedList<>();
managedTopics.add(commandTopicName);
if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
Expand Down Expand Up @@ -671,8 +662,7 @@ static KsqlRestApplication buildApplication(
preconditions,
configurables,
rocksDBConfigSetterHandler,
heartbeatAgent,
activeStandbyResource
heartbeatAgent
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public boolean filter(final HostInfo hostInfo, final String storeName, final int
if (!hostStatus.containsKey(hostInfo)) {
return true;
}
System.out.println("-------------> Host " + hostInfo
+ " is alive " + hostStatus.get(hostInfo).isHostAlive());
return hostStatus.get(hostInfo).isHostAlive();
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,55 +196,55 @@ public static TableRowsEntity execute(
}
}

@VisibleForTesting
static TableRowsEntity handlePullQuery(
private static TableRowsEntity handlePullQuery(
final ConfiguredStatement<Query> statement,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext,
final PullQueryContext pullQueryContext,
final List<IRoutingFilter> routingFilters
) {
// Get active and standby nodes for this key
final Locator locator = pullQueryContext.mat.locator();
final List<KsqlNode> filteredAndOrderedNodes = locator.locate(
pullQueryContext.rowKey, routingFilters);
try {
// Get active and standby nodes for this key
final Locator locator = pullQueryContext.mat.locator();
final List<KsqlNode> filteredAndOrderedNodes = locator.locate(
pullQueryContext.rowKey, routingFilters);

if (filteredAndOrderedNodes.isEmpty()) {
throw new MaterializationException(String.format(
"Unable to execute pull query : %s. Streams Metadata not initialized",
statement.getStatementText()));
}
if (filteredAndOrderedNodes.isEmpty()) {
throw new MaterializationException("All nodes are dead or exceed max allowed lag.");
}

if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) {
System.out.println("---------> Query standby enabled");
for (KsqlNode node : filteredAndOrderedNodes) {
try {
return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext);
} catch (Throwable t) {
LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t);
if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) {
// Nodes are ordered by preference: active is first if alive then standby nodes in
// increasing order of lag.
for (KsqlNode node : filteredAndOrderedNodes) {
try {
return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext);
} catch (Throwable t) {
LOG.info("Error routing query {} to host {} ", statement.getStatementText(), node, t);
}
}
}
} else {
// Only active handles the query
// Only active handles the query.
// Fail fast if active is dead: Let client handle retries.
return routeQuery(
filteredAndOrderedNodes.get(0), statement, executionContext, serviceContext,
pullQueryContext);

} catch (Throwable t) {
throw new MaterializationException(String.format(
"Unable to execute pull query: %s. Error: %s", statement.getStatementText(),
t.getMessage()), t);
}
throw new MaterializationException(
"Unable to execute pull query :" + statement.getStatementText());
}

@VisibleForTesting
static TableRowsEntity routeQuery(
private static TableRowsEntity routeQuery(
final KsqlNode node,
final ConfiguredStatement<Query> statement,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext,
final PullQueryContext pullQueryContext
) {
try {
System.out.println("---------> Route Query to host " + node);
if (node.isLocal()) {
LOG.info("Query {} executed locally at host {}.",
statement.getStatementText(), node.location());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void shouldMarkServersAsUp() {
REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown);

// When:
HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000);
HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000);
final ClusterStatusResponse clusterStatusResponseUp = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus(
REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp);

Expand All @@ -134,7 +134,7 @@ public void shouldMarkRemoteServerAsDown() {
public void shouldMarkRemoteServerAsUpThenDownThenUp() {
// Given:
HighAvailabilityTestUtil.waitForClusterToBeDiscovered(2, REST_APP_0);
HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000);
HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000);

// When:
final ClusterStatusResponse clusterStatusResponseUp1 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus(
Expand All @@ -153,7 +153,7 @@ public void shouldMarkRemoteServerAsUpThenDownThenUp() {
assertThat(clusterStatusResponseDown.getClusterStatus().get(host1).getHostAlive(), is(false));

// When :
HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000);
HighAvailabilityTestUtil.sendHeartbeartsForWindowLength(REST_APP_0, host1, 3000);
ClusterStatusResponse clusterStatusResponseUp2 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus(
REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp);

Expand Down
Loading

0 comments on commit 29d8057

Please sign in to comment.