Skip to content

Commit

Permalink
Added functional test for routing and endpoint for active standby info
Browse files Browse the repository at this point in the history
undo changes to log files

fixing tests

fixed tests

addressed vinoth's comments
  • Loading branch information
vpapavas committed Jan 24, 2020
1 parent 4722161 commit 80f4945
Show file tree
Hide file tree
Showing 39 changed files with 1,700 additions and 530 deletions.
19 changes: 19 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,25 @@ public class KsqlConfig extends AbstractConfig {
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;

// TODO shall we remove this config?
public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.routing.timeout.ms";
public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L;
public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC = "Timeout in milliseconds "
+ "when waiting for the lookup of the owner of a row key";


/** Name of the config that enables forwarding pull queries to standby hosts. */
public static final String KSQL_QUERY_PULL_ALLOW_STALE_READS =
    "ksql.query.pull.allow.stale.reads";
// Every concatenated fragment ends with a space so the sentences do not run together.
private static final String KSQL_QUERY_PULL_ALLOW_STALE_READS_DOC =
    "Config to enable/disable forwarding pull queries to standby hosts when the active is dead. "
    + "Effectively, the accuracy of pull queries is sacrificed for higher availability. "
    + "Possible values are \"true\", \"false\". Setting to \"true\" guarantees high "
    + "availability for pull queries. If set to \"false\", pull queries will fail when "
    + "the active is dead and until a new active is elected. Default value is \"false\". ";
/** Stale reads from standby hosts are disabled unless explicitly turned on. */
public static final boolean KSQL_QUERY_PULL_ALLOW_STALE_READS_DEFAULT = false;


public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.streamsstore.rebalancing.timeout.ms";
public static final Long KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT = 10000L;
Expand Down Expand Up @@ -490,6 +503,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC
).define(
KSQL_QUERY_PULL_ALLOW_STALE_READS,
Type.BOOLEAN,
KSQL_QUERY_PULL_ALLOW_STALE_READS_DEFAULT,
Importance.MEDIUM,
KSQL_QUERY_PULL_ALLOW_STALE_READS_DOC
).define(
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ActiveStandbyResponse;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
Expand Down Expand Up @@ -61,5 +62,10 @@ public void makeAsyncHeartbeatRequest(
public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(final URI serverEndPoint) {
throw new UnsupportedOperationException("KSQL client is disabled");
}

// Always rejects the call with UnsupportedOperationException; serverEndPoint is never contacted.
@Override
public RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(final URI serverEndPoint) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ActiveStandbyResponse;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
Expand Down Expand Up @@ -55,4 +56,12 @@ void makeAsyncHeartbeatRequest(
* @return response containing the cluster status.
*/
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);

/**
 * Send a request to a remote Ksql server to inquire about the state stores for which it is
 * active and the state stores for which it is standby.
 *
 * @param serverEndPoint the remote destination.
 * @return response containing the state stores for which the remote host is active and standby.
 */
RestResponse<ActiveStandbyResponse> makeActiveStandbyRequest(URI serverEndPoint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@
package io.confluent.ksql.rest.server;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.ServiceManager;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.HostInfoEntity;
import io.confluent.ksql.rest.entity.HostStatusEntity;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.net.URI;
import java.net.URL;
import java.time.Clock;
Expand All @@ -47,7 +41,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,13 +74,12 @@ public final class HeartbeatAgent {
private final KsqlEngine engine;
private final ServiceContext serviceContext;
private final HeartbeatConfig config;
private final ConcurrentHashMap<String, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<String, HostStatusEntity> hostsStatus;
private final ConcurrentHashMap<HostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<HostInfo, HostStatus> hostsStatus;
private final ScheduledExecutorService scheduledExecutorService;
private final ServiceManager serviceManager;
private final Clock clock;
private HostInfo localHostInfo;
private String localHostString;
private URL localURL;

public static HeartbeatAgent.Builder builder() {
Expand Down Expand Up @@ -115,11 +107,10 @@ private HeartbeatAgent(final KsqlEngine engine,
* @param timestamp The timestamp the heartbeat was sent.
*/
public void receiveHeartbeat(final HostInfo hostInfo, final long timestamp) {
final String hostKey = hostInfo.toString();
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.computeIfAbsent(
hostKey, key -> new TreeMap<>());
hostInfo, key -> new TreeMap<>());
synchronized (heartbeats) {
LOG.debug("Receive heartbeat at: {} from host: {} ", timestamp, hostKey);
LOG.debug("Receive heartbeat at: {} from host: {} ", timestamp, hostInfo);
heartbeats.put(timestamp, new HeartbeatInfo(timestamp));
}
}
Expand All @@ -128,12 +119,12 @@ public void receiveHeartbeat(final HostInfo hostInfo, final long timestamp) {
* Returns the current view of the cluster containing all hosts discovered (whether alive or dead)
* @return status of discovered hosts
*/
public Map<String, HostStatusEntity> getHostsStatus() {
public Map<HostInfo, HostStatus> getHostsStatus() {
return Collections.unmodifiableMap(hostsStatus);
}

@VisibleForTesting
void setHostsStatus(final Map<String, HostStatusEntity> status) {
void setHostsStatus(final Map<HostInfo, HostStatus> status) {
hostsStatus.putAll(status);
}

Expand All @@ -157,37 +148,17 @@ void stopAgent() {

void setLocalAddress(final String applicationServer) {

this.localHostInfo = parseHostInfo(applicationServer);
this.localHostString = localHostInfo.toString();
this.localHostInfo = ServerUtil.parseHostInfo(applicationServer);
try {
  this.localURL = new URL(applicationServer);
} catch (final Exception e) {
  // Report the address as host:port and chain the cause so the underlying
  // URL parsing failure stays visible in the stack trace.
  throw new IllegalStateException("Failed to convert remote host info to URL."
      + " remoteInfo: " + localHostInfo.host() + ":"
      + localHostInfo.port(), e);
}
this.hostsStatus.putIfAbsent(localHostString, new HostStatusEntity(
new HostInfoEntity(localHostInfo.host(), localHostInfo.port()),
true,
clock.millis()));
this.hostsStatus.putIfAbsent(localHostInfo, new HostStatus(true, clock.millis()));
}

/**
 * Converts an endpoint string into a {@link HostInfo}.
 *
 * @param endPoint the endpoint to parse; may be {@code null} or blank.
 * @return {@code StreamsMetadataState.UNKNOWN_HOST} when {@code endPoint} is {@code null} or
 *     blank, otherwise a {@link HostInfo} built from the extracted host and port.
 * @throws KsqlException if either the host or the port cannot be extracted.
 */
private static HostInfo parseHostInfo(final String endPoint) {
  final boolean blankEndPoint = endPoint == null || endPoint.trim().isEmpty();
  if (blankEndPoint) {
    return StreamsMetadataState.UNKNOWN_HOST;
  }

  final String parsedHost = getHost(endPoint);
  final Integer parsedPort = getPort(endPoint);
  if (parsedHost != null && parsedPort != null) {
    return new HostInfo(parsedHost, parsedPort);
  }

  throw new KsqlException(String.format(
      "Error parsing host address %s. Expected format host:port.", endPoint));
}


/**
* Check the heartbeats received from remote hosts and apply policy to determine whether a host
* is alive or not.
Expand Down Expand Up @@ -236,39 +207,41 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
// No heartbeats received -> mark all hosts as dead
if (receivedHeartbeats.isEmpty()) {
hostsStatus.forEach((host, status) -> {
if (!host.equals(localHostString)) {
if (!host.equals(localHostInfo)) {
status.setHostAlive(false);
}
});
}

for (String host: hostsStatus.keySet()) {
if (host.equals(localHostString)) {
for (HostInfo hostInfo: hostsStatus.keySet()) {
if (hostInfo.equals(localHostInfo)) {
continue;
}
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(host);
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(hostInfo);
// For previously discovered hosts from which no heartbeats have been received, mark them dead
if (heartbeats == null || heartbeats.isEmpty()) {
hostsStatus.get(host).setHostAlive(false);
hostsStatus.get(hostInfo).setHostAlive(false);
} else {
final TreeMap<Long, HeartbeatInfo> copy;
synchronized (heartbeats) {
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, host);
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, hostInfo);
// 1. remove heartbeats older than window
heartbeats.headMap(windowStart).clear();
copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true));
}
// 2. count consecutive missed heartbeats and mark as alive or dead
final boolean isAlive = decideStatus(host, windowStart, windowEnd, copy);
final HostStatusEntity status = hostsStatus.get(host);
final boolean isAlive = decideStatus(hostInfo, windowStart, windowEnd, copy);
final HostStatus status = hostsStatus.get(hostInfo);
status.setHostAlive(isAlive);
status.setLastStatusUpdateMs(windowEnd);
}
}
}

private boolean decideStatus(final String host, final long windowStart, final long windowEnd,
final TreeMap<Long, HeartbeatInfo> heartbeats) {
private boolean decideStatus(
final HostInfo hostInfo, final long windowStart, final long windowEnd,
final TreeMap<Long, HeartbeatInfo> heartbeats
) {
long missedCount = 0;
long prev = windowStart;
// No heartbeat received in this window
Expand Down Expand Up @@ -297,7 +270,7 @@ private boolean decideStatus(final String host, final long windowStart, final lo
missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs;
}

LOG.debug("Host: {} has {} missing heartbeats", host, missedCount);
LOG.debug("Host: {} has {} missing heartbeats", hostInfo, missedCount);
return (missedCount < config.heartbeatMissedThreshold);
}
}
Expand All @@ -314,22 +287,20 @@ class SendHeartbeatService extends AbstractScheduledService {

@Override
protected void runOneIteration() {
for (Entry<String, HostStatusEntity> hostStatusEntry: hostsStatus.entrySet()) {
final String host = hostStatusEntry.getKey();
final HostStatusEntity status = hostStatusEntry.getValue();
for (Entry<HostInfo, HostStatus> hostStatusEntry: hostsStatus.entrySet()) {
final HostInfo remoteHostInfo = hostStatusEntry.getKey();
final HostStatus status = hostStatusEntry.getValue();
try {
if (!host.equals(localHostString)) {
final URI remoteUri = buildLocation(localURL, status.getHostInfoEntity().getHost(),
status.getHostInfoEntity().getPort());
LOG.debug("Send heartbeat to host {} at {}", status.getHostInfoEntity().getHost(),
clock.millis());
if (!remoteHostInfo.equals(localHostInfo)) {
final URI remoteUri = buildLocation(
localURL, remoteHostInfo.host(), remoteHostInfo.port());
LOG.debug("Send heartbeat to host {} at {}", remoteHostInfo, clock.millis());
serviceContext.getKsqlClient().makeAsyncHeartbeatRequest(remoteUri, localHostInfo,
clock.millis());
}
} catch (Throwable t) {
LOG.error("Request to server: " + status.getHostInfoEntity().getHost() + ":"
+ status.getHostInfoEntity().getPort()
+ " failed with exception: " + t.getMessage(), t);
LOG.error("Request to server: " + remoteHostInfo + " failed with exception: "
+ t.getMessage(), t);
}
}
}
Expand Down Expand Up @@ -371,9 +342,10 @@ protected void runOneIteration() {
}

final Set<HostInfo> uniqueHosts = currentQueries.stream()
.map(queryMetadata -> ((QueryMetadata) queryMetadata).getAllMetadata())
.map(queryMetadata -> queryMetadata.getAllMetadata())
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.filter(streamsMetadata -> streamsMetadata != StreamsMetadata.NOT_AVAILABLE)
.map(StreamsMetadata::hostInfo)
.filter(hostInfo -> !(hostInfo.host().equals(localHostInfo.host())
&& hostInfo.port() == (localHostInfo.port())))
Expand All @@ -383,10 +355,7 @@ protected void runOneIteration() {
// Only add to map if it is the first time it is discovered. Design decision to
// optimistically consider every newly discovered server as alive to avoid situations of
// unavailability until the heartbeating kicks in.
hostsStatus.computeIfAbsent(hostInfo.toString(), key -> new HostStatusEntity(
new HostInfoEntity(hostInfo.host(), hostInfo.port()),
true,
clock.millis()));
hostsStatus.computeIfAbsent(hostInfo, key -> new HostStatus(true, clock.millis()));
}
} catch (Throwable t) {
LOG.error("Failed to discover cluster with exception " + t.getMessage(), t);
Expand Down Expand Up @@ -478,6 +447,35 @@ static class HeartbeatConfig {
}
}

/**
 * Mutable liveness record for a host: whether it is currently considered alive and the
 * timestamp, in milliseconds, of the last status update.
 *
 * <p>Both fields are {@code volatile} so that a value written by one thread is visible to
 * threads that read it afterwards. The two fields are not updated atomically as a pair, so a
 * reader may briefly observe a new liveness flag together with the previous timestamp.
 */
public static class HostStatus {
  private volatile boolean hostAlive;
  private volatile long lastStatusUpdateMs;

  /**
   * @param hostAlive initial liveness of the host.
   * @param lastStatusUpdateMs timestamp in milliseconds of this initial status.
   */
  public HostStatus(
      final boolean hostAlive,
      final long lastStatusUpdateMs
  ) {
    this.hostAlive = hostAlive;
    this.lastStatusUpdateMs = lastStatusUpdateMs;
  }

  /**
   * @param hostAlive the new liveness of the host.
   */
  public void setHostAlive(final boolean hostAlive) {
    this.hostAlive = hostAlive;
  }

  /**
   * @param lastStatusUpdateMs timestamp in milliseconds of the latest status update.
   */
  public void setLastStatusUpdateMs(final long lastStatusUpdateMs) {
    this.lastStatusUpdateMs = lastStatusUpdateMs;
  }

  /**
   * @return timestamp in milliseconds of the latest status update.
   */
  public long getLastStatusUpdateMs() {
    return lastStatusUpdateMs;
  }

  /**
   * @return {@code true} if the host is currently marked alive.
   */
  public boolean isHostAlive() {
    return hostAlive;
  }
}

public static class HeartbeatInfo {
private final long timestamp;

Expand Down
Loading

0 comments on commit 80f4945

Please sign in to comment.