From b683fcec55e5537ff12360d41327b1dfc4f11045 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 18 Oct 2024 07:41:25 -0700 Subject: [PATCH] Fix waitForSidecar to respect timeout. (#1146) * Fix waitForSidecar to respect timeout. Signed-off-by: Artur Souza * Bring back 500ms interval for retries and log for waitForSidecar Signed-off-by: Artur Souza * Fix flaky test for ConfigurationClientIT Signed-off-by: Artur Souza --------- Signed-off-by: Artur Souza --- .../configuration/ConfigurationClientIT.java | 1 + .../dapr/it/resiliency/WaitForSidecarIT.java | 98 +++++++++++++++++++ .../java/io/dapr/client/DaprClientImpl.java | 56 +++-------- .../io/dapr/client/DaprClientHttpTest.java | 11 +-- 4 files changed, 116 insertions(+), 50 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/ConfigurationClientIT.java b/sdk-tests/src/test/java/io/dapr/it/configuration/ConfigurationClientIT.java index ee1343467..adbe4ee1c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/configuration/ConfigurationClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/ConfigurationClientIT.java @@ -69,6 +69,7 @@ public class ConfigurationClientIT extends BaseIT { public static void init() throws Exception { daprRun = startDaprApp(ConfigurationClientIT.class.getSimpleName(), 5000); daprClient = daprRun.newDaprClientBuilder().build(); + daprClient.waitForSidecar(10000).block(); } @AfterAll diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java new file mode 100644 index 000000000..13d095470 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java @@ -0,0 +1,98 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.resiliency; + +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.ToxiProxyRun; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test SDK resiliency. + */ +public class WaitForSidecarIT extends BaseIT { + + // Use a number large enough to make sure it will respect the entire timeout. + private static final Duration LATENCY = Duration.ofSeconds(5); + + private static final Duration JITTER = Duration.ofSeconds(0); + + private static DaprRun daprRun; + + private static ToxiProxyRun toxiProxyRun; + + private static DaprRun daprNotRunning; + + @BeforeAll + public static void init() throws Exception { + daprRun = startDaprApp(WaitForSidecarIT.class.getSimpleName(), 5000); + daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName()+"NotRunning", 5000); + daprNotRunning.stop(); + + toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); + toxiProxyRun.start(); + } + + @Test + public void waitSucceeds() throws Exception { + try(var client = daprRun.newDaprClient()) { + client.waitForSidecar(5000).block(); + } + } + + @Test + public void waitTimeout() { + int timeoutInMillis = (int)LATENCY.minusMillis(100).toMillis(); + long started = System.currentTimeMillis(); + assertThrows(RuntimeException.class, () -> { + try(var client = toxiProxyRun.newDaprClientBuilder().build()) { + 
client.waitForSidecar(timeoutInMillis).block(); + } + }); + long duration = System.currentTimeMillis() - started; + assertTrue(duration >= timeoutInMillis); + } + + @Test + public void waitSlow() throws Exception { + int timeoutInMillis = (int)LATENCY.plusMillis(100).toMillis(); + long started = System.currentTimeMillis(); + try(var client = toxiProxyRun.newDaprClientBuilder().build()) { + client.waitForSidecar(timeoutInMillis).block(); + } + long duration = System.currentTimeMillis() - started; + assertTrue(duration >= LATENCY.toMillis()); + } + + @Test + public void waitNotRunningTimeout() { + // Do not make this number too small, since the bug does not reproduce when it is <= 2.5s. + // This has to do with a previous bug in the implementation. + int timeoutMilliseconds = 5000; + long started = System.currentTimeMillis(); + assertThrows(RuntimeException.class, () -> { + try(var client = daprNotRunning.newDaprClientBuilder().build()) { + client.waitForSidecar(timeoutMilliseconds).block(); + } + }); + long duration = System.currentTimeMillis() - started; + assertTrue(duration >= timeoutMilliseconds); + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 803488c0a..bea0bc217 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -81,6 +81,8 @@ import io.grpc.stub.AbstractStub; import io.grpc.stub.StreamObserver; import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -97,7 +99,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -114,6 +115,8 @@ */ public class DaprClientImpl extends 
AbstractDaprClient { + private final Logger logger; + /** * The GRPC managed channel to be used. */ @@ -235,6 +238,7 @@ private DaprClientImpl( this.httpClient = httpClient; this.retryPolicy = retryPolicy; this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy); + this.logger = LoggerFactory.getLogger(DaprClientImpl.class); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -273,53 +277,21 @@ public > T newGrpcStub(String appId, Function waitForSidecar(int timeoutInMilliseconds) { String[] pathSegments = new String[] { DaprHttp.API_VERSION, "healthz", "outbound"}; - int maxRetries = 5; - - Retry retrySpec = Retry - .fixedDelay(maxRetries, Duration.ofMillis(500)) - .doBeforeRetry(retrySignal -> { - System.out.println("Retrying component health check..."); - }); - - /* - NOTE: (Cassie) Uncomment this once it actually gets implemented: - https://github.com/grpc/grpc-java/issues/4359 - - int maxChannelStateRetries = 5; - - // Retry logic for checking the channel state - Retry channelStateRetrySpec = Retry - .fixedDelay(maxChannelStateRetries, Duration.ofMillis(500)) - .doBeforeRetry(retrySignal -> { - System.out.println("Retrying channel state check..."); - }); - */ // Do the Dapr Http endpoint check to have parity with Dotnet Mono responseMono = this.httpClient.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, null, "", null, null); return responseMono - .retryWhen(retrySpec) - /* - NOTE: (Cassie) Uncomment this once it actually gets implemented: - https://github.com/grpc/grpc-java/issues/4359 - .flatMap(response -> { - // Check the status code - int statusCode = response.getStatusCode(); - - // Check if the channel's state is READY - return Mono.defer(() -> { - if (this.channel.getState(true) == ConnectivityState.READY) { - // Return true if the status code is in the 2xx range - if (statusCode >= 200 && statusCode < 300) { - return Mono.empty(); // Continue with the flow - } - } - 
return Mono.error(new RuntimeException("Health check failed")); - }).retryWhen(channelStateRetrySpec); - }) - */ + // No method to "retry forever every 500ms", so we make it practically forever. + // 9223372036854775807 * 500 ms = 1.46235604 x 10^11 years + // If anyone needs to wait for the sidecar for longer than that, sorry. + .retryWhen( + Retry + .fixedDelay(Long.MAX_VALUE, Duration.ofMillis(500)) + .doBeforeRetry(s -> { + this.logger.info("Retrying sidecar health check ..."); + })) .timeout(Duration.ofMillis(timeoutInMilliseconds)) .onErrorResume(DaprException.class, e -> Mono.error(new RuntimeException(e))) diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index c09a00124..44e36d06d 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -133,16 +133,11 @@ public void waitForSidecarBadHealthCheck() throws Exception { .times(6) .respond(404, ResponseBody.create("Not Found", MediaType.get("application/json"))); - // retry the max allowed retries (5 times) + // it will timeout. StepVerifier.create(daprClientHttp.waitForSidecar(5000)) .expectSubscription() - .expectErrorMatches(throwable -> { - if (throwable instanceof RuntimeException) { - return "Retries exhausted: 5/5".equals(throwable.getMessage()); - } - return false; - }) - .verify(Duration.ofSeconds(20)); + .expectError() + .verify(Duration.ofMillis(6000)); } @Test