Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Diff) compare changes for CDS only in XdsClient #2

Draft
wants to merge 31 commits into
base: impl/integrate_lds_rds_eds_in_xds_client
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d4e8962
cronet: disable code shrinking for library build (#6448)
voidzcy Nov 20, 2019
44b26da
stub: Provide bread-crumb to user how to configure StreamObserver bef…
ejona86 Nov 20, 2019
eaf99cf
core,xds: add missing JUnit RunWith annotation (#6457)
creamsoup Nov 21, 2019
81efecd
remove getLoadStatsStore method from LocalityStore
dapengzhang0 Nov 21, 2019
eb21c64
api, core: make scheduled executor service accessible for NameResolve…
voidzcy Nov 22, 2019
b0e00fd
netty: enable io.grpc.netty.useCustomAllocator by default (#6459)
zhangkun83 Nov 23, 2019
0a0d9f6
xds: integrate EDS protocol into XdsClient (#6370)
voidzcy Nov 25, 2019
531df7e
Implemented EDS response handler.
voidzcy Nov 9, 2019
d5c67d1
Add tests for EDS protocol and endpoint watchers.
voidzcy Nov 11, 2019
950cdd6
Fixed typo in comments.
voidzcy Nov 11, 2019
ee4d0f4
Add tests for EDS protocol and endpoint watchers.
voidzcy Nov 11, 2019
3a98dcb
Add tests for EDS protocol and endpoint watchers.
voidzcy Nov 11, 2019
5cfbf9e
Added CDS handling with add/cancel cluster watchers in XdsClient.
voidzcy Nov 11, 2019
ccbc1e5
Do not forward error to endpoint watchers when the proto message is b…
voidzcy Nov 12, 2019
6801aaa
CDS response handler should notify cluster watchers with for clusters…
voidzcy Nov 12, 2019
a9fba1f
Fixed bug of looking up wrong watcher for CDS response.
voidzcy Nov 12, 2019
2fc7900
Make a copy (instead of keep the reference) to latest requested CDS r…
voidzcy Nov 12, 2019
617c239
Added tests for CDS response handling and add/remove cluster watchers.
voidzcy Nov 12, 2019
e38400f
Complete retry tests for incorperating all xds protocols.
voidzcy Nov 14, 2019
81f875b
Changed flag name from invalidData to dataInvalid
voidzcy Nov 15, 2019
1230951
Run due tasks for scheduled 0-delay retries.
voidzcy Nov 19, 2019
23da2df
Enhance logging for adding/canceling watchers.
voidzcy Nov 25, 2019
17e6569
Enhance NACK request with error details.
voidzcy Nov 25, 2019
320b796
Added comment for describing subtlety of unsubscribing last CDS resou…
voidzcy Nov 25, 2019
1737080
Make adding the same watcher for the same cluster more than once a no…
voidzcy Nov 26, 2019
e69e2b3
Enhanced error message to locate the specific cluster message that co…
voidzcy Nov 26, 2019
3a6430f
Change a loop.
voidzcy Nov 26, 2019
d325ccc
Updated specification for adding/canceling cluster/endpoint watchers.
voidzcy Nov 26, 2019
9351f74
Lower log level for removing a watcher more than once.
voidzcy Nov 26, 2019
cb5c26f
Eliminate if sub-expression to be absorbed.
voidzcy Nov 26, 2019
510f848
Return emppty string to ClusterUpdate for LRS server name.
voidzcy Nov 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -412,6 +413,7 @@ public static final class Args {
private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser;
@Nullable private final ScheduledExecutorService scheduledExecutorService;
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;

Expand All @@ -420,12 +422,14 @@ private Args(
ProxyDetector proxyDetector,
SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser,
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
this.scheduledExecutorService = scheduledExecutorService;
this.channelLogger = channelLogger;
this.executor = executor;
}
Expand Down Expand Up @@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() {
return syncContext;
}

/**
* Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
*
* <p>This service is a shared resource and is only meant for quick tasks. DO NOT block or run
* time-consuming tasks.
*
* <p>The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
* and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
*
* @since 1.26.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public ScheduledExecutorService getScheduledExecutorService() {
if (scheduledExecutorService == null) {
throw new IllegalStateException("ScheduledExecutorService not set in Builder");
}
return scheduledExecutorService;
}

/**
* Returns the {@link ServiceConfigParser}.
*
Expand Down Expand Up @@ -501,6 +524,7 @@ public String toString() {
.add("proxyDetector", proxyDetector)
.add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser)
.add("scheduledExecutorService", scheduledExecutorService)
.add("channelLogger", channelLogger)
.add("executor", executor)
.toString();
Expand All @@ -517,6 +541,7 @@ public Builder toBuilder() {
builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser);
builder.setScheduledExecutorService(scheduledExecutorService);
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
return builder;
Expand All @@ -541,6 +566,7 @@ public static final class Builder {
private ProxyDetector proxyDetector;
private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser;
private ScheduledExecutorService scheduledExecutorService;
private ChannelLogger channelLogger;
private Executor executor;

Expand Down Expand Up @@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) {
return this;
}

/**
* See {@link Args#getScheduledExecutorService}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public Builder setScheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
return this;
}

/**
* See {@link Args#getServiceConfigParser}. This is a required field.
*
Expand Down Expand Up @@ -618,7 +654,7 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
channelLogger, executor);
scheduledExecutorService, channelLogger, executor);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -45,6 +46,8 @@ public class NameResolverTest {
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
private final ScheduledExecutorService scheduledExecutorService =
mock(ScheduledExecutorService.class);
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri;
Expand All @@ -62,6 +65,7 @@ public void args() {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);

Expand All @@ -70,6 +74,7 @@ public void args() {
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);

Expand Down Expand Up @@ -254,6 +259,7 @@ private NameResolver.Args createArgs() {
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(parser)
.setScheduledExecutorService(scheduledExecutorService)
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.build();
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final NameResolver.Args nameResolverArgs;
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
private final ClientTransportFactory transportFactory;
private final ScheduledExecutorForBalancer scheduledExecutorForBalancer;
private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
Expand Down Expand Up @@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutor =
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
Expand All @@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
.setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext)
.setScheduledExecutorService(scheduledExecutor)
.setServiceConfigParser(
new ScParser(
retryEnabled,
Expand All @@ -598,18 +605,11 @@ public void execute(Runnable command) {
})
.build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutorForBalancer =
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());

serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig;
Expand Down Expand Up @@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() {

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorForBalancer;
return scheduledExecutor;
}

@Override
Expand Down Expand Up @@ -1736,10 +1736,10 @@ synchronized void release() {
}
}

private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService {
private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
final ScheduledExecutorService delegate;

private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) {
private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(byte[],
* int, int)}.
*/
@RunWith(JUnit4.class)
public class ReadableBuffersArrayTest extends ReadableBufferTestBase {

@Test
Expand Down
4 changes: 2 additions & 2 deletions cronet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ android {
buildTypes {
debug { minifyEnabled false }
release {
minifyEnabled true
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
minifyEnabled false
consumerProguardFiles 'proguard-rules.pro'
}
}
testOptions { unitTests { includeAndroidResources = true } }
Expand Down
2 changes: 1 addition & 1 deletion netty/src/main/java/io/grpc/netty/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class Utils {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
Expand Down
6 changes: 4 additions & 2 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,17 @@ public boolean isReady() {
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
if (frozen) {
throw new IllegalStateException("Cannot alter onReadyHandler after call started");
throw new IllegalStateException(
"Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
}
this.onReadyHandler = onReadyHandler;
}

@Override
public void disableAutoInboundFlowControl() {
if (frozen) {
throw new IllegalStateException("Cannot disable auto flow control call started");
throw new IllegalStateException(
"Cannot disable auto flow control after call started. Use ClientResponseObserver");
}
autoFlowControlEnabled = false;
}
Expand Down
8 changes: 6 additions & 2 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ public boolean isReady() {

@Override
public void setOnReadyHandler(Runnable r) {
checkState(!frozen, "Cannot alter onReadyHandler after initialization");
checkState(!frozen, "Cannot alter onReadyHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onReadyHandler = r;
}

Expand All @@ -391,7 +393,9 @@ public boolean isCancelled() {

@Override
public void setOnCancelHandler(Runnable onCancelHandler) {
checkState(!frozen, "Cannot alter onCancelHandler after initialization");
checkState(!frozen, "Cannot alter onCancelHandler after initialization. May only be called "
+ "during the initial call to the application, before the service returns its "
+ "StreamObserver");
this.onCancelHandler = onCancelHandler;
}

Expand Down
8 changes: 8 additions & 0 deletions xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ static final class LbEndpoint {
private final int loadBalancingWeight;
private final boolean isHealthy;

@VisibleForTesting
LbEndpoint(String address, int port, int loadBalancingWeight, boolean isHealthy) {
this(
new EquivalentAddressGroup(
new InetSocketAddress(address, port)),
loadBalancingWeight, isHealthy);
}

@VisibleForTesting
LbEndpoint(EquivalentAddressGroup eag, int loadBalancingWeight, boolean isHealthy) {
this.eag = eag;
Expand Down
20 changes: 8 additions & 12 deletions xds/src/main/java/io/grpc/xds/LocalityStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,23 @@ interface LocalityStore {

void updateOobMetricsReportInterval(long reportIntervalNano);

LoadStatsStore getLoadStatsStore();

@VisibleForTesting
abstract class LocalityStoreFactory {
private static final LocalityStoreFactory DEFAULT_INSTANCE =
new LocalityStoreFactory() {
@Override
LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) {
return new LocalityStoreImpl(helper, lbRegistry);
LocalityStore newLocalityStore(
Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
return new LocalityStoreImpl(helper, lbRegistry, loadStatsStore);
}
};

static LocalityStoreFactory getInstance() {
return DEFAULT_INSTANCE;
}

abstract LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry);
abstract LocalityStore newLocalityStore(
Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore);
}

final class LocalityStoreImpl implements LocalityStore {
Expand All @@ -113,9 +113,10 @@ final class LocalityStoreImpl implements LocalityStore {
private List<DropOverload> dropOverloads = ImmutableList.of();
private long metricsReportIntervalNano = -1;

LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
LocalityStoreImpl(
Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance,
new LoadStatsStoreImpl(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
loadStatsStore, OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
}

@VisibleForTesting
Expand Down Expand Up @@ -284,11 +285,6 @@ public String toString() {
TimeUnit.MINUTES, helper.getScheduledExecutorService());
}

@Override
public LoadStatsStore getLoadStatsStore() {
return loadStatsStore;
}

@Override
public void updateOobMetricsReportInterval(long reportIntervalNano) {
metricsReportIntervalNano = reportIntervalNano;
Expand Down
6 changes: 4 additions & 2 deletions xds/src/main/java/io/grpc/xds/LookasideLb.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,13 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, node);
localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry);
LoadStatsStore loadStatsStore = new LoadStatsStoreImpl();
localityStore = localityStoreFactory.newLocalityStore(
helper, lbRegistry, loadStatsStore);
// TODO(zdapeng): Use XdsClient to do Lrs directly.
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(),
localityStore.getLoadStatsStore());
loadStatsStore);
final LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
Expand Down
Loading