Skip to content

Commit

Permalink
fixup: use deadline, pr feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
  • Loading branch information
toddbaert committed Sep 25, 2024
1 parent 8594544 commit 0c148a6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
/**
* Create a gRPC stream that get notified about flag changes.
*
* @param sync synchronization object from caller
* @param cache cache to update
* @param onResponse lambda to call to handle the response
* @param sync synchronization object from caller
* @param cache cache to update
* @param onConnectionEvent lambda to call to handle the response
*/
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onResponse) {
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onConnectionEvent) {
this.sync = sync;
this.cache = cache;
this.onConnectionEvent = onResponse;
this.onConnectionEvent = onConnectionEvent;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::grpconConnectionEvent);
this::onConnectionEvent);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
Expand Down Expand Up @@ -156,10 +156,10 @@ private void observeEventStream() {
}

log.error("failed to connect to event stream, exhausted retries");
this.grpconConnectionEvent(false, Collections.emptyList());
this.onConnectionEvent(false, Collections.emptyList());
}

private void grpcOnConnectionEvent(final boolean connected, final List<String> changedFlags) {
private void onConnectionEvent(final boolean connected, final List<String> changedFlags) {
// reset reconnection states
if (connected) {
this.eventStreamAttempt = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,12 @@
"EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector implements Connector {
private static final Random RANDOM = new Random();

private static final int INIT_BACK_OFF = 2 * 1000;
private static final int MAX_BACK_OFF = 120 * 1000;

private static final int QUEUE_SIZE = 5;

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final BlockingQueue<QueuePayload> blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);

private final ManagedChannel channel;
private final FlagSyncServiceStub serviceStub;
private final FlagSyncServiceBlockingStub serviceBlockingStub;
Expand All @@ -69,7 +66,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
public void init() {
Thread listener = new Thread(() -> {
try {
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector);
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline);
} catch (InterruptedException e) {
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -118,7 +115,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
final AtomicBoolean shutdown,
final FlagSyncServiceStub serviceStub,
final FlagSyncServiceBlockingStub serviceBlockingStub,
final String selector)
final String selector,
final int deadline)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
Expand All @@ -137,9 +135,11 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
syncRequest.setSelector(selector);
}

serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(),
new GrpcStreamHandler(streamReceiver));
try {
GetMetadataResponse metadataResponse = serviceBlockingStub.getMetadata(metadataRequest.build());
GetMetadataResponse metadataResponse = serviceBlockingStub
.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build());
metadata = convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap();
} catch (Exception e) {
// the chances this call fails but the syncRequest does not are slim
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -17,7 +20,6 @@
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.google.protobuf.Struct;

Expand All @@ -39,21 +41,25 @@ public void connectionParameters() throws Throwable {
// given
final FlagdOptions options = FlagdOptions.builder()
.selector("selector")
.deadline(1337)
.build();

final GrpcStreamConnector connector = new GrpcStreamConnector(options);
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);

final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
final SyncFlagsRequest[] request = new SyncFlagsRequest[1];

Mockito.doAnswer(invocation -> {
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
doAnswer(invocation -> {
request[0] = invocation.getArgument(0, SyncFlagsRequest.class);
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);

// then
final SyncFlagsRequest flagsRequest = request[0];
Expand All @@ -70,25 +76,27 @@ public void grpcConnectionStatus() throws Throwable {
final FlagSyncServiceStub stubMock = mockStubAndReturn(connector);
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);

final Struct metadata = Struct.newBuilder()
.putFields(key,
com.google.protobuf.Value.newBuilder().setStringValue(val).build())
.build();

final Struct metadata = Struct.newBuilder()
.putFields(key,
com.google.protobuf.Value.newBuilder().setStringValue(val).build())
.build();

when(blockingStubMock.getMetadata(any())).thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build());
when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock);
when(blockingStubMock.getMetadata(any()))
.thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build());

final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];

Mockito.doAnswer(invocation -> {
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
doAnswer(invocation -> {
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();
// verify and wait for initialization
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(blockingStubMock).getMetadata(any());

// then
Expand Down Expand Up @@ -133,15 +141,17 @@ public void listenerExitOnShutdown() throws Throwable {

final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];

Mockito.doAnswer(invocation -> {
when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock);
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
doAnswer(invocation -> {
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
return null;
}).when(stubMock).syncFlags(any(), any());

// when
connector.init();
// verify and wait for initialization
verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(blockingStubMock).getMetadata(any());

// then
Expand Down Expand Up @@ -176,23 +186,23 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c
final Field serviceStubField = GrpcStreamConnector.class.getDeclaredField("serviceStub");
serviceStubField.setAccessible(true);

final FlagSyncServiceStub stubMock = Mockito.mock(FlagSyncServiceStub.class);
final FlagSyncServiceStub stubMock = mock(FlagSyncServiceStub.class);

serviceStubField.set(connector, stubMock);

return stubMock;
}

private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector)
throws Throwable {
final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub");
blockingStubField.setAccessible(true);
throws Throwable {
final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub");
blockingStubField.setAccessible(true);

final FlagSyncServiceBlockingStub blockingStubMock = Mockito.mock(FlagSyncServiceBlockingStub.class);
final FlagSyncServiceBlockingStub blockingStubMock = mock(FlagSyncServiceBlockingStub.class);

blockingStubField.set(connector, blockingStubMock);
blockingStubField.set(connector, blockingStubMock);

return blockingStubMock;
return blockingStubMock;
}

}

0 comments on commit 0c148a6

Please sign in to comment.