Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: RLQS Prototype #11456

Draft
wants to merge 46 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
db68eba
RlqsFilter WIP
sergiitk Feb 7, 2024
6d371f7
Basic GrpcService type
sergiitk Feb 7, 2024
78f63d8
Basic GrpcService
sergiitk Feb 7, 2024
c778be0
Basic interceptor
sergiitk Feb 7, 2024
f387e34
Notes from the sync with Eric
sergiitk Feb 12, 2024
581684c
post-rebase fix
sergiitk Mar 19, 2024
80f59d9
RlqsClientPool, RlqsClient, working on shutdown
sergiitk Mar 26, 2024
5c6daa0
another note
sergiitk Mar 26, 2024
2d314eb
categorize todos
sergiitk Mar 26, 2024
98f9969
Basic RlqsBucketSettings and Matcher parsing
sergiitk Mar 26, 2024
b6f2865
Minimal CelMatcher
sergiitk Mar 27, 2024
58eabcd
basic cel-java integration/test
sergiitk Mar 27, 2024
b80f4f9
Implement GrpcCelEnvironment and MetadataHelper
sergiitk Sep 4, 2024
655daa2
Use dev.cel:runtime in the prod code
sergiitk Sep 6, 2024
6388940
Add RlqsClientPool/RlqsClient/RlqsApiClient classes
sergiitk Sep 16, 2024
e8d10f0
Draft bucket processing logic
sergiitk Sep 24, 2024
28f6686
Filter chain lifecycle bookmarks - filter provider refactoring TBD
sergiitk Sep 24, 2024
011d1b4
Draft reports and timers
sergiitk Sep 25, 2024
1c94765
RlqsClient -> RlqsEngine
sergiitk Sep 25, 2024
305874f
Remove periodic cleanup logic from RlqsClientPool
sergiitk Sep 25, 2024
33abbfa
RlqsClientPool -> RlqsCache
sergiitk Sep 25, 2024
7114a08
RlqsApiClient -> RlqsClient
sergiitk Sep 25, 2024
5aaf0e5
More class drafting
sergiitk Sep 25, 2024
760a959
Create proper RateLimitResult
sergiitk Sep 26, 2024
c6f0aff
Draft Bucket: Usage Reports, RateLimitStrategy, TTLs
sergiitk Sep 26, 2024
563ab2d
Improve method names
sergiitk Sep 27, 2024
8c5951d
RateLimitResult -> RlqsRateLimitResult
sergiitk Sep 27, 2024
aa8c55e
More API improvements
sergiitk Sep 27, 2024
0c657bf
getOrCreate pattern for bucket cache and timers
sergiitk Sep 27, 2024
234cac9
RlqsClient doesn't know about the bucket cache anymore; uses callbacks
sergiitk Sep 28, 2024
9579a5c
Minor renames
sergiitk Sep 28, 2024
2db5499
improved getOrCreateRlqsEngine
sergiitk Sep 28, 2024
64bad61
Handle special case
sergiitk Sep 28, 2024
75d5b60
Dynamic bucket id builder processing initial logic.
sergiitk Oct 3, 2024
3122282
hash config to a long
sergiitk Oct 4, 2024
aee8091
Remove outdated note
sergiitk Oct 8, 2024
a5f7eb9
add Filter.isEnabled()
sergiitk Oct 15, 2024
258f72b
add GRPC_EXPERIMENTAL_RLQS_DRY_RUN
sergiitk Oct 15, 2024
b47f15a
XdsTestServer: add --xds_server_mode
sergiitk Oct 17, 2024
c2ec069
convert logid to local
sergiitk Oct 17, 2024
1a54283
PSM e2e: works!
sergiitk Oct 17, 2024
d13069d
RlqsEngine -> RlqsFilterState
sergiitk Oct 17, 2024
c81a4ab
Add CEL types to the message printer
sergiitk Oct 21, 2024
401da42
Add CEL macro verifications
sergiitk Oct 24, 2024
011f97f
Add CelMatcher.fromEnvoyProto - just to import dev.cel.expr
sergiitk Oct 25, 2024
6dd240f
LongAdder note
sergiitk Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1"
conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
cronet-api = "org.chromium.net:cronet-api:119.6045.31"
cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31"
dev-cel-compiler = "dev.cel:compiler:0.6.0"
dev-cel-runtime = "dev.cel:runtime:0.6.0"
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.28.0"
errorprone-core = "com.google.errorprone:error_prone_core:2.28.0"
google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.41.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
Expand Down Expand Up @@ -82,6 +83,7 @@ public final class XdsTestServer {
private int port = 8080;
private int maintenancePort = 8080;
private boolean secureMode = false;
private boolean xdsServerMode = false;
private boolean enableCsmObservability;
private String serverId = "java_server";
private HealthStatusManager health;
Expand Down Expand Up @@ -142,7 +144,10 @@ void parseArgs(String[] args) {
maintenancePort = Integer.valueOf(value);
} else if ("secure_mode".equals(key)) {
secureMode = Boolean.parseBoolean(value);
} else if ("enable_csm_observability".equals(key)) {
} else if ("xds_server_mode".equals(key)) {
xdsServerMode = Boolean.parseBoolean(value);
}
else if ("enable_csm_observability".equals(key)) {
enableCsmObservability = Boolean.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
Expand All @@ -163,6 +168,9 @@ void parseArgs(String[] args) {
+ maintenancePort);
usage = true;
}
if (secureMode) {
xdsServerMode = true;
}

if (usage) {
XdsTestServer s = new XdsTestServer();
Expand All @@ -179,6 +187,9 @@ void parseArgs(String[] args) {
+ " port and maintenance_port should be different for secure mode."
+ "\n Default: "
+ s.secureMode
+ "\n --xds_server_mode=BOOLEAN Start in xDS Server mode."
+ "\n Default: "
+ s.xdsServerMode
+ "\n --enable_csm_observability=BOOL Enable CSM observability reporting. Default: "
+ s.enableCsmObservability
+ "\n --server_id=STRING server ID for response."
Expand Down Expand Up @@ -211,74 +222,93 @@ void start() throws Exception {
logger.log(Level.SEVERE, "Failed to get host", e);
throw new RuntimeException(e);
}

health = new HealthStatusManager();
ServerServiceDefinition testServiceInterceptor = ServerInterceptors.intercept(
new TestServiceImpl(serverId, host),
new TestInfoInterceptor(host));
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();

if (secureMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type");
}
maintenanceServer =
Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create())
Grpc.newServerBuilderForPort(maintenancePort, insecureServerCreds)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
maintenanceServer.start();
server =
XdsServerBuilder.forPort(
port, XdsServerCredentials.create(InsecureServerCredentials.create()))
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.build();
server.start();
} else {
ServerBuilder<?> serverBuilder;
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

logger.info("Starting server on port " + port + " with address type " + addressType);
health.setStatus("", ServingStatus.SERVING);
return;
}

server =
serverBuilder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
if (xdsServerMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("xDS Server mode only supports IPV4_IPV6 address type");
}
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
return;
}

