Skip to content

Commit

Permalink
fixup: use pojo instead of triconsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
  • Loading branch information
toddbaert committed Sep 25, 2024
1 parent 0c148a6 commit 667946f
Show file tree
Hide file tree
Showing 8 changed files with 1,346 additions and 1,219 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand Down Expand Up @@ -145,17 +145,16 @@ private boolean isConnected() {
return this.connected;
}

private void onConnectionEvent(boolean newConnectedState, List<String> changedFlagKeys,
Map<String, Object> syncMetadata) {
private void onConnectionEvent(ConnectionEvent connectionEvent) {
boolean previous = connected;
boolean current = newConnectedState;
this.connected = newConnectedState;
this.syncMetadata = syncMetadata;
boolean current = connected = connectionEvent.isConnected();
syncMetadata = connectionEvent.getSyncMetadata();

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys)
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* Event payload for a
* {@link dev.openfeature.contrib.providers.flagd.resolver.Resolver} connection
* state change event.
*/
@AllArgsConstructor
public class ConnectionEvent {
@Getter
private final boolean connected;
private final List<String> flagsChanged;
private final Map<String, Object> syncMetadata;

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
*/
public ConnectionEvent(boolean connected) {
this(connected, Collections.emptyList(), Collections.emptyMap());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param flagsChanged list of flags changed
*/
public ConnectionEvent(boolean connected, List<String> flagsChanged) {
this(connected, flagsChanged, Collections.emptyMap());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param syncMetadata sync.getMetadata
*/
public ConnectionEvent(boolean connected, Map<String, Object> syncMetadata) {
this(connected, Collections.emptyList(), syncMetadata);
}

/**
* Get changed flags.
*
* @return an unmodifiable view of the changed flags
*/
public List<String> getFlagsChanged() {
return Collections.unmodifiableList(flagsChanged);
}

/**
* Get changed sync metadata.
*
* @return an unmodifiable view of the sync metadata
*/
public Map<String, Object> getSyncMetadata() {
return Collections.unmodifiableMap(syncMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
import dev.openfeature.sdk.internal.TriConsumer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
Expand All @@ -38,7 +38,7 @@ public class GrpcConnector {
private final long deadline;

private final Cache cache;
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
private final Consumer<ConnectionEvent> onConnectionEvent;
private final Supplier<Boolean> connectedSupplier;

private int eventStreamAttempt = 1;
Expand All @@ -56,7 +56,7 @@ public class GrpcConnector {
* @param onConnectionEvent lambda which handles changes in the connection/stream
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
Consumer<ConnectionEvent> onConnectionEvent) {
this.channel = ChannelBuilder.nettyChannel(options);
this.serviceStub = ServiceGrpc.newStub(channel);
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
Expand Down Expand Up @@ -105,7 +105,7 @@ public void shutdown() throws Exception {
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
}
this.onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
this.onConnectionEvent.accept(new ConnectionEvent(false, Collections.emptyList(), Collections.emptyMap()));
}
}

Expand Down Expand Up @@ -166,6 +166,6 @@ private void onConnectionEvent(final boolean connected, final List<String> chang
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
}
// chain to initiator
this.onConnectionEvent.accept(connected, changedFlags, Collections.emptyMap());
this.onConnectionEvent.accept(new ConnectionEvent(connected, changedFlags));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.getField;
import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.getFieldDescriptor;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -16,6 +16,7 @@
import dev.openfeature.contrib.providers.flagd.Config;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveFactory;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveStrategy;
Expand All @@ -33,7 +34,6 @@
import dev.openfeature.sdk.exceptions.OpenFeatureError;
import dev.openfeature.sdk.exceptions.ParseError;
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import dev.openfeature.sdk.internal.TriConsumer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -61,7 +61,7 @@ public final class GrpcResolver implements Resolver {
* @param onConnectionEvent lambda which handles changes in the connection/stream
*/
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
final Consumer<ConnectionEvent> onConnectionEvent) {
this.cache = cache;
this.connectedSupplier = connectedSupplier;
this.strategy = ResolveFactory.getStrategy(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
Expand All @@ -27,7 +26,6 @@
import dev.openfeature.sdk.Value;
import dev.openfeature.sdk.exceptions.ParseError;
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import dev.openfeature.sdk.internal.TriConsumer;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -38,7 +36,7 @@
@Slf4j
public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
private final Consumer<ConnectionEvent> onConnectionEvent;
private final Operator operator;
private final long deadline;
private final ImmutableMetadata metadata;
Expand All @@ -56,7 +54,7 @@ public class InProcessResolver implements Resolver {
* connection/stream
*/
public InProcessResolver(FlagdOptions options, final Supplier<Boolean> connectedSupplier,
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
Consumer<ConnectionEvent> onConnectionEvent) {
this.flagStore = new FlagStore(getConnector(options));
this.deadline = options.getDeadline();
this.onConnectionEvent = onConnectionEvent;
Expand All @@ -79,11 +77,11 @@ public void init() throws Exception {
final StorageStateChange storageStateChange = flagStore.getStateQueue().take();
switch (storageStateChange.getStorageState()) {
case OK:
onConnectionEvent.accept(true, storageStateChange.getChangedFlagsKeys(),
storageStateChange.getSyncMetadata());
onConnectionEvent.accept(new ConnectionEvent(true, storageStateChange.getChangedFlagsKeys(),
storageStateChange.getSyncMetadata()));
break;
case ERROR:
onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
onConnectionEvent.accept(new ConnectionEvent(false));
break;
default:
log.info(String.format("Storage emitted unhandled status: %s",
Expand All @@ -109,7 +107,7 @@ public void init() throws Exception {
*/
public void shutdown() throws InterruptedException {
flagStore.shutdown();
onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
onConnectionEvent.accept(new ConnectionEvent(false));
}

/**
Expand Down
Loading

0 comments on commit 667946f

Please sign in to comment.