diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 8184da048..4e28f8bdf 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -20,11 +20,12 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract. + * Implements the {@link Connector} contract and emit flags obtained from flagd + * sync gRPC contract. */ @Slf4j -@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, - justification = "Random is used to generate a variation & flag configurations require exposing") +@SuppressFBWarnings(value = { "PREDICTABLE_RANDOM", + "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing") public class GrpcStreamConnector implements Connector { private static final Random RANDOM = new Random(); @@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException { * Contains blocking calls, to be used concurrently. */ static void observeEventStream(final BlockingQueue writeTo, - final AtomicBoolean shutdown, - final FlagSyncServiceStub serviceStub, - final SyncFlagsRequest request) + final AtomicBoolean shutdown, + final FlagSyncServiceStub serviceStub, + final SyncFlagsRequest request) throws InterruptedException { final BlockingQueue streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); int retryDelay = INIT_BACK_OFF; + log.info("Initializing sync stream observer"); + while (!shutdown.get()) { + log.debug("Initializing sync stream request"); serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver)); while (!shutdown.get()) { final GrpcResponseModel response = streamReceiver.take(); if (response.isComplete()) { - // The stream is complete. This is not considered as an error + log.info("Sync stream completed"); + // The stream is complete, this isn't really an error but we should try to + // reconnect break; } if (response.getError() != null) { - log.warn(String.format("Error from grpc connection, retrying in %dms", retryDelay), + log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay), response.getError()); if (!writeTo.offer( new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) { - log.warn("Failed to convey ERROR satus, queue is full"); + log.error("Failed to convey ERROR status, queue is full"); } break; } final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse(); + String data = flagsResponse.getFlagConfiguration(); + log.debug("Got stream response: " + data); if (!writeTo.offer( - new StreamPayload(StreamPayloadType.DATA, flagsResponse.getFlagConfiguration()))) { - log.warn("Stream writing failed"); + new StreamPayload(StreamPayloadType.DATA, data))) { + log.error("Stream writing failed"); } // reset retry delay if we succeeded in a retry attempt @@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue writeTo, } // busy wait till next attempt + log.warn(String.format("Stream failed, retrying in %dms", retryDelay)); Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF)); if (retryDelay < MAX_BACK_OFF) {