diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index bacc45658bd7..a8223060e913 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -165,12 +165,25 @@ public class KsqlConfig extends AbstractConfig { "Config to enable or disable transient pull queries on a specific KSQL server."; public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true; + // TODO shall we remove this config? public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG = "ksql.query.pull.routing.timeout.ms"; public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L; public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC = "Timeout in milliseconds " + "when waiting for the lookup of the owner of a row key"; + + public static final String KSQL_QUERY_PULL_ALLOW_STALE_READS = + "ksql.query.pull.allow.stale.reads"; + private static final String KSQL_QUERY_PULL_ALLOW_STALE_READS_DOC = + "Config to enable/disable forwarding pull queries to standby hosts when the active is dead. " + + "Effectively, the accuracy of pull queries is sacrificed for higher availability. " + + "Possible values are \"true\", \"false\". Setting to \"true\" guarantees high " + + "availability for pull queries. If set to \"false\", pull queries will fail when" + + "the active is dead and until a new active is elected. Default value is \"false\". "; + public static final boolean KSQL_QUERY_PULL_ALLOW_STALE_READS_DEFAULT = false; + + public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG = "ksql.query.pull.streamsstore.rebalancing.timeout.ms"; public static final Long KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT = 10000L; @@ -490,6 +503,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT, Importance.LOW, KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC + ).define( + KSQL_QUERY_PULL_ALLOW_STALE_READS, + Type.BOOLEAN, + KSQL_QUERY_PULL_ALLOW_STALE_READS_DEFAULT, + Importance.MEDIUM, + KSQL_QUERY_PULL_ALLOW_STALE_READS_DOC ).define( KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java index 634c3cff98ea..c8da6f847479 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java @@ -16,6 +16,7 @@ package io.confluent.ksql.services; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.StreamedRow; @@ -61,5 +62,10 @@ public void makeAsyncHeartbeatRequest( public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(final URI serverEndPoint) { throw new UnsupportedOperationException("KSQL client is disabled"); } + + @Override + public RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(final URI serverEndPoint) { + throw new UnsupportedOperationException("KSQL client is disabled"); + } } diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java index 3df0bbcbf000..8cf13779f54a 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java @@ -16,6 +16,7 @@ package io.confluent.ksql.services; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.StreamedRow; @@ -55,4 +56,12 @@ void makeAsyncHeartbeatRequest( * @return response containing the cluster status. */ RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint); + + /** + * Send a request to remote Ksql server to inquire about the state stores it is active and + * standby. + * @param serverEndPoint the remote destination. + * @return response containing the state stores for which the remote host is active and standby. + */ + RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(URI serverEndPoint); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java index 4255bfb12335..7c026483421c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java @@ -16,19 +16,13 @@ package io.confluent.ksql.rest.server; import static java.util.Objects.requireNonNull; -import static org.apache.kafka.common.utils.Utils.getHost; -import static org.apache.kafka.common.utils.Utils.getPort; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractScheduledService; import com.google.common.util.concurrent.ServiceManager; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.rest.entity.HostInfoEntity; -import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.services.ServiceContext; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; -import io.confluent.ksql.util.QueryMetadata; import java.net.URI; import java.net.URL; import java.time.Clock; @@ -47,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.StreamsMetadata; import org.slf4j.Logger; @@ -81,13 +74,12 @@ public final class HeartbeatAgent { private final KsqlEngine engine; private final ServiceContext serviceContext; private final HeartbeatConfig config; - private final ConcurrentHashMap<String, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats; - private final ConcurrentHashMap<String, HostStatusEntity> hostsStatus; + private final ConcurrentHashMap<HostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats; + private final ConcurrentHashMap<HostInfo, HostStatus> hostsStatus; private final ScheduledExecutorService scheduledExecutorService; private final ServiceManager serviceManager; private final Clock clock; private HostInfo localHostInfo; - private String localHostString; private URL localURL; public static HeartbeatAgent.Builder builder() { @@ -115,11 +107,10 @@ private HeartbeatAgent(final KsqlEngine engine, * @param timestamp The timestamp the heartbeat was sent. */ public void receiveHeartbeat(final HostInfo hostInfo, final long timestamp) { - final String hostKey = hostInfo.toString(); final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.computeIfAbsent( - hostKey, key -> new TreeMap<>()); + hostInfo, key -> new TreeMap<>()); synchronized (heartbeats) { - LOG.debug("Receive heartbeat at: {} from host: {} ", timestamp, hostKey); + LOG.debug("Receive heartbeat at: {} from host: {} ", timestamp, hostInfo); heartbeats.put(timestamp, new HeartbeatInfo(timestamp)); } } @@ -128,12 +119,12 @@ public void receiveHeartbeat(final HostInfo hostInfo, final long timestamp) { * Returns the current view of the cluster containing all hosts discovered (whether alive or dead) * @return status of discovered hosts */ - public Map<String, HostStatusEntity> getHostsStatus() { + public Map<HostInfo, HostStatus> getHostsStatus() { return Collections.unmodifiableMap(hostsStatus); } @VisibleForTesting - void setHostsStatus(final Map<String, HostStatusEntity> status) { + void setHostsStatus(final Map<HostInfo, HostStatus> status) { hostsStatus.putAll(status); } @@ -157,8 +148,7 @@ void stopAgent() { void setLocalAddress(final String applicationServer) { - this.localHostInfo = parseHostInfo(applicationServer); - this.localHostString = localHostInfo.toString(); + this.localHostInfo = ServerUtil.parseHostInfo(applicationServer); try { this.localURL = new URL(applicationServer); } catch (final Exception e) { @@ -166,28 +156,9 @@ void setLocalAddress(final String applicationServer) { + " remoteInfo: " + localHostInfo.host() + ":" + localHostInfo.host()); } - this.hostsStatus.putIfAbsent(localHostString, new HostStatusEntity( - new HostInfoEntity(localHostInfo.host(), localHostInfo.port()), - true, - clock.millis())); + this.hostsStatus.putIfAbsent(localHostInfo, new HostStatus(true, clock.millis())); } - private static HostInfo parseHostInfo(final String endPoint) { - if (endPoint == null || endPoint.trim().isEmpty()) { - return StreamsMetadataState.UNKNOWN_HOST; - } - final String host = getHost(endPoint); - final Integer port = getPort(endPoint); - - if (host == null || port == null) { - throw new KsqlException(String.format( - "Error parsing host address %s. Expected format host:port.", endPoint)); - } - - return new HostInfo(host, port); - } - - /** * Check the heartbeats received from remote hosts and apply policy to determine whether a host * is alive or not. @@ -236,39 +207,41 @@ private void processHeartbeats(final long windowStart, final long windowEnd) { // No heartbeats received -> mark all hosts as dead if (receivedHeartbeats.isEmpty()) { hostsStatus.forEach((host, status) -> { - if (!host.equals(localHostString)) { + if (!host.equals(localHostInfo)) { status.setHostAlive(false); } }); } - for (String host: hostsStatus.keySet()) { - if (host.equals(localHostString)) { + for (HostInfo hostInfo: hostsStatus.keySet()) { + if (hostInfo.equals(localHostInfo)) { continue; } - final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(host); + final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(hostInfo); //For previously discovered hosts, if they have not received any heartbeats, mark them dead if (heartbeats == null || heartbeats.isEmpty()) { - hostsStatus.get(host).setHostAlive(false); + hostsStatus.get(hostInfo).setHostAlive(false); } else { final TreeMap<Long, HeartbeatInfo> copy; synchronized (heartbeats) { - LOG.debug("Process heartbeats: {} of host: {}", heartbeats, host); + LOG.debug("Process heartbeats: {} of host: {}", heartbeats, hostInfo); // 1. remove heartbeats older than window heartbeats.headMap(windowStart).clear(); copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true)); } // 2. count consecutive missed heartbeats and mark as alive or dead - final boolean isAlive = decideStatus(host, windowStart, windowEnd, copy); - final HostStatusEntity status = hostsStatus.get(host); + final boolean isAlive = decideStatus(hostInfo, windowStart, windowEnd, copy); + final HostStatus status = hostsStatus.get(hostInfo); status.setHostAlive(isAlive); status.setLastStatusUpdateMs(windowEnd); } } } - private boolean decideStatus(final String host, final long windowStart, final long windowEnd, - final TreeMap<Long, HeartbeatInfo> heartbeats) { + private boolean decideStatus( + final HostInfo hostInfo, final long windowStart, final long windowEnd, + final TreeMap<Long, HeartbeatInfo> heartbeats + ) { long missedCount = 0; long prev = windowStart; // No heartbeat received in this window @@ -297,7 +270,7 @@ private boolean decideStatus(final String host, final long windowStart, final lo missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs; } - LOG.debug("Host: {} has {} missing heartbeats", host, missedCount); + LOG.debug("Host: {} has {} missing heartbeats", hostInfo, missedCount); return (missedCount < config.heartbeatMissedThreshold); } } @@ -314,22 +287,20 @@ class SendHeartbeatService extends AbstractScheduledService { @Override protected void runOneIteration() { - for (Entry<String, HostStatusEntity> hostStatusEntry: hostsStatus.entrySet()) { - final String host = hostStatusEntry.getKey(); - final HostStatusEntity status = hostStatusEntry.getValue(); + for (Entry<HostInfo, HostStatus> hostStatusEntry: hostsStatus.entrySet()) { + final HostInfo remoteHostInfo = hostStatusEntry.getKey(); + final HostStatus status = hostStatusEntry.getValue(); try { - if (!host.equals(localHostString)) { - final URI remoteUri = buildLocation(localURL, status.getHostInfoEntity().getHost(), - status.getHostInfoEntity().getPort()); - LOG.debug("Send heartbeat to host {} at {}", status.getHostInfoEntity().getHost(), - clock.millis()); + if (!remoteHostInfo.equals(localHostInfo)) { + final URI remoteUri = buildLocation( + localURL, remoteHostInfo.host(), remoteHostInfo.port()); + LOG.debug("Send heartbeat to host {} at {}", remoteHostInfo, clock.millis()); serviceContext.getKsqlClient().makeAsyncHeartbeatRequest(remoteUri, localHostInfo, clock.millis()); } } catch (Throwable t) { - LOG.error("Request to server: " + status.getHostInfoEntity().getHost() + ":" - + status.getHostInfoEntity().getPort() - + " failed with exception: " + t.getMessage(), t); + LOG.error("Request to server: " + remoteHostInfo + " failed with exception: " + + t.getMessage(), t); } } } @@ -371,9 +342,10 @@ protected void runOneIteration() { } final Set<HostInfo> uniqueHosts = currentQueries.stream() - .map(queryMetadata -> ((QueryMetadata) queryMetadata).getAllMetadata()) + .map(queryMetadata -> queryMetadata.getAllMetadata()) .filter(Objects::nonNull) .flatMap(Collection::stream) + .filter(streamsMetadata -> streamsMetadata != StreamsMetadata.NOT_AVAILABLE) .map(StreamsMetadata::hostInfo) .filter(hostInfo -> !(hostInfo.host().equals(localHostInfo.host()) && hostInfo.port() == (localHostInfo.port()))) @@ -383,10 +355,7 @@ protected void runOneIteration() { // Only add to map if it is the first time it is discovered. Design decision to // optimistically consider every newly discovered server as alive to avoid situations of // unavailability until the heartbeating kicks in. - hostsStatus.computeIfAbsent(hostInfo.toString(), key -> new HostStatusEntity( - new HostInfoEntity(hostInfo.host(), hostInfo.port()), - true, - clock.millis())); + hostsStatus.computeIfAbsent(hostInfo, key -> new HostStatus(true, clock.millis())); } } catch (Throwable t) { LOG.error("Failed to discover cluster with exception " + t.getMessage(), t); @@ -478,6 +447,35 @@ static class HeartbeatConfig { } } + public static class HostStatus { + private boolean hostAlive; + private long lastStatusUpdateMs; + + public HostStatus( + final boolean hostAlive, + final long lastStatusUpdateMs + ) { + this.hostAlive = hostAlive; + this.lastStatusUpdateMs = lastStatusUpdateMs; + } + + public void setHostAlive(final boolean hostAlive) { + this.hostAlive = hostAlive; + } + + public void setLastStatusUpdateMs(final long lastStatusUpdateMs) { + this.lastStatusUpdateMs = lastStatusUpdateMs; + } + + public long getLastStatusUpdateMs() { + return lastStatusUpdateMs; + } + + public boolean isHostAlive() { + return hostAlive; + } + } + public static class HeartbeatInfo { private final long timestamp; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index ec3088dcd443..222837ae3a0b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -30,6 +30,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.ServiceInfo; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.function.MutableFunctionRegistry; import io.confluent.ksql.function.UserFunctionLoader; @@ -52,6 +53,7 @@ import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor; import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; +import io.confluent.ksql.rest.server.resources.ActiveStandbyResource; import io.confluent.ksql.rest.server.resources.ClusterStatusResource; import io.confluent.ksql.rest.server.resources.HealthCheckResource; import io.confluent.ksql.rest.server.resources.HeartbeatResource; @@ -159,6 +161,11 @@ public final class KsqlRestApplication extends ExecutableApplication<KsqlRestCon private final List<KsqlConfigurable> configurables; private final Consumer<KsqlConfig> rocksDBConfigSetterHandler; private final Optional<HeartbeatAgent> heartbeatAgent; + private final ActiveStandbyResource activeStandbyResource; + + // Cannot be set in constructor, depends on parent server start + private KsqlConfig ksqlConfigWithPort; + public static SourceName getCommandsStreamName() { return COMMANDS_STREAM_NAME; @@ -186,7 +193,8 @@ public static SourceName getCommandsStreamName() { final List<KsqlServerPrecondition> preconditions, final List<KsqlConfigurable> configurables, final Consumer<KsqlConfig> rocksDBConfigSetterHandler, - final Optional<HeartbeatAgent> heartbeatAgent + final Optional<HeartbeatAgent> heartbeatAgent, + final ActiveStandbyResource activeStandbyResource ) { super(restConfig); @@ -211,6 +219,7 @@ public static SourceName getCommandsStreamName() { this.rocksDBConfigSetterHandler = requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler"); this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); + this.activeStandbyResource = requireNonNull(activeStandbyResource, "activeStandbyResource"); } @Override @@ -224,8 +233,9 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap config.register(HealthCheckResource.create(ksqlResource, serviceContext, this.config)); if (heartbeatAgent.isPresent()) { config.register(new HeartbeatResource(heartbeatAgent.get())); - config.register(new ClusterStatusResource(heartbeatAgent.get())); + config.register(new ClusterStatusResource(ksqlEngine, heartbeatAgent.get())); } + config.register(activeStandbyResource); config.register(new KsqlExceptionMapper()); config.register(new ServerStateDynamicBinding(serverState)); } @@ -233,21 +243,30 @@ public void setupResources(final Configurable<?> config, final KsqlRestConfig ap @Override public void startAsync() { log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", ")); - final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); + if (ksqlConfigWithPort == null) { + ksqlConfigWithPort = buildConfigWithPort(); + } configurables.forEach(c -> c.configure(ksqlConfigWithPort)); - startKsql(ksqlConfigWithPort); + startKsql(); final Properties metricsProperties = new Properties(); metricsProperties.putAll(getConfiguration().getOriginals()); if (versionCheckerAgent != null) { versionCheckerAgent.start(KsqlModuleType.SERVER, metricsProperties); } + if (heartbeatAgent.isPresent()) { + heartbeatAgent.get().setLocalAddress((String)ksqlConfigWithPort + .getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG)); + heartbeatAgent.get().startAgent(); + } + activeStandbyResource.setLocalHostInfo((String)ksqlConfigWithPort + .getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG)); displayWelcomeMessage(); } @VisibleForTesting - void startKsql(final KsqlConfig ksqlConfigWithPort) { + void startKsql() { waitForPreconditions(); - initialize(ksqlConfigWithPort); + initialize(); } @VisibleForTesting @@ -288,7 +307,7 @@ private void waitForPreconditions() { ); } - private void initialize(final KsqlConfig configWithApplicationServer) { + private void initialize() { rocksDBConfigSetterHandler.accept(ksqlConfigNoPort); registerCommandTopic(); @@ -310,12 +329,6 @@ private void initialize(final KsqlConfig configWithApplicationServer) { serviceContext ); - if (heartbeatAgent.isPresent()) { - heartbeatAgent.get().setLocalAddress((String)configWithApplicationServer - .getKsqlStreamConfigProps().get(StreamsConfig.APPLICATION_SERVER_CONFIG)); - heartbeatAgent.get().startAgent(); - } - serverState.setReady(); } @@ -448,6 +461,9 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { ErrorMessages.class )); + final List<IRoutingFilter> routingFilters = ImmutableList.of( + new LivenessFilter(heartbeatAgent)); + container.addEndpoint( ServerEndpointConfig.Builder .create( @@ -473,10 +489,7 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) { securityExtension, serverState, serviceContext.getSchemaRegistryClientFactory(), - restConfig.getBoolean(KsqlRestConfig.KSQL_QUERY_STANDBY_ENABLE_CONFIG), - // TODO fix after merge - restConfig.getBoolean(KsqlRestConfig.KSQL_QUERY_STANDBY_ENABLE_CONFIG), - Optional.empty() + routingFilters ); } }) @@ -578,6 +591,11 @@ static KsqlRestApplication buildApplication( ErrorMessages.class )); + final Optional<HeartbeatAgent> heartbeatAgent = + initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext); + + final List<IRoutingFilter> routingFilters = ImmutableList.of(new LivenessFilter(heartbeatAgent)); + final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, commandStore, @@ -587,10 +605,7 @@ static KsqlRestApplication buildApplication( versionChecker::updateLastRequestTime, authorizationValidator, errorHandler, - restConfig.getBoolean(KsqlRestConfig.KSQL_QUERY_STANDBY_ENABLE_CONFIG), - // TODO fix after merge - restConfig.getBoolean(KsqlRestConfig.KSQL_QUERY_STANDBY_ENABLE_CONFIG), - Optional.empty() + routingFilters ); final KsqlResource ksqlResource = new KsqlResource( @@ -602,6 +617,8 @@ static KsqlRestApplication buildApplication( errorHandler ); + final ActiveStandbyResource activeStandbyResource = new ActiveStandbyResource(ksqlEngine); + final List<String> managedTopics = new LinkedList<>(); managedTopics.add(commandTopicName); if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) { @@ -634,9 +651,6 @@ static KsqlRestApplication buildApplication( final Consumer<KsqlConfig> rocksDBConfigSetterHandler = RocksDBConfigSetterHandler::maybeConfigureRocksDBConfigSetter; - final Optional<HeartbeatAgent> heartbeatAgent = - initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext); - return new KsqlRestApplication( serviceContext, ksqlEngine, @@ -656,7 +670,8 @@ static KsqlRestApplication buildApplication( preconditions, configurables, rocksDBConfigSetterHandler, - heartbeatAgent + heartbeatAgent, + activeStandbyResource ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index f0cf7c4b215c..2200d8e479d6 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -151,12 +151,6 @@ public class KsqlRestConfig extends RestConfig { private static final String KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG_DOC = "Size of thread pool used for sending / processing heartbeats and cluster discovery."; - public static final String KSQL_QUERY_STANDBY_ENABLE_CONFIG = - KSQL_CONFIG_PREFIX + "query.standby.enable"; - private static final String KSQL_QUERY_STANDBY_ENABLE_DOC = - "Whether the queries are forwarded to standby hosts when the active is down." - + " It is disabled by default."; - private static final ConfigDef CONFIG_DEF; @@ -265,12 +259,6 @@ public class KsqlRestConfig extends RestConfig { 3, Importance.MEDIUM, KSQL_HEARTBEAT_THREAD_POOL_SIZE_CONFIG_DOC - ).define( - KSQL_QUERY_STANDBY_ENABLE_CONFIG, - Type.BOOLEAN, - false, - Importance.MEDIUM, - KSQL_QUERY_STANDBY_ENABLE_DOC ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java new file mode 100644 index 000000000000..13d1d330ac92 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LivenessFilter.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import static java.util.Objects.requireNonNull; + +import io.confluent.ksql.execution.streams.IRoutingFilter; +import io.confluent.ksql.rest.entity.HostStatusEntity; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.streams.state.HostInfo; + +public class LivenessFilter implements IRoutingFilter { + + private final Optional<HeartbeatAgent> heartbeatAgent; + + public LivenessFilter(Optional<HeartbeatAgent> heartbeatAgent) { + this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent"); + } + + @Override + public boolean filter(final HostInfo hostInfo, String storeName, int partition) { + if (heartbeatAgent.isPresent()) { + Map<String, HostStatusEntity> hostStatus = heartbeatAgent.get().getHostsStatus(); + if (!hostStatus.containsKey(hostInfo.toString())) { + return true; + } + return hostStatus.get(hostInfo.toString()).getHostAlive(); + } + return true; + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java index 423cb68199aa..11d80bb20695 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ServerUtil.java @@ -15,11 +15,17 @@ package io.confluent.ksql.rest.server; +import static org.apache.kafka.common.utils.Utils.getHost; +import static org.apache.kafka.common.utils.Utils.getPort; + +import io.confluent.ksql.util.KsqlException; import io.confluent.rest.RestConfig; import java.net.URI; import java.net.URL; import java.util.List; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.streams.processor.internals.StreamsMetadataState; +import org.apache.kafka.streams.state.HostInfo; public final class ServerUtil { @@ -40,4 +46,19 @@ public static URI getServerAddress(final KsqlRestConfig restConfig) { throw new ConfigException(RestConfig.LISTENERS_CONFIG, listeners, e.getMessage()); } } + + public static HostInfo parseHostInfo(final String applicationServerId) { + if (applicationServerId == null || applicationServerId.trim().isEmpty()) { + return StreamsMetadataState.UNKNOWN_HOST; + } + final String host = getHost(applicationServerId); + final Integer port = getPort(applicationServerId); + + if (host == null || port == null) { + throw new KsqlException(String.format( + "Error parsing host address %s. Expected format host:port.", applicationServerId)); + } + + return new HostInfo(host, port); + } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 23bee4b79f63..b159114a159e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server.execution; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.BoundType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -45,6 +46,7 @@ import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; import io.confluent.ksql.execution.plan.SelectExpression; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.execution.streams.materialization.Locator; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; import io.confluent.ksql.execution.streams.materialization.Materialization; @@ -98,7 +100,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,26 +132,11 @@ public static void validate( throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText())); } - /*public static Optional<KsqlEntity> execute( - final ConfiguredStatement<Query> statement, - final Map<String, ?> sessionProperties, - final KsqlExecutionContext executionContext, - final ServiceContext serviceContext, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Map<String, HostInfo> hostStatuses - ) { - return Optional.of(execute(statement, executionContext, serviceContext,queryStandbysEnabled, - heartbeatEnabled, hostStatuses)); - }*/ - public static TableRowsEntity execute( final ConfiguredStatement<Query> statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { if (!statement.getStatement().isPullQuery()) { throw new IllegalArgumentException("Executor can only handle pull queries"); @@ -176,6 +162,7 @@ public static TableRowsEntity execute( final WhereInfo whereInfo = extractWhereInfo(analysis, query); final QueryId queryId = uniqueQueryId(); + final QueryContext.Stacker contextStacker = new Stacker(); final Materialization mat = query @@ -184,22 +171,20 @@ public static TableRowsEntity execute( final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema()); - final KsqlConfig ksqlConfig = statement.getConfig(); - - return handlePullQuery( - statement, - executionContext, - serviceContext, + final PullQueryContext pullQueryContext = new PullQueryContext( rowKey, mat, analysis, - query, whereInfo, queryId, - contextStacker, - queryStandbysEnabled, - heartbeatEnabled, - hostStatuses + contextStacker); + + return handlePullQuery( + statement, + executionContext, + serviceContext, + pullQueryContext, + routingFilters ); } catch (final Exception e) { @@ -211,104 +196,121 @@ public static TableRowsEntity execute( } } - // CHECKSTYLE_RULES.OFF: ParameterNumberCheck - private static TableRowsEntity handlePullQuery( - // CHECKSTYLE_RULES.OFF: ParameterNumberCheck + @VisibleForTesting + static TableRowsEntity handlePullQuery( final ConfiguredStatement<Query> statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext, - final Struct rowKey, - final Materialization mat, - final Analysis analysis, - final PersistentQueryMetadata query, - final WhereInfo whereInfo, - final QueryId queryId, - final QueryContext.Stacker contextStacker, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final PullQueryContext pullQueryContext, + final List<IRoutingFilter> routingFilters ) { - - // Get active and standby state stores for this key - final List<KsqlNode> owners = getOwners(rowKey, mat, hostStatuses); - - if (owners.isEmpty()) { - throw new MaterializationException("Unable to execute pull query :" + query); + // Get active and standby nodes for this key + final Locator locator = pullQueryContext.mat.locator(); + final List<KsqlNode> filteredAndOrderedNodes = locator.locate( + pullQueryContext.rowKey, routingFilters); + + if (filteredAndOrderedNodes.isEmpty()) { + throw new MaterializationException(String.format( + "Unable to execute pull query : %s. Streams Metadata not initialized", + statement.getStatementText())); + } + + if (statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS)) { + for (KsqlNode node : filteredAndOrderedNodes) { + try { + return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext); + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error routing query {} to host {} ", + statement.getStatementText(), node, t); + } + } + } + } else { + // Only active handles the query + // Fail fast if active is dead: Let client handle retries. + return routeQuery( + filteredAndOrderedNodes.get(0), statement, executionContext, serviceContext, + pullQueryContext); } + throw new MaterializationException( + "Unable to execute pull query :" + statement.getStatementText()); + } - // TODO What if the local node is not the active? - - for (final KsqlNode node: owners) { - try { - if (node.isLocal()) { - return queryRowsLocally( - statement, - executionContext, - whereInfo, - rowKey, - mat, - analysis, - queryId, - contextStacker); - } else if (!node.isLocal() && queryStandbysEnabled) { - return routeTo(node, statement, serviceContext); - } - } catch (Throwable t) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error routing query " + query + " to " + node, t); - } + @VisibleForTesting + static TableRowsEntity routeQuery( + final KsqlNode node, + final ConfiguredStatement<Query> statement, + final KsqlExecutionContext executionContext, + final ServiceContext serviceContext, + final PullQueryContext pullQueryContext + ) { + try { + if (node.isLocal()) { + LOG.debug("Query {} executed locally at host {}.", + statement.getStatementText(), node.location()); + return queryRowsLocally( + statement, + executionContext, + pullQueryContext); + } else { + LOG.debug("Query {} routed to host {}.", + statement.getStatementText(), node.location()); + return forwardTo(node, statement, serviceContext); + } + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error routing query " + statement.getStatementText() + " to " + node, t); } } - throw new MaterializationException("Unable to execute pull query :" + query); + throw new MaterializationException( + "Unable to execute pull query :" + statement.getStatementText()); } - private static TableRowsEntity queryRowsLocally( + @VisibleForTesting + static TableRowsEntity queryRowsLocally( final ConfiguredStatement<Query> statement, final KsqlExecutionContext executionContext, - final WhereInfo whereInfo, - final Struct rowKey, - final Materialization mat, - final Analysis analysis, - final QueryId queryId, - final QueryContext.Stacker contextStacker + final PullQueryContext pullQueryContext ) { final Result result; - if (whereInfo.windowStartBounds.isPresent()) { - final Range<Instant> windowStart = whereInfo.windowStartBounds.get(); + if (pullQueryContext.whereInfo.windowStartBounds.isPresent()) { + final Range<Instant> windowStart = pullQueryContext.whereInfo.windowStartBounds.get(); - final List<? extends TableRow> rows = mat.windowed() - .get(rowKey, windowStart); + final List<? extends TableRow> rows = pullQueryContext.mat.windowed() + .get(pullQueryContext.rowKey, windowStart); - result = new Result(mat.schema(), rows); + result = new Result(pullQueryContext.mat.schema(), rows); } else { - final List<? extends TableRow> rows = mat.nonWindowed() - .get(rowKey) + final List<? extends TableRow> rows = pullQueryContext.mat.nonWindowed() + .get(pullQueryContext.rowKey) .map(ImmutableList::of) .orElse(ImmutableList.of()); - result = new Result(mat.schema(), rows); + result = new Result(pullQueryContext.mat.schema(), rows); } final LogicalSchema outputSchema; final List<List<?>> rows; if (isSelectStar(statement.getStatement().getSelect())) { - outputSchema = TableRowsEntityFactory.buildSchema(result.schema, mat.windowType()); + outputSchema = TableRowsEntityFactory.buildSchema( + result.schema, pullQueryContext.mat.windowType()); rows = TableRowsEntityFactory.createRows(result.rows); } else { - outputSchema = selectOutputSchema(result, executionContext, analysis); + outputSchema = selectOutputSchema(result, executionContext, pullQueryContext.analysis); rows = handleSelects( result, statement, executionContext, - analysis, + pullQueryContext.analysis, outputSchema, - queryId, - contextStacker + pullQueryContext.queryId, + pullQueryContext.contextStacker ); } return new TableRowsEntity( statement.getStatementText(), - queryId, + pullQueryContext.queryId, outputSchema, rows ); @@ -331,6 +333,55 @@ private static ImmutableAnalysis analyze( return queryAnalyzer.analyze(statement.getStatement(), Optional.empty()); } + private static final class PullQueryContext { + private final Struct rowKey; + private final Materialization mat; + private final ImmutableAnalysis analysis; + private final WhereInfo whereInfo; + private final QueryId queryId; + private final QueryContext.Stacker contextStacker; + + private PullQueryContext( + final Struct rowKey, + final Materialization mat, + final ImmutableAnalysis analysis, + final WhereInfo whereInfo, + final QueryId queryId, + final QueryContext.Stacker contextStacker + ) { + this.rowKey = Objects.requireNonNull(rowKey, "rowkey"); + this.mat = Objects.requireNonNull(mat, "materialization"); + this.analysis = Objects.requireNonNull(analysis, "analysis"); + this.whereInfo = Objects.requireNonNull(whereInfo, "whereInfo"); + this.queryId = Objects.requireNonNull(queryId, "queryId"); + this.contextStacker = Objects.requireNonNull(contextStacker, "contextStacker"); + } + + public Struct getRowKey() { + return rowKey; + } + + public Materialization getMat() { + return mat; + } + + public ImmutableAnalysis getAnalysis() { + return analysis; + } + + public WhereInfo getWhereInfo() { + return whereInfo; + } + + public QueryId getQueryId() { + return queryId; + } + + public QueryContext.Stacker getContextStacker() { + return contextStacker; + } + } + private static final class WhereInfo { private final Object rowkey; @@ -783,16 +834,8 @@ private static SourceName getSourceName(final ImmutableAnalysis analysis) { return source.getName(); } - private static List<KsqlNode> getOwners( - final Struct rowKey, - final Materialization mat, - final Optional<Map<String, HostInfo>> hostStatuses - ) { - final Locator locator = mat.locator(); - return locator.locate(rowKey, hostStatuses); - } - - private static TableRowsEntity routeTo( + @VisibleForTesting + static TableRowsEntity forwardTo( final KsqlNode owner, final ConfiguredStatement<Query> statement, final ServiceContext serviceContext @@ -911,4 +954,4 @@ public Optional<Expression> visitQualifiedColumnReference( return Optional.of(new UnqualifiedColumnReferenceExp(node.getReference())); } } -} +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java new file mode 100644 index 000000000000..8ae4fe9e2bb2 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResource.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.resources; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.rest.entity.ActiveStandbyEntity; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; +import io.confluent.ksql.rest.entity.Versions; +import io.confluent.ksql.rest.server.ServerUtil; +import io.confluent.ksql.util.PersistentQueryMetadata; +import io.confluent.ksql.util.QueryMetadata; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.StreamsMetadata; + +/** + * Endpoint for registering heartbeats received from remote servers. The heartbeats are used + * to determine the status of the remote servers, i.e. whether they are alive or dead. + */ + +@Path("/activeStandby") +@Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) +public class ActiveStandbyResource { + + private final KsqlEngine engine; + private HostInfo localHostInfo; + + public ActiveStandbyResource(final KsqlEngine engine) { + this.engine = engine; + } + + public void setLocalHostInfo(final String applicationServerId) { + this.localHostInfo = ServerUtil.parseHostInfo(applicationServerId); + } + + @GET + public Response getActiveStandbyInformation() { + return Response.ok(getResponse()).build(); + } + + @VisibleForTesting + ActiveStandbyResponse getResponse() { + final List<PersistentQueryMetadata> currentQueries = engine.getPersistentQueries(); + if (currentQueries.isEmpty()) { + // empty response + return new ActiveStandbyResponse(Collections.emptyMap()); + } + + final Map<String, ActiveStandbyEntity> perQueryMap = new HashMap<>(); + for (PersistentQueryMetadata persistentMetadata: currentQueries) { + for (StreamsMetadata streamsMetadata : ((QueryMetadata)persistentMetadata).getAllMetadata()) { + if (streamsMetadata == null || !streamsMetadata.hostInfo().equals(localHostInfo)) { + continue; + } + final ActiveStandbyEntity entity = new ActiveStandbyEntity( + streamsMetadata.stateStoreNames(), + streamsMetadata.topicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet()), + streamsMetadata.standbyStateStoreNames(), + streamsMetadata.standbyTopicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet())); + perQueryMap.put(((QueryMetadata)persistentMetadata).getQueryApplicationId(), entity); + } + + } + return new ActiveStandbyResponse(perQueryMap); + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java index 53819c6ac13e..920f41379421 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java @@ -15,14 +15,29 @@ package io.confluent.ksql.rest.server.resources; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.rest.entity.ActiveStandbyEntity; import io.confluent.ksql.rest.entity.ClusterStatusResponse; +import io.confluent.ksql.rest.entity.HostInfoEntity; +import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.rest.entity.Versions; import io.confluent.ksql.rest.server.HeartbeatAgent; +import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatus; +import io.confluent.ksql.util.PersistentQueryMetadata; +import io.confluent.ksql.util.QueryMetadata; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.StreamsMetadata; /** * Endpoint that reports the view of the cluster that this server has. @@ -34,9 +49,11 @@ @Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) public class ClusterStatusResource { + private final KsqlEngine engine; private final HeartbeatAgent heartbeatAgent; - public ClusterStatusResource(final HeartbeatAgent heartbeatAgent) { + public ClusterStatusResource(final KsqlEngine engine,final HeartbeatAgent heartbeatAgent) { + this.engine = engine; this.heartbeatAgent = heartbeatAgent; } @@ -47,6 +64,51 @@ public Response checkClusterStatus() { } private ClusterStatusResponse getResponse() { - return new ClusterStatusResponse(heartbeatAgent.getHostsStatus()); + Map<HostInfo, HostStatus> allHostStatus = heartbeatAgent.getHostsStatus(); + + Map<HostInfoEntity, HostStatusEntity> response = allHostStatus + .entrySet() + .stream() + .collect(Collectors.toMap( + entry -> new HostInfoEntity(entry.getKey().host(), entry.getKey().port()) , + entry -> new HostStatusEntity(entry.getValue().isHostAlive(), + entry.getValue().getLastStatusUpdateMs(), + getActiveStandbyInformation(entry.getKey())))); + + return new ClusterStatusResponse(response); + } + + private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(HostInfo hostInfo) { + final List<PersistentQueryMetadata> currentQueries = engine.getPersistentQueries(); + if (currentQueries.isEmpty()) { + // empty response + return Collections.emptyMap(); + } + + final Map<String, ActiveStandbyEntity> perQueryMap = new HashMap<>(); + for (PersistentQueryMetadata persistentMetadata: currentQueries) { + for (StreamsMetadata streamsMetadata : persistentMetadata + .getAllMetadata()) { + if (streamsMetadata == null || !streamsMetadata.hostInfo().equals(hostInfo)) { + continue; + } + if (streamsMetadata == StreamsMetadata.NOT_AVAILABLE) { + continue; + } + final ActiveStandbyEntity entity = new ActiveStandbyEntity( + streamsMetadata.stateStoreNames(), + streamsMetadata.topicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet()), + streamsMetadata.standbyStateStoreNames(), + streamsMetadata.standbyTopicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet())); + perQueryMap.put(((QueryMetadata) persistentMetadata).getQueryApplicationId(), entity); + } + } + return perQueryMap; } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java index cefaecc9a6a4..6ba62213d281 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java @@ -22,6 +22,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.TableRowsEntity; @@ -31,11 +32,8 @@ import io.confluent.ksql.statement.ConfiguredStatement; import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.Callable; import java.util.stream.Collectors; -import org.apache.kafka.streams.state.HostInfo; class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> { @@ -43,20 +41,15 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> { private final ServiceContext serviceContext; private final ConfiguredStatement<Query> query; private final TheQueryExecutor pullQueryExecutor; - private final boolean queryStandbysEnabled; - private final boolean heartbeatEnabled; - private final Optional<Map<String, HostInfo>> hostStatuses; + private final List<IRoutingFilter> routingFilters; PullQueryPublisher( final KsqlEngine ksqlEngine, final ServiceContext serviceContext, final ConfiguredStatement<Query> query, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { - this(ksqlEngine, serviceContext, query, queryStandbysEnabled, - heartbeatEnabled, hostStatuses, PullQueryExecutor::execute); + this(ksqlEngine, serviceContext, query, routingFilters, PullQueryExecutor::execute); } @VisibleForTesting @@ -64,26 +57,21 @@ class PullQueryPublisher implements Flow.Publisher<Collection<StreamedRow>> { final KsqlEngine ksqlEngine, final ServiceContext serviceContext, final ConfiguredStatement<Query> query, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses, + final List<IRoutingFilter> routingFilters, final TheQueryExecutor pullQueryExecutor ) { this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); this.serviceContext = requireNonNull(serviceContext, "serviceContext"); this.query = requireNonNull(query, "query"); this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor"); - this.queryStandbysEnabled = queryStandbysEnabled; - this.heartbeatEnabled = heartbeatEnabled; - this.hostStatuses = hostStatuses; + this.routingFilters = requireNonNull(routingFilters, "routingFilters"); } @Override public synchronized void subscribe(final Subscriber<Collection<StreamedRow>> subscriber) { final PullQuerySubscription subscription = new PullQuerySubscription( subscriber, - () -> pullQueryExecutor.execute(query, ksqlEngine, serviceContext, queryStandbysEnabled, - heartbeatEnabled, hostStatuses) + () -> pullQueryExecutor.execute(query, ksqlEngine, serviceContext, routingFilters) ); subscriber.onSubscribe(subscription); @@ -146,9 +134,7 @@ TableRowsEntity execute( ConfiguredStatement<Query> statement, KsqlExecutionContext executionContext, ServiceContext serviceContext, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + List<IRoutingFilter> routingFilters ); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 2b5d042c3b87..c76b9efb0704 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -29,6 +29,7 @@ import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.entity.TableRowsEntity; import io.confluent.ksql.rest.entity.Versions; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.execution.PullQueryExecutor; @@ -61,7 +62,6 @@ import javax.ws.rs.core.Response; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,9 +82,7 @@ public class StreamedQueryResource implements KsqlConfigurable { private final Optional<KsqlAuthorizationValidator> authorizationValidator; private final Errors errorHandler; private KsqlConfig ksqlConfig; - private final boolean queryStandbysEnabled; - private final boolean heartbeatEnabled; - private final Optional<Map<String, HostInfo>> hostStatuses; + private final List<IRoutingFilter> routingFilters; public StreamedQueryResource( final KsqlEngine ksqlEngine, @@ -94,9 +92,7 @@ public StreamedQueryResource( final ActivenessRegistrar activenessRegistrar, final Optional<KsqlAuthorizationValidator> authorizationValidator, final Errors errorHandler, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { this( ksqlEngine, @@ -107,9 +103,7 @@ public StreamedQueryResource( activenessRegistrar, authorizationValidator, errorHandler, - queryStandbysEnabled, - heartbeatEnabled, - hostStatuses + routingFilters ); } @@ -125,9 +119,7 @@ public StreamedQueryResource( final ActivenessRegistrar activenessRegistrar, final Optional<KsqlAuthorizationValidator> authorizationValidator, final Errors errorHandler, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); @@ -141,9 +133,7 @@ public StreamedQueryResource( Objects.requireNonNull(activenessRegistrar, "activenessRegistrar"); this.authorizationValidator = authorizationValidator; this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); - this.queryStandbysEnabled = queryStandbysEnabled; - this.heartbeatEnabled = heartbeatEnabled; - this.hostStatuses = hostStatuses; + this.routingFilters = Objects.requireNonNull(routingFilters, "routingFilters"); } @Override @@ -250,10 +240,8 @@ private Response handlePullQuery( final ConfiguredStatement<Query> configured = ConfiguredStatement.of(statement,streamsProperties, ksqlConfig); - // TODO fix after heartbeat merge final TableRowsEntity entity = PullQueryExecutor - .execute(configured, ksqlEngine, serviceContext, queryStandbysEnabled, heartbeatEnabled, - hostStatuses); + .execute(configured, ksqlEngine, serviceContext, routingFilters); final StreamedRow header = StreamedRow.header(entity.getQueryId(), entity.getSchema()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 140a50a673e2..436e56bc7c08 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; @@ -65,7 +66,6 @@ import javax.websocket.server.ServerEndpoint; import javax.ws.rs.core.Response; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,9 +101,7 @@ public class WSQueryEndpoint { private final ServerState serverState; private final Errors errorHandler; private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory; - private final boolean queryStandbysEnabled; - private final boolean heartbeatEnabled; - private final Optional<Map<String, HostInfo>> hostStatuses; + private final List<IRoutingFilter> routingFilters; private WebSocketSubscriber<?> subscriber; private KsqlSecurityContext securityContext; @@ -124,9 +122,7 @@ public WSQueryEndpoint( final KsqlSecurityExtension securityExtension, final ServerState serverState, final Supplier<SchemaRegistryClient> schemaRegistryClientFactory, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { this(ksqlConfig, mapper, @@ -146,9 +142,8 @@ public WSQueryEndpoint( RestServiceContextFactory::create, serverState, schemaRegistryClientFactory, - queryStandbysEnabled, - heartbeatEnabled, - hostStatuses); + routingFilters + ); } // CHECKSTYLE_RULES.OFF: ParameterNumberCheck @@ -172,9 +167,7 @@ public WSQueryEndpoint( final DefaultServiceContextFactory defaultServiceContextFactory, final ServerState serverState, final Supplier<SchemaRegistryClient> schemaRegistryClientFactory, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.mapper = Objects.requireNonNull(mapper, "mapper"); @@ -201,9 +194,7 @@ public WSQueryEndpoint( this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler"); this.schemaRegistryClientFactory = Objects.requireNonNull(schemaRegistryClientFactory, "schemaRegistryClientFactory"); - this.queryStandbysEnabled = queryStandbysEnabled; - this.heartbeatEnabled = heartbeatEnabled; - this.hostStatuses = hostStatuses; + this.routingFilters = Objects.requireNonNull(routingFilters, "routingFilters"); } @SuppressWarnings("unused") @@ -415,9 +406,7 @@ private void handleQuery(final RequestContext info, final Query query) { exec, configured, streamSubscriber, - queryStandbysEnabled, - heartbeatEnabled, - hostStatuses + routingFilters ); } else { pushQueryPublisher.start( @@ -479,13 +468,10 @@ private static void startPullQueryPublisher( final ListeningScheduledExecutorService ignored, final ConfiguredStatement<Query> query, final WebSocketSubscriber<StreamedRow> streamSubscriber, - final boolean queryStandbysEnabled, - final boolean heartbeatEnabled, - final Optional<Map<String, HostInfo>> hostStatuses + final List<IRoutingFilter> routingFilters ) { - new PullQueryPublisher(ksqlEngine, serviceContext, query, queryStandbysEnabled, - heartbeatEnabled, hostStatuses) + new PullQueryPublisher(ksqlEngine, serviceContext, query, routingFilters) .subscribe(streamSubscriber); } @@ -519,9 +505,7 @@ void start( ListeningScheduledExecutorService exec, ConfiguredStatement<Query> query, WebSocketSubscriber<StreamedRow> subscriber, - boolean queryStandbysEnabled, - boolean heartbeatEnabled, - Optional<Map<String, HostInfo>> hostStatuses); + List<IRoutingFilter> routingFilters); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java index 75a1745d0eaa..a6e7e10c612b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java @@ -26,6 +26,7 @@ import io.confluent.ksql.rest.client.KsqlTarget; import io.confluent.ksql.rest.client.QueryStream; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.HostInfoEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; @@ -126,4 +127,15 @@ public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(final URI se .orElse(target) .getClusterStatus(); } + + @Override + public RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(final URI serverEndPoint) { + final KsqlTarget target = sharedClient + .target(serverEndPoint); + + return authHeader + .map(target::authorizationHeader) + .orElse(target) + .getActiveStandByInformation(); + } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java index efd76d739f21..20b98c68bae2 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java @@ -19,6 +19,7 @@ import io.confluent.ksql.rest.client.KsqlClientUtil; import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -87,4 +88,10 @@ public void makeAsyncHeartbeatRequest( public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(final URI serverEndPoint) { throw new UnsupportedOperationException(); } + + @Override + public RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(final URI serverEndPoint) { + throw new UnsupportedOperationException(); + } + } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java index 90b6ac5c5e75..896b6b67487e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java @@ -21,20 +21,13 @@ import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; -import io.confluent.ksql.rest.client.KsqlRestClient; -import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; -import io.confluent.ksql.rest.entity.HostInfoEntity; -import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.rest.server.KsqlRestConfig; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.serde.Format; import io.confluent.ksql.test.util.secure.ClientTrustStore; import io.confluent.ksql.util.PageViewDataProvider; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.state.HostInfo; import org.junit.After; @@ -58,6 +51,7 @@ public class HeartbeatAgentFunctionalTest { private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8088") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8088") .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG, 600000) .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG, 200) @@ -67,6 +61,7 @@ public class HeartbeatAgentFunctionalTest { private static final TestKsqlRestApp REST_APP_1 = TestKsqlRestApp .builder(TEST_HARNESS::kafkaBootstrapServers) .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8089") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8089") .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG, 600000) .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG, 200) @@ -107,13 +102,14 @@ public void tearDown() { @Test(timeout = 60000) public void shouldMarkServersAsUp() { // Given: - waitForClusterToBeDiscovered(); - waitForRemoteServerToChangeStatus(this::remoteServerIsDown); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(2, REST_APP_0); + HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // When: - sendHeartbeartsEveryIntervalForWindowLength(100, 3000); - final ClusterStatusResponse clusterStatusResponseUp = waitForRemoteServerToChangeStatus( - this::remoteServerIsUp); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); + final ClusterStatusResponse clusterStatusResponseUp = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: assertThat(clusterStatusResponseUp.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); @@ -123,11 +119,11 @@ public void shouldMarkServersAsUp() { @Test(timeout = 60000) public void shouldMarkRemoteServerAsDown() { // Given: - waitForClusterToBeDiscovered(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(2, REST_APP_0); // When: - ClusterStatusResponse clusterStatusResponse = waitForRemoteServerToChangeStatus( - this::remoteServerIsDown); + ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // Then: assertThat(clusterStatusResponse.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); @@ -137,126 +133,32 @@ public void shouldMarkRemoteServerAsDown() { @Test(timeout = 60000) public void shouldMarkRemoteServerAsUpThenDownThenUp() { // Given: - waitForClusterToBeDiscovered(); - sendHeartbeartsEveryIntervalForWindowLength(100, 2000); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(2, REST_APP_0); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); // When: - final ClusterStatusResponse clusterStatusResponseUp1 = waitForRemoteServerToChangeStatus( - this::remoteServerIsUp); + final ClusterStatusResponse clusterStatusResponseUp1 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: assertThat(clusterStatusResponseUp1.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); assertThat(clusterStatusResponseUp1.getClusterStatus().get(host1.toString()).getHostAlive(), is(true)); // When: - ClusterStatusResponse clusterStatusResponseDown = waitForRemoteServerToChangeStatus( - this::remoteServerIsDown); + ClusterStatusResponse clusterStatusResponseDown = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsDown); // Then: assertThat(clusterStatusResponseDown.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); assertThat(clusterStatusResponseDown.getClusterStatus().get(host1.toString()).getHostAlive(), is(false)); // When : - sendHeartbeartsEveryIntervalForWindowLength(100, 2000); - ClusterStatusResponse clusterStatusResponseUp2 = waitForRemoteServerToChangeStatus( - this::remoteServerIsUp); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength(REST_APP_0, host1, 100, 3000); + ClusterStatusResponse clusterStatusResponseUp2 = HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + REST_APP_0, host1, HighAvailabilityTestUtil::remoteServerIsUp); // Then: assertThat(clusterStatusResponseUp2.getClusterStatus().get(host0.toString()).getHostAlive(), is(true)); assertThat(clusterStatusResponseUp2.getClusterStatus().get(host1.toString()).getHostAlive(), is(true)); } - - private void waitForClusterToBeDiscovered() { - while (true) { - final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(REST_APP_0); - if(allServersDiscovered(clusterStatusResponse.getClusterStatus())) { - break; - } - try { - Thread.sleep(200); - } catch (final Exception e) { - // Meh - } - } - } - - private boolean allServersDiscovered(Map<String, HostStatusEntity> clusterStatus) { - if(clusterStatus.size() < 2) { - return false; - } - return true; - } - - private void sendHeartbeartsEveryIntervalForWindowLength(long interval, long window) { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < window) { - sendHeartbeatRequest(REST_APP_0, host1, System.currentTimeMillis()); - try { - Thread.sleep(interval); - } catch (final Exception e) { - // Meh - } - } - } - - private ClusterStatusResponse waitForRemoteServerToChangeStatus( - Function<Map<String, HostStatusEntity>, Boolean> function) - { - while (true) { - final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(REST_APP_0); - if(function.apply(clusterStatusResponse.getClusterStatus())) { - return clusterStatusResponse; - } - try { - Thread.sleep(200); - } catch (final Exception e) { - // Meh - } - } - } - - private boolean remoteServerIsDown(Map<String, HostStatusEntity> clusterStatus) { - if (!clusterStatus.containsKey(host1.toString())) { - return true; - } - for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) { - if (entry.getKey().contains("8089") && !entry.getValue().getHostAlive()) { - return true; - } - } - return false; - } - - private boolean remoteServerIsUp(Map<String, HostStatusEntity> clusterStatus) { - for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) { - if (entry.getKey().contains("8089") && entry.getValue().getHostAlive()) { - return true; - } - } - return false; - } - - private static void sendHeartbeatRequest( - final TestKsqlRestApp restApp, - final HostInfo host, - final long timestamp) { - - try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - restClient.makeAsyncHeartbeatRequest(new HostInfoEntity(host.host(), host.port()), timestamp); - } - } - - private static ClusterStatusResponse sendClusterStatusRequest(final TestKsqlRestApp restApp) { - - try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { - - final RestResponse<ClusterStatusResponse> res = restClient.makeClusterStatusRequest(); - - if (res.isErroneous()) { - throw new AssertionError("Erroneous result: " + res.getErrorMessage()); - } - - return res.getResponse(); - } - } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java new file mode 100644 index 000000000000..aab6cd0430d1 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.integration; + +import io.confluent.ksql.rest.client.KsqlRestClient; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ClusterStatusResponse; +import io.confluent.ksql.rest.entity.HostInfoEntity; +import io.confluent.ksql.rest.entity.HostStatusEntity; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.BiFunction; +import org.apache.kafka.streams.state.HostInfo; + +class HighAvailabilityTestUtil { + + static void waitForClusterToBeDiscovered(final int numServers, final TestKsqlRestApp restApp) { + while (true) { + final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp); + if(allServersDiscovered(numServers, clusterStatusResponse.getClusterStatus())) { + break; + } + try { + Thread.sleep(200); + } catch (final Exception e) { + // Meh + } + } + } + + static boolean allServersDiscovered( + final int numServers, + final Map<String, HostStatusEntity> clusterStatus) { + + if(clusterStatus.size() < numServers) { + return false; + } + return true; + } + + static void sendHeartbeartsEveryIntervalForWindowLength( + final TestKsqlRestApp receiverApp, + final HostInfo sender, + final long interval, + final long window) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < window) { + sendHeartbeatRequest(receiverApp, sender, System.currentTimeMillis()); + try { + Thread.sleep(interval); + } catch (final Exception e) { + // Meh + } + } + } + + static ClusterStatusResponse waitForRemoteServerToChangeStatus( + final TestKsqlRestApp restApp, + final HostInfo remoteServer, + final BiFunction<HostInfo, Map<String, HostStatusEntity>, Boolean> function) + { + while (true) { + final ClusterStatusResponse clusterStatusResponse = sendClusterStatusRequest(restApp); + if(function.apply(remoteServer, clusterStatusResponse.getClusterStatus())) { + return clusterStatusResponse; + } + try { + Thread.sleep(200); + } catch (final Exception e) { + // Meh + } + } + } + + static boolean remoteServerIsDown( + final HostInfo remoteServer, + final Map<String, HostStatusEntity> clusterStatus) { + if (!clusterStatus.containsKey(remoteServer.toString())) { + return true; + } + for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) { + if (entry.getKey().contains(String.valueOf(remoteServer.port())) + && !entry.getValue().getHostAlive()) { + return true; + } + } + return false; + } + + static boolean remoteServerIsUp( + final HostInfo remoteServer, + final Map<String, HostStatusEntity> clusterStatus) { + for( Entry<String, HostStatusEntity> entry: clusterStatus.entrySet()) { + if (entry.getKey().contains(String.valueOf(remoteServer.port())) + && entry.getValue().getHostAlive()) { + return true; + } + } + return false; + } + + static void sendHeartbeatRequest( + final TestKsqlRestApp restApp, + final HostInfo host, + final long timestamp) { + + try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { + restClient.makeAsyncHeartbeatRequest(new HostInfoEntity(host.host(), host.port()), timestamp); + } + } + + static ClusterStatusResponse sendClusterStatusRequest(final TestKsqlRestApp restApp) { + + try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { + + final RestResponse<ClusterStatusResponse> res = restClient.makeClusterStatusRequest(); + + if (res.isErroneous()) { + throw new AssertionError("Erroneous result: " + res.getErrorMessage()); + } + + return res.getResponse(); + } + } + +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java new file mode 100644 index 000000000000..ff42d18504a4 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -0,0 +1,407 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.integration; + +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.client.KsqlRestClient; +import io.confluent.ksql.rest.client.RestResponse; +import io.confluent.ksql.rest.entity.ActiveStandbyEntity; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; +import io.confluent.ksql.rest.entity.StreamedRow; +import io.confluent.ksql.rest.server.KsqlRestConfig; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.test.util.KsqlIdentifierTestUtil; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.UserDataProvider; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +/** + * Test to ensure pull queries route across multiple KSQL nodes correctly. + * + * <p>For tests on general syntax and handled see RestQueryTranslationTest's + * materialized-aggregate-static-queries.json + */ +@SuppressWarnings("OptionalGetWithoutIsPresent") +@Category({IntegrationTest.class}) +public class PullQueryRoutingFunctionalTest { + + private static final TemporaryFolder TMP = new TemporaryFolder(); + + static { + try { + TMP.create(); + } catch (final IOException e) { + throw new AssertionError("Failed to init TMP", e); + } + } + private static final HostInfo host0 = new HostInfo("localhost", 8088); + private static final HostInfo host1 = new HostInfo("localhost",8089); + private static final HostInfo host2 = new HostInfo("localhost",8087); + private static final String USER_TOPIC = "user_topic"; + private static final String USERS_STREAM = "users"; + private static final UserDataProvider USER_PROVIDER = new UserDataProvider(); + private static final Format VALUE_FORMAT = Format.JSON; + private static final int HEADER = 1; + + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + + private static final int BASE_TIME = 1_000_000; + + private static final PhysicalSchema AGGREGATE_SCHEMA = PhysicalSchema.from( + LogicalSchema.builder() + .valueColumn(ColumnName.of("COUNT"), SqlTypes.BIGINT) + .build(), + SerdeOption.none() + ); + + private static final String QUERY_ID = "_confluent-ksql-default_query_CTAS_ID_0_0"; + + private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir()) + .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8088") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8088") + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG, 600000) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG, 200) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG, 2000) + .withProperty(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS, true) + .withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + "num.standby.replicas", 1) + .build(); + + private static final TestKsqlRestApp REST_APP_1 = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir()) + .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8089") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8089") + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG, 600000) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG, 200) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG, 2000) + .withProperty(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS, true) + .withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + "num.standby.replicas", 1) + .build(); + + private static final TestKsqlRestApp REST_APP_2 = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir()) + .withProperty(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:8087") + .withProperty(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:8087") + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_SEND_INTERVAL_MS_CONFIG, 600000) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_CHECK_INTERVAL_MS_CONFIG, 200) + .withProperty(KsqlRestConfig.KSQL_HEARTBEAT_DISCOVER_CLUSTER_MS_CONFIG, 2000) + .withProperty(KsqlConfig.KSQL_QUERY_PULL_ALLOW_STALE_READS, true) + .withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + "num.standby.replicas", 1) + .build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS) + .around(REST_APP_0) + .around(REST_APP_1) + .around(REST_APP_2); + + private static String output; + + @BeforeClass + public static void setUpClass() { + //Create topic with 1 partition to control who is active and standby + TEST_HARNESS.ensureTopics(1, USER_TOPIC); + + final AtomicLong timestampSupplier = new AtomicLong(BASE_TIME); + + TEST_HARNESS.produceRows( + USER_TOPIC, + USER_PROVIDER, + VALUE_FORMAT, + timestampSupplier::getAndIncrement + ); + + //Create stream + makeAdminRequest( + "CREATE STREAM " + USERS_STREAM + + " (" + USER_PROVIDER.ksqlSchemaString() + ")" + + " WITH (" + + " kafka_topic='" + USER_TOPIC + "', " + + " key='" + USER_PROVIDER.key() + "', " + + " value_format='" + VALUE_FORMAT + "'" + + ");" + ); + } + + @Before + public void setUp() { + REST_APP_0.start(); + REST_APP_1.start(); + REST_APP_2.start(); + output = KsqlIdentifierTestUtil.uniqueIdentifierName(); + } + + @After + public void cleanUp() { + REST_APP_0.stop(); + REST_APP_1.stop(); + REST_APP_2.stop(); + } + + + @Test(timeout = 60000) + public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { + // Given: + final String key = Iterables.get(USER_PROVIDER.data().keySet(), 0); + final String sql = "SELECT * FROM " + output + " WHERE ROWKEY = '" + key + "';"; + makeAdminRequest( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(1) AS COUNT FROM " + USERS_STREAM + + " GROUP BY " + USER_PROVIDER.key() + ";" + ); + waitForTableRows(); + waitForStreamsMetadataToInitialize(); + ClusterFormation clusterFormation = findClusterFormation(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.standBy.right); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( + clusterFormation.standBy.right, clusterFormation.active.left, 100, 2000); + HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + clusterFormation.standBy.right, clusterFormation.active.left, HighAvailabilityTestUtil::remoteServerIsUp); + + // When: + final List<StreamedRow> rows_0 = makePullQueryRequest(clusterFormation.standBy.right, sql); + + // Then: + assertThat(rows_0, hasSize(HEADER + 1)); + assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); + assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + } + + + @Test(timeout = 60000) + public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { + // Given: + final String key = Iterables.get(USER_PROVIDER.data().keySet(), 0); + final String sql = "SELECT * FROM " + output + " WHERE ROWKEY = '" + key + "';"; + makeAdminRequest( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(1) AS COUNT FROM " + USERS_STREAM + + " GROUP BY " + USER_PROVIDER.key() + ";" + ); + waitForTableRows(); + waitForStreamsMetadataToInitialize(); + ClusterFormation clusterFormation = findClusterFormation(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.router.right); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( + clusterFormation.router.right, clusterFormation.active.left, 100, 2000); + HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + clusterFormation.router.right, + clusterFormation.active.left, + HighAvailabilityTestUtil::remoteServerIsUp); + + // When: + final List<StreamedRow> rows_0 = makePullQueryRequest(clusterFormation.router.right, sql); + + // Then: + assertThat(rows_0, hasSize(HEADER + 1)); + assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); + assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + } + + + @Test(timeout = 60000) + public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() { + // Given: + final String key = Iterables.get(USER_PROVIDER.data().keySet(), 0); + final String sql = "SELECT * FROM " + output + " WHERE ROWKEY = '" + key + "';"; + makeAdminRequest( + "CREATE TABLE " + output + " AS" + + " SELECT COUNT(1) AS COUNT FROM " + USERS_STREAM + + " GROUP BY " + USER_PROVIDER.key() + ";" + ); + waitForTableRows(); + waitForStreamsMetadataToInitialize(); + ClusterFormation clusterFormation = findClusterFormation(); + HighAvailabilityTestUtil.waitForClusterToBeDiscovered(3, clusterFormation.router.right); + HighAvailabilityTestUtil.sendHeartbeartsEveryIntervalForWindowLength( + clusterFormation.router.right, clusterFormation.standBy.left, 100, 2000); + HighAvailabilityTestUtil.waitForRemoteServerToChangeStatus( + clusterFormation.router.right, + clusterFormation.standBy.left, + HighAvailabilityTestUtil::remoteServerIsUp); + + // When: + final List<StreamedRow> rows_0 = makePullQueryRequest(clusterFormation.router.right, sql); + + // Then: + assertThat(rows_0, hasSize(HEADER + 1)); + assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); + assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + } + + private static List<StreamedRow> makePullQueryRequest( + final TestKsqlRestApp target, + final String sql + ) { + return RestIntegrationTestUtil.makeQueryRequest(target, sql, Optional.empty()); + } + + private static void makeAdminRequest(final String sql) { + RestIntegrationTestUtil.makeKsqlRequest(REST_APP_0, sql, Optional.empty()); + } + + private void waitForStreamsMetadataToInitialize() { + while (true) { + ActiveStandbyResponse response0 = makeActiveStandbyRequest(REST_APP_0); + if(response0.getPerQueryInfo().get(QUERY_ID) != null) { + break; + } + try { + Thread.sleep(200); + } catch (final Exception e) { + // Meh + } + } + } + + private static ClusterFormation findClusterFormation() { + ClusterFormation clusterFormation = new ClusterFormation(); + ActiveStandbyResponse response0 = makeActiveStandbyRequest(REST_APP_0); + ActiveStandbyEntity entity0 = response0.getPerQueryInfo().get(QUERY_ID); + ActiveStandbyResponse response1 = makeActiveStandbyRequest(REST_APP_1); + ActiveStandbyEntity entity1 = response1.getPerQueryInfo().get(QUERY_ID); + ActiveStandbyResponse response2 = makeActiveStandbyRequest(REST_APP_2); + ActiveStandbyEntity entity2 = response2.getPerQueryInfo().get(QUERY_ID); + + // find active + if(!entity0.getActiveStores().isEmpty() && !entity0.getActivePartitions().isEmpty()) { + clusterFormation.setActive(Pair.of(host0, REST_APP_0)); + } + else if(!entity1.getActiveStores().isEmpty() && !entity1.getActivePartitions().isEmpty()) { + clusterFormation.setActive(Pair.of(host1, REST_APP_1)); + } else { + clusterFormation.setActive(Pair.of(host2, REST_APP_2)); + } + + //find standby + if(!entity0.getStandByStores().isEmpty() && !entity0.getStandByPartitions().isEmpty()) { + clusterFormation.setStandBy(Pair.of(host0, REST_APP_0)); + } + else if(!entity1.getStandByStores().isEmpty() && !entity1.getStandByPartitions().isEmpty()) { + clusterFormation.setStandBy(Pair.of(host1, REST_APP_1)); + } else { + clusterFormation.setStandBy(Pair.of(host2, REST_APP_2)); + } + + //find router + if(entity0.getStandByStores().isEmpty() && entity0.getActiveStores().isEmpty()) { + clusterFormation.setRouter(Pair.of(host0, REST_APP_0)); + } + else if(entity1.getStandByStores().isEmpty() && entity1.getActiveStores().isEmpty()) { + clusterFormation.setRouter(Pair.of(host1, REST_APP_1)); + } else { + clusterFormation.setRouter(Pair.of(host2, REST_APP_2)); + } + + return clusterFormation; + } + + private static ActiveStandbyResponse makeActiveStandbyRequest(final TestKsqlRestApp restApp) { + + try (final KsqlRestClient restClient = restApp.buildKsqlClient()) { + + final RestResponse<ActiveStandbyResponse> res = restClient.makeActiveStandbyRequest(); + + if (res.isErroneous()) { + throw new AssertionError("Erroneous result: " + res.getErrorMessage()); + } + + return res.getResponse(); + } + } + + static class ClusterFormation { + Pair<HostInfo, TestKsqlRestApp> active; + Pair<HostInfo, TestKsqlRestApp> standBy; + Pair<HostInfo, TestKsqlRestApp> router; + + ClusterFormation() { + } + + public void setActive(final Pair<HostInfo, TestKsqlRestApp> active) { + this.active = active; + } + + public void setStandBy(final Pair<HostInfo, TestKsqlRestApp> standBy) { + this.standBy = standBy; + } + + public void setRouter(final Pair<HostInfo, TestKsqlRestApp> router) { + this.router = router; + } + } + + private void waitForTableRows() { + TEST_HARNESS.verifyAvailableUniqueRows( + output.toUpperCase(), + USER_PROVIDER.data().size(), + VALUE_FORMAT, + AGGREGATE_SCHEMA + ); + } + + private static String getNewStateDir() { + try { + return TMP.newFolder().getAbsolutePath(); + } catch (final IOException e) { + throw new AssertionError("Failed to create new state dir", e); + } + } +} + diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java index 6ca019020180..8871a9db452c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java @@ -22,11 +22,10 @@ import com.google.common.collect.ImmutableList; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.rest.entity.HostInfoEntity; -import io.confluent.ksql.rest.entity.HostStatusEntity; import io.confluent.ksql.rest.server.HeartbeatAgent.Builder; import io.confluent.ksql.rest.server.HeartbeatAgent.CheckHeartbeatService; import io.confluent.ksql.rest.server.HeartbeatAgent.DiscoverClusterService; +import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatus; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.PersistentQueryMetadata; import java.util.List; @@ -73,11 +72,9 @@ public void setUp() { .heartbeatMissedThreshold(2) .build(ksqlEngine, serviceContext); heartbeatAgent.setLocalAddress(LOCALHOST_URL); - Map<String, HostStatusEntity> hostsStatus = new ConcurrentHashMap<>(); - hostsStatus.put(localHostInfo.toString(), new HostStatusEntity( - new HostInfoEntity(localHostInfo.host(), localHostInfo.port()), true, 0L)); - hostsStatus.put(remoteHostInfo.toString(), new HostStatusEntity( - new HostInfoEntity(remoteHostInfo.host(), remoteHostInfo.port()), true, 0L)); + Map<HostInfo, HostStatus> hostsStatus = new ConcurrentHashMap<>(); + hostsStatus.put(localHostInfo, new HostStatus(true, 0L)); + hostsStatus.put(remoteHostInfo, new HostStatus(true, 0L)); heartbeatAgent.setHostsStatus(hostsStatus); allMetadata0 = ImmutableList.of(streamsMetadata0); allMetadata1 = ImmutableList.of(streamsMetadata1); @@ -100,7 +97,7 @@ public void shouldDiscoverServersInCluster() { discoverService.runOneIteration(); // Then: - assertThat(heartbeatAgent.getHostsStatus().keySet().contains(remoteHostInfo.toString()), is(true)); + assertThat(heartbeatAgent.getHostsStatus().containsKey(remoteHostInfo), is(true)); } @Test @@ -121,7 +118,7 @@ public void shouldMarkServerAsUpNoMissingHeartbeat() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(true)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(true)); } @Test @@ -142,7 +139,7 @@ public void shouldMarkServerAsUpMissOneHeartbeat() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(true)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(true)); } @Test @@ -158,7 +155,7 @@ public void shouldMarkServerAsUpMissAtBeginning() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(true)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(true)); } @Test @@ -177,7 +174,7 @@ public void shouldMarkServerAsUpMissInterleaved() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(true)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(true)); } @Test @@ -196,7 +193,7 @@ public void shouldMarkServerAsUpOutOfOrderHeartbeats() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(true)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(true)); } @Test @@ -216,7 +213,7 @@ public void shouldMarkServerAsDownMissAtEnd() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(false)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(false)); } @Test @@ -238,7 +235,7 @@ public void shouldMarkServerAsDownIgnoreHeartbeatsOutOfWindow() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(false)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(false)); } @Test @@ -260,7 +257,7 @@ public void shouldMarkServerAsDownOutOfOrderHeartbeats() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); - assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo.toString()).getHostAlive(), is(false)); + assertThat(heartbeatAgent.getHostsStatus().get(remoteHostInfo).isHostAlive(), is(false)); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 1e8d1a4ffc2c..4b582e1b88db 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; +import io.confluent.ksql.rest.server.resources.ActiveStandbyResource; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.resources.RootDocument; import io.confluent.ksql.rest.server.resources.StatusResource; @@ -128,6 +129,8 @@ public class KsqlRestApplicationTest { private Consumer<KsqlConfig> rocksDBConfigSetterHandler; @Mock private HeartbeatAgent heartbeatAgent; + @Mock + private ActiveStandbyResource activeStandbyResource; @Mock private SchemaRegistryClient schemaRegistryClient; @@ -221,7 +224,7 @@ public void shouldRegisterAuthorizationFilterWithAuthorizationProvider() { @Test public void shouldCreateLogStreamThroughKsqlResource() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: verify(ksqlResource).handleKsqlStatements( @@ -239,7 +242,7 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() { .thenReturn(false); // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: verify(ksqlResource, never()).handleKsqlStatements( @@ -251,7 +254,7 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() { @Test public void shouldStartCommandStoreAndCommandRunnerBeforeCreatingLogStream() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(commandQueue, commandRunner, ksqlResource); @@ -269,7 +272,7 @@ public void shouldStartCommandStoreAndCommandRunnerBeforeCreatingLogStream() { @Test public void shouldCreateLogTopicBeforeSendingCreateStreamRequest() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(topicClient, ksqlResource); @@ -285,7 +288,7 @@ public void shouldCreateLogTopicBeforeSendingCreateStreamRequest() { @Test public void shouldInitializeCommandStoreCorrectly() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(topicClient, commandQueue, commandRunner); @@ -297,7 +300,7 @@ public void shouldInitializeCommandStoreCorrectly() { @Test public void shouldReplayCommandsBeforeSettingReady() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(commandRunner, serverState); @@ -308,7 +311,7 @@ public void shouldReplayCommandsBeforeSettingReady() { @Test public void shouldSendCreateStreamRequestBeforeSettingReady() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(ksqlResource, serverState); @@ -330,7 +333,7 @@ public void shouldCheckPreconditionsBeforeUsingServiceContext() { }); // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(precondition1, precondition2, serviceContext); @@ -352,7 +355,7 @@ public void shouldNotInitializeUntilPreconditionsChecked() { }); // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: final InOrder inOrder = Mockito.inOrder(precondition1, precondition2, serverState); @@ -369,7 +372,7 @@ public void shouldNotInitializeUntilPreconditionsChecked() { @Test public void shouldConfigureRocksDBConfigSetter() { // When: - app.startKsql(ksqlConfig); + app.startKsql(); // Then: verify(rocksDBConfigSetterHandler).accept(ksqlConfig); @@ -434,7 +437,8 @@ private void givenAppWithRestConfig(final Map<String, Object> restConfigMap) { ImmutableList.of(precondition1, precondition2), ImmutableList.of(ksqlResource, streamedQueryResource), rocksDBConfigSetterHandler, - Optional.of(heartbeatAgent) + Optional.of(heartbeatAgent), + activeStandbyResource ); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java index a27c3aa7dcf6..2ea7fdc97d64 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java @@ -400,6 +400,7 @@ private static KsqlRestConfig buildConfig( config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get()); config.putIfAbsent(KsqlRestConfig.LISTENERS_CONFIG, "http://localhost:0,https://localhost:0"); + //config.putIfAbsent(KsqlRestConfig.ADVERTISED_LISTENER_CONFIG, "http://localhost:1234"); return new KsqlRestConfig(config); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index 9a06d9af0efe..c20f55fde6bb 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -74,7 +74,6 @@ public void shouldThrowExceptionIfConfigDisabled() { engine.getEngine(), engine.getServiceContext(), false, - false, Optional.empty() ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java new file mode 100644 index 000000000000..b054ae82740e --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ActiveStandbyResourceTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.resources; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; +import io.confluent.ksql.rest.server.ServerUtil; +import io.confluent.ksql.util.PersistentQueryMetadata; +import io.confluent.ksql.util.QueryMetadata; +import java.util.List; +import javax.ws.rs.core.Response; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.StreamsMetadata; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ActiveStandbyResourceTest { + + @Mock + private KsqlEngine ksqlEngine; + @Mock + private PersistentQueryMetadata query; + @Mock + private StreamsMetadata streamsMetadata; + + private static final String APPLICATION_SERVER_ID = "http://localhost:8088"; + private HostInfo hostInfo; + private List<StreamsMetadata> allMetadata; + private ActiveStandbyResource activeStandbyResource; + + @Before + public void setUp() { + hostInfo = ServerUtil.parseHostInfo(APPLICATION_SERVER_ID); + activeStandbyResource = new ActiveStandbyResource(ksqlEngine); + activeStandbyResource.setLocalHostInfo(APPLICATION_SERVER_ID); + allMetadata = ImmutableList.of(streamsMetadata); + } + + @Test + public void shouldReturnResponse() { + // When: + final Response response = activeStandbyResource.getActiveStandbyInformation(); + + // Then: + assertThat(response.getStatus(), is(200)); + assertThat(response.getEntity(), instanceOf(ActiveStandbyResponse.class)); + } + + @Test + public void shouldReturnActiveAndStandByStores() { + // Given: + when(ksqlEngine.getPersistentQueries()).thenReturn(ImmutableList.of(query)); + when(query.getAllMetadata()).thenReturn(allMetadata); + when(((QueryMetadata)query).getQueryApplicationId()).thenReturn("test"); + when(streamsMetadata.hostInfo()).thenReturn(hostInfo); + when(streamsMetadata.stateStoreNames()).thenReturn(ImmutableSet.of("activeStore1", "activeStore2")); + when(streamsMetadata.standbyStateStoreNames()).thenReturn(ImmutableSet.of("standByStore1", "standByStore2")); + + // When: + final ActiveStandbyResponse response = activeStandbyResource.getResponse(); + + // Then: + assertThat(response.getPerQueryInfo().get("test").getActiveStores().containsAll( + ImmutableSet.of("activeStore1", "activeStore2")), is(true)); + assertThat(response.getPerQueryInfo().get("test").getStandByStores().containsAll( + ImmutableSet.of("standByStore1", "standByStore2")), is(true)); + } + +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ClusterStatusResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ClusterStatusResourceTest.java index 7c4c655d51b2..76e87f1f57e3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ClusterStatusResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/ClusterStatusResourceTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.server.HeartbeatAgent; import javax.ws.rs.core.Response; @@ -33,12 +34,14 @@ public class ClusterStatusResourceTest { @Mock private HeartbeatAgent heartbeatAgent; + @Mock + private KsqlEngine ksqlEngine; private ClusterStatusResource clusterStatusResource; @Before public void setUp() { - clusterStatusResource = new ClusterStatusResource(heartbeatAgent); + clusterStatusResource = new ClusterStatusResource(ksqlEngine, heartbeatAgent); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java index 7001d65f0ca4..322bc501fae6 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisherTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.resources.streaming; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; @@ -76,10 +77,11 @@ public class PullQueryPublisherTest { @Before public void setUp() { - publisher = new PullQueryPublisher(engine, serviceContext, statement, false, false, - Optional.empty(),pullQueryExecutor); + publisher = new PullQueryPublisher(engine, serviceContext, statement, false, + Optional.empty(), pullQueryExecutor); - when(pullQueryExecutor.execute(any(), any(), any(), any(), any(), any())).thenReturn(entity); + when(pullQueryExecutor.execute(any(), any(), any(), eq(false), eq(Optional.empty()))) + .thenReturn(entity); when(entity.getSchema()).thenReturn(SCHEMA); @@ -104,8 +106,7 @@ public void shouldRunQueryWithCorrectParams() { subscription.request(1); // Then: - verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, false, - Optional.empty()); + verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, Optional.empty()); } @Test @@ -118,8 +119,7 @@ public void shouldOnlyExecuteOnce() { // Then: verify(subscriber).onNext(any()); - verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, false, - Optional.empty()); + verify(pullQueryExecutor).execute(statement, engine, serviceContext, false, Optional.empty()); } @Test @@ -155,7 +155,7 @@ public void shouldCallOnErrorOnFailure() { givenSubscribed(); final Throwable e = new RuntimeException("Boom!"); - when(pullQueryExecutor.execute(any(), any(), any(), any(), any(), any())).thenThrow(e); + when(pullQueryExecutor.execute(any(), any(), any(), eq(false), eq(Optional.empty()))).thenThrow(e); // When: subscription.request(1); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index 2f0883196776..c2ff2a53fc11 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -210,7 +210,6 @@ public void setUp() { serverState, schemaRegistryClientSupplier, false, - false, Optional.empty() ); } @@ -445,7 +444,6 @@ public void shouldHandlePullQuery() { eq(configuredStatement), any(), eq(false), - eq(false), eq(Optional.empty())); } diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java index 1b9d2bba6d06..ab22abfaa438 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java +++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import io.confluent.ksql.properties.LocalProperties; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatuses; @@ -101,6 +102,10 @@ public RestResponse<ClusterStatusResponse> makeClusterStatusRequest() { return target().getClusterStatus(); } + public RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest() { + return target().getActiveStandByInformation(); + } + public RestResponse<KsqlEntityList> makeKsqlRequest(final String ksql) { return target().postKsqlRequest(ksql, Optional.empty()); } diff --git a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java index d3411a323f21..a911d18d9b53 100644 --- a/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java +++ b/ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java @@ -18,6 +18,7 @@ import static java.util.Objects.requireNonNull; import io.confluent.ksql.properties.LocalProperties; +import io.confluent.ksql.rest.entity.ActiveStandbyResponse; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatuses; @@ -53,6 +54,7 @@ public final class KsqlTarget { private static final String QUERY_PATH = "/query"; private static final String HEARTBEAT_PATH = "/heartbeat"; private static final String CLUSTERSTATUS_PATH = "/clusterStatus"; + private static final String ACTIVESTANDBY_PATH = "/activeStandby"; private final WebTarget target; private final LocalProperties localProperties; @@ -95,6 +97,10 @@ public RestResponse<ClusterStatusResponse> getClusterStatus() { return get(CLUSTERSTATUS_PATH, ClusterStatusResponse.class); } + public RestResponse<ActiveStandbyResponse> getActiveStandByInformation() { + return get(ACTIVESTANDBY_PATH, ActiveStandbyResponse.class); + } + public RestResponse<CommandStatuses> getStatuses() { return get(STATUS_PATH, CommandStatuses.class); } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java new file mode 100644 index 000000000000..9f3695c35c05 --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyEntity.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; +import java.util.Objects; +import java.util.Set; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ActiveStandbyEntity { + + private final Set<String> activeStores; + private final Set<String> activePartitions; + private final Set<String> standByStores; + private final Set<String> standByPartitions; + + @JsonCreator + public ActiveStandbyEntity( + @JsonProperty("activeStores") final Set<String> activeStores, + @JsonProperty("activePartitions") final Set<String> activePartitions, + @JsonProperty("standByStores") final Set<String> standByStores, + @JsonProperty("standByPartitions") final Set<String> standByPartitions + ) { + this.activeStores = ImmutableSet.copyOf(requireNonNull(activeStores)); + this.activePartitions = ImmutableSet.copyOf(requireNonNull(activePartitions)); + this.standByStores = ImmutableSet.copyOf(requireNonNull(standByStores)); + this.standByPartitions = ImmutableSet.copyOf(requireNonNull(standByPartitions)); + } + + public Set<String> getActiveStores() { + return activeStores; + } + + public Set<String> getStandByStores() { + return standByStores; + } + + public Set<String> getActivePartitions() { + return activePartitions; + } + + public Set<String> getStandByPartitions() { + return standByPartitions; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final ActiveStandbyEntity that = (ActiveStandbyEntity) o; + return activeStores.equals(that.activeStores) + && standByStores.equals(that.standByStores) + && activePartitions.equals(that.activePartitions) + && standByPartitions.equals(that.standByPartitions); + } + + @Override + public int hashCode() { + return Objects.hash(activeStores, standByStores, activePartitions, standByPartitions); + } +} diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyResponse.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyResponse.java new file mode 100644 index 000000000000..8ad1925335be --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ActiveStandbyResponse.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import static java.util.Objects.requireNonNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.Immutable; +import java.util.Map; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@Immutable +public class ActiveStandbyResponse { + + private final ImmutableMap<String, ActiveStandbyEntity> perQueryInfo; + + @JsonCreator + public ActiveStandbyResponse( + @JsonProperty("perQueryInfo") final Map<String, ActiveStandbyEntity> perQueryInfo) { + this.perQueryInfo = ImmutableMap.copyOf(requireNonNull(perQueryInfo, "perQueryInfo")); + } + + public Map<String, ActiveStandbyEntity> getPerQueryInfo() { + return perQueryInfo; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final ActiveStandbyResponse that = (ActiveStandbyResponse) o; + return Objects.equals(perQueryInfo, that.perQueryInfo); + } + + @Override + public int hashCode() { + return Objects.hash(perQueryInfo); + } + +} diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java index 9cc7dc8ea20f..cf500b54489f 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/ClusterStatusResponse.java @@ -29,15 +29,15 @@ @Immutable public final class ClusterStatusResponse { - private final ImmutableMap<String, HostStatusEntity> clusterStatus; + private final ImmutableMap<HostInfoEntity, HostStatusEntity> clusterStatus; @JsonCreator public ClusterStatusResponse( - @JsonProperty("clusterStatus") final Map<String, HostStatusEntity> clusterStatus) { + @JsonProperty("clusterStatus") final Map<HostInfoEntity, HostStatusEntity> clusterStatus) { this.clusterStatus = ImmutableMap.copyOf(requireNonNull(clusterStatus, "status")); } - public Map<String, HostStatusEntity> getClusterStatus() { + public Map<HostInfoEntity, HostStatusEntity> getClusterStatus() { return clusterStatus; } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostInfoEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostInfoEntity.java index e0e95f7f99fb..6ed227de084a 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostInfoEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostInfoEntity.java @@ -17,7 +17,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.base.Preconditions; import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) @@ -26,15 +27,22 @@ public class HostInfoEntity { private final String host; private final int port; - @JsonCreator public HostInfoEntity( - @JsonProperty("host") final String host, - @JsonProperty("port") final int port + final String host, + final int port ) { this.host = Objects.requireNonNull(host, "host"); this.port = Objects.requireNonNull(port, "port"); } + @JsonCreator + public HostInfoEntity(final String serializedPair) { + final String [] parts = serializedPair.split(":"); + Preconditions.checkArgument(parts.length == 2); + this.host = Objects.requireNonNull(parts[0], "host"); + this.port = Integer.parseInt(parts[1]); + } + public String getHost() { return host; } @@ -63,8 +71,9 @@ public int hashCode() { return Objects.hash(host, port); } + @JsonValue @Override public String toString() { - return host + "," + port; + return host + ":" + port; } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java index 8565177c01bc..8f36055e9a9b 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java @@ -18,28 +18,26 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; import java.util.Objects; @JsonIgnoreProperties(ignoreUnknown = true) public class HostStatusEntity { - private HostInfoEntity hostInfoEntity; private boolean hostAlive; private long lastStatusUpdateMs; + private Map<String, ActiveStandbyEntity> perQueryActiveStandbyEntity; @JsonCreator public HostStatusEntity( - @JsonProperty("hostInfoEntity") final HostInfoEntity hostInfoEntity, @JsonProperty("hostAlive") final boolean hostAlive, - @JsonProperty("lastStatusUpdateMs") final long lastStatusUpdateMs + @JsonProperty("lastStatusUpdateMs") final long lastStatusUpdateMs, + @JsonProperty("perQueryActiveStandbyEntity") + final Map<String, ActiveStandbyEntity> perQueryActiveStandbyEntity ) { - this.hostInfoEntity = Objects.requireNonNull(hostInfoEntity, "hostInfoEntity"); this.hostAlive = hostAlive; this.lastStatusUpdateMs = lastStatusUpdateMs; - } - - public HostInfoEntity getHostInfoEntity() { - return hostInfoEntity; + this.perQueryActiveStandbyEntity = perQueryActiveStandbyEntity; } public boolean getHostAlive() { @@ -50,8 +48,8 @@ public long getLastStatusUpdateMs() { return lastStatusUpdateMs; } - public void setHostInfoEntity(final HostInfoEntity hostInfoEntity) { - this.hostInfoEntity = hostInfoEntity; + public Map<String, ActiveStandbyEntity> getPerQueryActiveStandbyEntity() { + return perQueryActiveStandbyEntity; } public void setHostAlive(final boolean hostAlive) { @@ -62,6 +60,10 @@ public void setLastStatusUpdateMs(final long lastStatusUpdateMs) { this.lastStatusUpdateMs = lastStatusUpdateMs; } + public void setPerQueryActiveStandbyEntity(final Map<String, ActiveStandbyEntity> perQueryActiveStandbyEntity) { + this.perQueryActiveStandbyEntity = perQueryActiveStandbyEntity; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -73,17 +75,19 @@ public boolean equals(final Object o) { } final HostStatusEntity that = (HostStatusEntity) o; - return Objects.equals(hostInfoEntity, that.hostInfoEntity) - && hostAlive == that.hostAlive && lastStatusUpdateMs == that.lastStatusUpdateMs; + return hostAlive == that.hostAlive + && lastStatusUpdateMs == that.lastStatusUpdateMs + && perQueryActiveStandbyEntity.equals(that.perQueryActiveStandbyEntity); + } @Override public int hashCode() { - return Objects.hash(hostInfoEntity, hostAlive, lastStatusUpdateMs); + return Objects.hash(hostAlive, lastStatusUpdateMs, perQueryActiveStandbyEntity); } @Override public String toString() { - return hostInfoEntity + "," + hostAlive + "," + lastStatusUpdateMs; + return hostAlive + "," + lastStatusUpdateMs + "," + perQueryActiveStandbyEntity; } } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/IRoutingFilter.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/IRoutingFilter.java new file mode 100644 index 000000000000..41f039e9c5df --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/IRoutingFilter.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import org.apache.kafka.streams.state.HostInfo; + +public interface IRoutingFilter { + + boolean filter(HostInfo hostInfo, String storeName, int partition); + +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java index 3942b0a1dbec..bc47136ecc09 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/Locator.java @@ -15,12 +15,11 @@ package io.confluent.ksql.execution.streams.materialization; +import io.confluent.ksql.execution.streams.IRoutingFilter; import java.net.URI; import java.util.List; -import java.util.Map; import java.util.Optional; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.streams.state.HostInfo; /** * Type used to locate on which KSQL node materialized data is stored. @@ -39,8 +38,9 @@ public interface Locator { * @param key the required key. * @return the list of nodes, that can potentially serve the key. */ - List<KsqlNode> locate(Struct key, Optional<Map<String, HostInfo>> hostStatuses); - + List<KsqlNode> locate( + Struct key, + List<IRoutingFilter> routingFilters); interface KsqlNode { @@ -53,5 +53,18 @@ interface KsqlNode { * @return The base URI of the node, including protocol, host and port. */ URI location(); + + /** + * If heartbeat is not enabled, the value is ignored. + * @return {@code true} if this is node is alive as determined by the heartbeat mechanism. + * + */ + boolean isAlive(); + + /** + * Specify liveness status of node as determined by heartbeat mechanism. + * @param alive Whether the node is alive or not + */ + void setIsAlive(boolean alive); } } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java new file mode 100644 index 000000000000..0d836a786a60 --- /dev/null +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/ActiveAndStandByNodes.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams.materialization.ks; + +import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +public class ActiveAndStandByNodes { + + private final KsqlNode active; + private final Set<KsqlNode> standBys; + + public ActiveAndStandByNodes(final KsqlNode active) { + this.active = Objects.requireNonNull(active, "active"); + this.standBys = new HashSet<>(); + } + + public KsqlNode getActive() { + return active; + } + + public Set<KsqlNode> getStandBys() { + return standBys; + } + + public void addStandBy(final KsqlNode standBy) { + this.standBys.add(standBy); + } + + @Override + public String toString() { + return new StringBuilder() + .append(String.format("active = %s ,", active)) + .append(String.format("stanbys = %s .", standBys)) + .toString(); + } +} diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index b6e8eb21e96e..4b8f0255bcf6 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -17,16 +17,16 @@ import static java.util.Objects.requireNonNull; -import com.google.common.collect.Lists; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.execution.streams.materialization.Locator; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Serializer; @@ -34,12 +34,15 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.state.HostInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Kafka Streams implementation of {@link Locator}. */ final class KsLocator implements Locator { + private static final Logger LOG = LoggerFactory.getLogger(KsLocator.class); private final String stateStoreName; private final KafkaStreams kafkaStreams; private final Serializer<Struct> keySerializer; @@ -58,37 +61,48 @@ final class KsLocator implements Locator { } @Override - public List<KsqlNode> locate(final Struct key, Optional<Map<String, HostInfo>> hostStatuses) { + public List<KsqlNode> locate( + final Struct key, + final List<IRoutingFilter> routingFilters + ) { final KeyQueryMetadata metadata = kafkaStreams .queryMetadataForKey(stateStoreName, key, keySerializer); + // FAil fast if Streams not ready. Let client handle it if (metadata == KeyQueryMetadata.NOT_AVAILABLE) { + LOG.debug("Streams Metadata not available"); return Collections.emptyList(); } - // TODO Do we need a timeout here when accessing metadata? - ActiveAndStandByNodes activeAndStandByNodes = new ActiveAndStandByNodes(); - activeAndStandByNodes.setActive(asNode(metadata.getActiveHost())); - metadata.getStandbyHosts() - .stream() - .map(this::asNode) - .forEach(activeAndStandByNodes::addStandBy); - - - - final List<HostInfo> servingHosts = Lists.newArrayList(); - servingHosts.add(metadata.getActiveHost()); - servingHosts.addAll(metadata.getStandbyHosts()); - - // filter out nodes that are dead - if (hostStatuses.isPresent()) { - // TODO fix after heartbeat merge - servingHosts.stream().filter(hostInfo -> hostStatuses.get().containsKey(hostInfo)); - } - return servingHosts.stream().map(this::asNode).collect(Collectors.toList()); + final HostInfo activeHost = metadata.getActiveHost(); + final Set<HostInfo> standByHosts = metadata.getStandbyHosts(); + + LOG.debug("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts); + + List<HostInfo> hosts = new ArrayList<>(); + hosts.add(activeHost); + hosts.addAll(standByHosts); + + // Filter out hosts based on liveness and lag filters. + // The list is ordered by routing preference: active node is first, then standby nodes + // in order of increasing lag. + // If heartbeat is not enabled, all hosts are considered alive. + List<KsqlNode> filteredHosts = new ArrayList<>(); + routingFilters + .forEach(routingFilter -> hosts + .stream() + .filter(hostInfo -> routingFilter + .filter(hostInfo, stateStoreName, metadata.getPartition())) + .map(this::asNode) + .collect(Collectors.toList()) + .addAll(filteredHosts)); + + LOG.debug("Filtered and ordered hosts: {}", filteredHosts); + return filteredHosts; } - private KsqlNode asNode(final HostInfo hostInfo) { + @VisibleForTesting + KsqlNode asNode(final HostInfo hostInfo) { return new Node( isLocalHost(hostInfo), buildLocation(hostInfo) @@ -118,31 +132,6 @@ private URI buildLocation(final HostInfo remoteInfo) { } } - class ActiveAndStandByNodes { - private KsqlNode active; - private final Set<KsqlNode> standBys; - - private ActiveAndStandByNodes() { - standBys = new HashSet<>(); - } - - public KsqlNode getActive() { - return active; - } - - public Set<KsqlNode> getStandBys() { - return standBys; - } - - public void setActive(KsqlNode active) { - this.active = active; - } - - public void addStandBy(KsqlNode standBy) { - this.standBys.add(standBy); - } - } - @Immutable private static final class Node implements KsqlNode { @@ -170,8 +159,40 @@ public boolean isAlive() { return isAlive; } - public void setIsAlive(boolean alive) { + public void setIsAlive(final boolean alive) { isAlive = alive; } + + @Override + public String toString() { + return new StringBuilder() + .append(String.format("local = %s ,", local)) + .append(String.format("location = %s ,", location)) + .append(String.format("alive = %s .", isAlive)) + .toString(); + + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final Node that = (Node) o; + return local == that.local + && location.equals(that.location) + && isAlive == that.isAlive; + } + + @Override + public int hashCode() { + return Objects.hash(local, location, isAlive); + } + } } diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java index b62879f288b4..b813c9a03272 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocatorTest.java @@ -16,9 +16,9 @@ package io.confluent.ksql.execution.streams.materialization.ks; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -26,10 +26,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.testing.NullPointerTester; import com.google.common.testing.NullPointerTester.Visibility; +import io.confluent.ksql.execution.streams.IRoutingFilter; import io.confluent.ksql.execution.streams.materialization.Locator.KsqlNode; +import io.confluent.ksql.rest.entity.HostStatusEntity; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -64,8 +67,15 @@ public class KsLocatorTest { private HostInfo standByHostInfo1; @Mock private HostInfo standByHostInfo2; + @Mock + private IRoutingFilter livenessFilter; private KsLocator locator; + private KsqlNode activeNode; + private KsqlNode standByNode1; + private KsqlNode standByNode2; + private Optional<Map<HostInfo, HostStatusEntity>> hostsStatus; + private List<IRoutingFilter> routingFilters; @Before public void setUp() { @@ -79,6 +89,30 @@ public void setUp() { when(standByHostInfo2.host()).thenReturn("standBy2"); when(standByHostInfo2.port()).thenReturn(5678); + + activeNode = locator.asNode(activeHostInfo); + activeNode.setIsAlive(true); + standByNode1 = locator.asNode(standByHostInfo1); + standByNode1.setIsAlive(true); + standByNode2 = locator.asNode(standByHostInfo2); + standByNode2.setIsAlive(true); + + hostsStatus = Optional.of(ImmutableMap.of( + activeHostInfo, new HostStatusEntity( + true, + 0L, + Collections.emptyMap()), + standByHostInfo1, new HostStatusEntity( + true, + 0L, + Collections.emptyMap()), + standByHostInfo2, new HostStatusEntity( + true, + 0L, + Collections.emptyMap()) + )); + + routingFilters.add(livenessFilter); } @Test @@ -97,7 +131,7 @@ public void shouldRequestMetadata() { getEmtpyMetadata(); // When: - locator.locate(SOME_KEY, Optional.empty()); + locator.locate(SOME_KEY, routingFilters); // Then: verify(kafkaStreams).queryMetadataForKey(STORE_NAME, SOME_KEY, keySerializer); @@ -109,7 +143,7 @@ public void shouldReturnEmptyIfOwnerNotKnown() { getEmtpyMetadata(); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.isEmpty(), is(true)); @@ -119,9 +153,10 @@ public void shouldReturnEmptyIfOwnerNotKnown() { public void shouldReturnOwnerIfKnown() { // Given: getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: final Optional<URI> url = result.stream().findFirst().map(KsqlNode::location); @@ -137,9 +172,10 @@ public void shouldReturnLocalOwnerIfSameAsSuppliedLocalHost() { when(activeHostInfo.host()).thenReturn(LOCAL_HOST_URL.getHost()); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(true))); @@ -151,9 +187,10 @@ public void shouldReturnLocalOwnerIfExplicitlyLocalHostOnSamePortAsSuppliedLocal when(activeHostInfo.host()).thenReturn("LocalHOST"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(true))); @@ -165,9 +202,10 @@ public void shouldReturnRemoteOwnerForDifferentHost() { when(activeHostInfo.host()).thenReturn("different"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort()); getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(false))); @@ -179,9 +217,10 @@ public void shouldReturnRemoteOwnerForDifferentPort() { when(activeHostInfo.host()).thenReturn(LOCAL_HOST_URL.getHost()); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort() + 1); getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(false))); @@ -193,39 +232,65 @@ public void shouldReturnRemoteOwnerForDifferentPortOnLocalHost() { when(activeHostInfo.host()).thenReturn("LOCALhost"); when(activeHostInfo.port()).thenReturn(LOCAL_HOST_URL.getPort() + 1); getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(false))); } + @Test + public void shouldReturnActiveAndStandBysWhenHeartBeatNotEnabled() { + // Given: + getActiveAndStandbyMetadata(); + standByNode1.setIsAlive(false); + standByNode2.setIsAlive(false); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(true); + when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(true); + when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); + + // When: + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); + + // Then: + assertThat(result.size(), is(3)); + assertThat(result.stream().findFirst(), is(activeNode)); + assertThat(result, containsInAnyOrder(standByNode1, standByNode2)); + } + @Test public void shouldReturnStandBysWhenActiveDown() { // Given: getActiveAndStandbyMetadata(); - Map<String, HostInfo> hostStatus = ImmutableMap.of() + hostsStatus.get().get(activeHostInfo.toString()).setHostAlive(false); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(false); + when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(true); + when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); // When: - final List<KsqlNode> result = locator.locate(SOME_KEY, Optional.empty()); + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); // Then: - assertThat(result.stream().findFirst().map(KsqlNode::isLocal), is(Optional.of(false))); + assertThat(result.size(), is(2)); + assertThat(result, containsInAnyOrder(standByNode1, standByNode2)); } - @SuppressWarnings({"unchecked", "deprecation"}) - private void givenOwnerMetadata(final Optional<HostInfo> hostInfo) { - final KeyQueryMetadata metadata = hostInfo - .map(hi -> { - final KeyQueryMetadata md = mock(KeyQueryMetadata.class); - when(md.getActiveHost()).thenReturn(hostInfo.get()); - return md; - }) - .orElse(KeyQueryMetadata.NOT_AVAILABLE); + @Test + public void shouldReturnOneStandByWhenActiveAndOtherStandByDown() { + // Given: + getActiveAndStandbyMetadata(); + when(livenessFilter.filter(activeHostInfo, any(), any())).thenReturn(false); + when(livenessFilter.filter(standByHostInfo1, any(), any())).thenReturn(false); + when(livenessFilter.filter(standByHostInfo2, any(), any())).thenReturn(true); - when(kafkaStreams.queryMetadataForKey(any(), any(), any(Serializer.class))) - .thenReturn(metadata); + // When: + final List<KsqlNode> result = locator.locate(SOME_KEY, routingFilters); + + // Then: + assertThat(result.size(), is(1)); + assertThat(result.stream().findFirst(), is(standByNode2)); } @SuppressWarnings("unchecked")