ServerBuilder<?> serverBuilder;
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
}

Expand Down
4 changes: 3 additions & 1 deletion xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies {
project(':grpc-services'),
project(':grpc-auth'),
project(path: ':grpc-alts', configuration: 'shadow'),
libraries.dev.cel.runtime,
libraries.guava,
libraries.gson,
libraries.re2j,
Expand All @@ -70,7 +71,8 @@ dependencies {
compileOnly libraries.netty.transport.epoll

testImplementation project(':grpc-testing'),
project(':grpc-testing-proto')
project(':grpc-testing-proto'),
libraries.dev.cel.compiler
testImplementation (libraries.netty.transport.epoll) {
artifact {
classifier = "linux-x86_64"
Expand Down
19 changes: 19 additions & 0 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
*/
String[] typeUrls();

default boolean isEnabled() {
return true;
}

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
Expand All @@ -50,6 +54,12 @@
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);

default void shutdown() {
// Implement as needed.
// TODO(sergiitk): [DESIGN] important to cover and discuss in the design.
// TODO(sergiitk): [QUESTION] should it be in ServerInterceptorBuilder?
}

Check warning on line 61 in xds/src/main/java/io/grpc/xds/Filter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/Filter.java#L61

Added line #L61 was not covered by tests

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
Expand All @@ -68,6 +78,15 @@
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);

@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config,
@Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return buildServerInterceptor(config, overrideConfig);
}

}

/** Filter config with instance name. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
drainGraceTime = drainGraceNanosObj;
drainGraceTimeUnit = TimeUnit.NANOSECONDS;
}
// TODO(sergiitk): [design] drains connections on LDS update.
FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer(
new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit));
FilterChainSelector selector = filterChainSelectorManager.register(closer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void updateSelector(FilterChainSelector newSelector) {
closers = new TreeSet<Closer>(closers.comparator());
selector = newSelector;
}
// TODO(sergiitk): [design] calls the closer of FilterChainMatchingNegotiatorServerFactory
for (Closer closer : oldClosers) {
closer.closer.run();
}
Expand Down
6 changes: 5 additions & 1 deletion xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
RbacFilter.INSTANCE,
RlqsFilter.INSTANCE);
}
return instance;
}
Expand All @@ -50,6 +51,9 @@ static FilterRegistry newRegistry() {
@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
if (!filter.isEnabled()) {
continue;
}
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
Expand Down
9 changes: 9 additions & 0 deletions xds/src/main/java/io/grpc/xds/MessagePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.xds;

import com.github.xds.type.matcher.v3.CelMatcher;
import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
Expand All @@ -28,6 +30,8 @@
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
Expand Down Expand Up @@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() {
.add(RBAC.getDescriptor())
.add(RBACPerRoute.getDescriptor())
.add(Router.getDescriptor())
// RLQS
.add(RateLimitQuotaFilterConfig.getDescriptor())
.add(RateLimitQuotaOverride.getDescriptor())
.add(HttpAttributesCelMatchInput.getDescriptor())
.add(CelMatcher.getDescriptor())
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
// by top-level resource types.
.add(UpstreamTlsContext.getDescriptor())
Expand Down
Loading
Loading