Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Feb 6, 2020
1 parent 2fcc27b commit 0f1560b
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ public class KsqlConfig extends AbstractConfig {
"ksql.query.pull.stale.reads.lag.max.offsets";
public static final Long KSQL_QUERY_PULL_STALE_READS_LAG_MAX_OFFSETS_DEFAULT = 0L;
private static final String KSQL_QUERY_PULL_STALE_READS_LAG_MAX_OFFSETS_DOC =
"What the maximum offset lag allowed is. Only enabled when lag reporting is enabled. "
+ "By default, no lag is is allowed.";
"Controls the maximum lag tolerated by a pull query against a table. This is applied to all "
+ "hosts storing it, both active and standbys included. Only enabled when "
+ "lag.reporting.enable is true. By default, no lag is is allowed.";

public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.streamsstore.rebalancing.timeout.ms";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@
*/
public class ActiveHostFilter implements RoutingFilter {

public ActiveHostFilter() {
private final HostInfo activeHost;

public ActiveHostFilter(final HostInfo activeHost) {
this.activeHost = activeHost;
}

/**
* Returns true if the host is alive. If the heartbeat agent is not enabled, all hosts are
* assumed to be alive.
* @param activeHost the active host for a particular state store
* @param host The host for which the status is checked
* @return true if the host is alive, false otherwise.
*/
@Override
public boolean filter(final HostInfo activeHost, final KsqlHost host) {
public boolean filter(final KsqlHost host) {

return host.host().equals(activeHost.host()) && host.port() == activeHost.port();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
ErrorMessages.class
));

final RoutingFilterFactory routingFilterFactory = initializeRoutingFilters(
final RoutingFilterFactory routingFilterFactory = initializeRoutingFilterFactory(
ksqlConfigNoPort, heartbeatAgent, lagReportingAgent);
final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
ksqlEngine, heartbeatAgent, routingFilterFactory);
Expand Down Expand Up @@ -615,7 +615,7 @@ static KsqlRestApplication buildApplication(
initializeLagReportingAgent(restConfig, ksqlEngine, serviceContext);
final Optional<HeartbeatAgent> heartbeatAgent =
initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext, lagReportingAgent);
final RoutingFilterFactory routingFilterFactory = initializeRoutingFilters(ksqlConfig,
final RoutingFilterFactory routingFilterFactory = initializeRoutingFilterFactory(ksqlConfig,
heartbeatAgent, lagReportingAgent);

final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
Expand Down Expand Up @@ -745,21 +745,20 @@ private static Optional<LagReportingAgent> initializeLagReportingAgent(
return Optional.empty();
}

private static RoutingFilterFactory initializeRoutingFilters(
private static RoutingFilterFactory initializeRoutingFilterFactory(
final KsqlConfig ksqlConfig,
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent) {
return (routingOptions, hosts, applicationQueryId, storeName, partition) -> {
return (routingOptions, hosts, active, applicationQueryId, storeName, partition) -> {
final ImmutableList.Builder<RoutingFilter> filterBuilder = ImmutableList.builder();
if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS)) {
filterBuilder.add(new ActiveHostFilter());
filterBuilder.add(new ActiveHostFilter(active));
}
filterBuilder.add(new LivenessFilter(heartbeatAgent));
FreshnessFilter.create(lagReportingAgent, routingOptions, hosts, applicationQueryId,
MaxAllowedLagFilter.create(lagReportingAgent, routingOptions, hosts, applicationQueryId,
storeName, partition)
.map(filterBuilder::add);
final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build());
return routingFilters;
return new RoutingFilters(filterBuilder.build());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ public LivenessFilter(final Optional<HeartbeatAgent> heartbeatAgent) {
/**
* Returns true if the host is alive. If the heartbeat agent is not enabled, all hosts are
* assumed to be alive.
* @param activeHost the active host for a particular state store
* @param host The host for which the status is checked
* @return true if the host is alive, false otherwise.
*/
@Override
public boolean filter(final HostInfo activeHost, final KsqlHost host) {
public boolean filter(final KsqlHost host) {

if (!heartbeatAgent.isPresent()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;
import org.apache.kafka.streams.state.HostInfo;

/**
* A RoutingFilter that filters hosts based upon changelog processing lag.
*/
public class FreshnessFilter implements RoutingFilter {
public class MaxAllowedLagFilter implements RoutingFilter {

private final ImmutableMap<KsqlHost, Optional<LagInfoEntity>> lagByHost;
private final RoutingOptions routingOptions;
private final OptionalLong maxEndOffset;

private FreshnessFilter(
private MaxAllowedLagFilter(
final RoutingOptions routingOptions,
final ImmutableMap<KsqlHost, Optional<LagInfoEntity>> lagByHost,
final OptionalLong maxEndOffset
Expand All @@ -49,15 +48,15 @@ private FreshnessFilter(
}

@Override
public boolean filter(final HostInfo activeHost, final KsqlHost hostInfo) {
final long offsetLagAllowed = routingOptions.getOffsetLagAllowed();
if (offsetLagAllowed >= 0) {
public boolean filter(final KsqlHost hostInfo) {
final long allowedOffsetLag = routingOptions.getOffsetLagAllowed();
if (allowedOffsetLag >= 0) {
return lagByHost.getOrDefault(hostInfo, Optional.empty())
.map(hostLag -> {
// Compute the lag from the maximum end offset we've seen
long endOffset = maxEndOffset.orElse(hostLag.getEndOffsetPosition());
long offsetLag = Math.max(endOffset - hostLag.getCurrentOffsetPosition(), 0);
return offsetLag <= offsetLagAllowed;
return offsetLag <= allowedOffsetLag;
})
// If we don't have lag info, we'll be conservative and include the host
.orElse(true);
Expand All @@ -75,7 +74,7 @@ public boolean filter(final HostInfo activeHost, final KsqlHost hostInfo) {
* @param partition The partition of the topic
* @return a new FreshnessFilter, unless lag reporting is disabled.
*/
public static Optional<FreshnessFilter> create(
public static Optional<MaxAllowedLagFilter> create(
final Optional<LagReportingAgent> lagReportingAgent,
final RoutingOptions routingOptions,
final List<KsqlHost> hosts,
Expand All @@ -99,6 +98,6 @@ public static Optional<FreshnessFilter> create(
.map(Optional::get)
.mapToLong(LagInfoEntity::getEndOffsetPosition)
.max();
return Optional.of(new FreshnessFilter(routingOptions, lagByHost, maxEndOffset));
return Optional.of(new MaxAllowedLagFilter(routingOptions, lagByHost, maxEndOffset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ public RoutingFilters(final ImmutableList<RoutingFilter> routingFilters) {
this.routingFilters = Objects.requireNonNull(routingFilters, "routingFilters");
}

public boolean filter(
final HostInfo activeHost,
final KsqlHost host
) {
public boolean filter(final KsqlHost host) {
return routingFilters.stream()
.allMatch(f -> f.filter(activeHost, host));
.allMatch(f -> f.filter(host));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,21 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.rest.entity.LagInfoEntity;
import io.confluent.ksql.rest.entity.QueryStateStoreId;
import io.confluent.ksql.util.KsqlHost;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.state.HostInfo;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class FreshnessFilterTest {
public class MaxAllowedLagFilterTest {

private static HostInfo HOST_INFO = new HostInfo("host", 8088);
private static KsqlHost HOST = new KsqlHost("host", 8088);
private static KsqlHost HOST2 = new KsqlHost("host2", 8088);
private static List<KsqlHost> HOSTS = ImmutableList.of(HOST, HOST2);
Expand All @@ -43,7 +35,7 @@ public class FreshnessFilterTest {
@Mock
private RoutingOptions routingOptions;

private FreshnessFilter filter;
private MaxAllowedLagFilter filter;

@Before
public void setUp() {
Expand All @@ -64,12 +56,12 @@ public void filter_shouldIncludeBelowThreshold() {
when(routingOptions.getOffsetLagAllowed()).thenReturn(13L);

// When:
filter = FreshnessFilter.create(
filter = MaxAllowedLagFilter.create(
Optional.of(lagReportingAgent), routingOptions, HOSTS, APPLICATION_ID, STATE_STORE,
PARTITION).get();

// Then:
assertTrue(filter.filter(HOST_INFO, HOST));
assertTrue(filter.filter(HOST));
}

@Test
Expand All @@ -80,35 +72,35 @@ public void filter_shouldNotIncludeAboveThreshold() {
when(routingOptions.getOffsetLagAllowed()).thenReturn(11L);

// When:
filter = FreshnessFilter.create(
filter = MaxAllowedLagFilter.create(
Optional.of(lagReportingAgent), routingOptions, HOSTS, APPLICATION_ID, STATE_STORE,
PARTITION).get();

// Then:
assertFalse(filter.filter(HOST_INFO, HOST));
assertFalse(filter.filter(HOST));
}

@Test
public void hostNotReturned() {
public void filter_hostNotReturned() {
// Given:
when(lagReportingAgent.getHostsPartitionLagInfo(eq(HOST),
eq(QueryStateStoreId.of(APPLICATION_ID, STATE_STORE)), eq(PARTITION)))
.thenReturn(Optional.empty());
when(routingOptions.getOffsetLagAllowed()).thenReturn(13L);

// When:
filter = FreshnessFilter.create(
filter = MaxAllowedLagFilter.create(
Optional.of(lagReportingAgent), routingOptions, HOSTS, APPLICATION_ID, STATE_STORE,
PARTITION).get();

// Then:
assertTrue(filter.filter(HOST_INFO, HOST));
assertTrue(filter.filter(HOST));
}

@Test
public void lagReportingDisabled() {
public void filter_lagReportingDisabled() {
// When:
Optional<FreshnessFilter> filterOptional = FreshnessFilter.create(
Optional<MaxAllowedLagFilter> filterOptional = MaxAllowedLagFilter.create(
Optional.empty(), routingOptions, HOSTS, APPLICATION_ID, STATE_STORE, PARTITION);

// Then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
@RunWith(Enclosed.class)
public class PullQueryExecutorTest {
private static final RoutingFilterFactory ROUTING_FILTER_FACTORY =
(routingOptions, hosts, applicationQueryId, storeName, partition) ->
(routingOptions, hosts, active, applicationQueryId, storeName, partition) ->
new RoutingFilters(ImmutableList.of());

public static class Disabled {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public class StreamedQueryResourceTest {
private static final String PRINT_TOPIC = "Print TEST_TOPIC;";

private static final RoutingFilterFactory ROUTING_FILTER_FACTORY =
(routingOptions, hosts, applicationQueryId, storeName, partition) ->
(routingOptions, hosts, active, applicationQueryId, storeName, partition) ->
new RoutingFilters(ImmutableList.of());

@Rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ public interface RoutingFilter {

/**
* If the given host should be included for consideration.
* @param activeHost The active host
* @param hostInfo The host to be considered for this filter
* @return If the host should be included for consideration.
*/
boolean filter(HostInfo activeHost, KsqlHost hostInfo);
boolean filter(KsqlHost hostInfo);

/**
* A factory for RoutingFilters.
Expand All @@ -43,6 +42,7 @@ interface RoutingFilterFactory {
* Creates a RoutingFilter
* @param routingOptions The options to use when filtering
* @param hosts The set of all hosts that have the store, including actives and standbys
* @param activeHost The active host
* @param applicationQueryId The application query id
* @param storeName The state store name
* @param partition The partition of the changelog topic
Expand All @@ -51,6 +51,7 @@ interface RoutingFilterFactory {
RoutingFilter createRoutingFilter(
RoutingOptions routingOptions,
List<KsqlHost> hosts,
HostInfo activeHost,
String applicationQueryId,
String storeName,
int partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,18 @@ public List<KsqlNode> locate(
final Set<HostInfo> standByHosts = metadata.getStandbyHosts();
LOG.info("Before filtering: Active host {} , standby hosts {}", activeHost, standByHosts);

final Stream<KsqlHost> active = Stream.of(asKsqlHost(activeHost));
final Stream<KsqlHost> standby = standByHosts
.stream()
.map(this::asKsqlHost);
final Stream<KsqlHost> hostStream = Stream.concat(active, standby);
final List<KsqlHost> allHosts = hostStream.collect(Collectors.toList());
final List<KsqlHost> allHosts = Stream.concat(Stream.of(activeHost), standByHosts.stream())
.map(this::asKsqlHost)
.collect(Collectors.toList());
final RoutingFilter routingFilter = routingFilterFactory.createRoutingFilter(routingOptions,
allHosts, applicationId, stateStoreName, metadata.getPartition());
allHosts, activeHost, applicationId, stateStoreName, metadata.getPartition());

// Filter out hosts based on liveness and lag filters.
// The list is ordered by routing preference: active node is first, then standby nodes
// in order of increasing lag.
// If heartbeat is not enabled, all hosts are considered alive.
final List<KsqlNode> filteredHosts = allHosts.stream()
.filter(hostInfo -> routingFilter.filter(activeHost, hostInfo))
.filter(routingFilter::filter)
.map(this::asNode)
.collect(Collectors.toList());

Expand Down
Loading

0 comments on commit 0f1560b

Please sign in to comment.