Skip to content

Commit

Permalink
chore: add more logging in sync stream (#929)
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
  • Loading branch information
toddbaert authored Aug 27, 2024
1 parent 7109051 commit 64c9f13
Showing 1 changed file with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
* Implements the {@link Connector} contract and emit flags obtained from flagd
* sync gRPC contract.
*/
@Slf4j
@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"},
justification = "Random is used to generate a variation & flag configurations require exposing")
@SuppressFBWarnings(value = { "PREDICTABLE_RANDOM",
"EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector implements Connector {
private static final Random RANDOM = new Random();

Expand Down Expand Up @@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException {
* Contains blocking calls, to be used concurrently.
*/
static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
final AtomicBoolean shutdown,
final FlagSyncServiceStub serviceStub,
final SyncFlagsRequest request)
final AtomicBoolean shutdown,
final FlagSyncServiceStub serviceStub,
final SyncFlagsRequest request)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
int retryDelay = INIT_BACK_OFF;

log.info("Initializing sync stream observer");

while (!shutdown.get()) {
log.debug("Initializing sync stream request");
serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver));

while (!shutdown.get()) {
final GrpcResponseModel response = streamReceiver.take();

if (response.isComplete()) {
// The stream is complete. This is not considered as an error
log.info("Sync stream completed");
// The stream is complete, this isn't really an error but we should try to
// reconnect
break;
}

if (response.getError() != null) {
log.warn(String.format("Error from grpc connection, retrying in %dms", retryDelay),
log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay),
response.getError());

if (!writeTo.offer(
new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) {
log.warn("Failed to convey ERROR satus, queue is full");
log.error("Failed to convey ERROR status, queue is full");
}
break;
}

final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: " + data);
if (!writeTo.offer(
new StreamPayload(StreamPayloadType.DATA, flagsResponse.getFlagConfiguration()))) {
log.warn("Stream writing failed");
new StreamPayload(StreamPayloadType.DATA, data))) {
log.error("Stream writing failed");
}

// reset retry delay if we succeeded in a retry attempt
Expand All @@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
}

// busy wait till next attempt
log.warn(String.format("Stream failed, retrying in %dms", retryDelay));
Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF));

if (retryDelay < MAX_BACK_OFF) {
Expand Down

0 comments on commit 64c9f13

Please sign in to comment.