Skip to content

Commit

Permalink
feat(flagd): Add in-process evaluator (#104)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
Co-authored-by: Cole Bailey <[email protected]>
  • Loading branch information
aepfli and colebaileygit authored Feb 18, 2025
1 parent 4251f36 commit 01285e7
Show file tree
Hide file tree
Showing 13 changed files with 318 additions and 65 deletions.
35 changes: 33 additions & 2 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flagd Provider for OpenFeature

This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto).
This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json) via the OpenFeature Python SDK.

## Installation

Expand Down Expand Up @@ -29,7 +29,9 @@ api.set_provider(FlagdProvider())

### In-process resolver

This mode performs flag evaluations locally (in-process).
This mode performs flag evaluations locally (in-process). Flag configurations for evaluation are obtained via gRPC protocol using [sync protobuf schema](https://buf.build/open-feature/flagd/file/main:sync/v1/sync_service.proto) service definition.

Consider the following example to create a `FlagdProvider` with in-process evaluations,

```python
from openfeature import api
Expand All @@ -38,10 +40,39 @@ from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
))
```

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

<!--
#### Sync-metadata
To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
The value is updated with every (re)connection to the sync implementation.
This can be used to enrich evaluations with such data.
If the `in-process` mode is not used, and before the provider is ready, the `getSyncMetadata` returns an empty map.
-->
### File mode

In-process resolvers can also work in an offline mode.
To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.FILE,
offline_flag_source_path="my-flag.json",
))
```

Provider will attempt to detect file changes using polling.
Polling happens at 5 second intervals and this is currently unconfigurable.
This mode is useful for local development, tests and offline applications.

### Configuration options

The default options can be defined in the FlagdProvider constructor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CacheType(Enum):
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
ENV_VAR_SELECTOR = "FLAGD_SOURCE_SELECTOR"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
ENV_VAR_TLS_CERT = "FLAGD_SERVER_CERT_PATH"
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
selector: typing.Optional[str] = None,
resolver: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_ms: typing.Optional[int] = None,
Expand Down Expand Up @@ -221,3 +223,7 @@ def __init__( # noqa: PLR0913
if cert_path is None
else cert_path
)

self.selector = (
env_or_default(ENV_VAR_SELECTOR, None) if selector is None else selector
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__( # noqa: PLR0913
deadline_ms: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
selector: typing.Optional[str] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
stream_deadline_ms: typing.Optional[int] = None,
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__( # noqa: PLR0913
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_period=retry_grace_period,
selector=selector,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,5 @@
import typing

from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails

from .grpc import GrpcResolver
from .in_process import InProcessResolver


class AbstractResolver(typing.Protocol):
def initialize(self, evaluation_context: EvaluationContext) -> None: ...

def shutdown(self) -> None: ...

def resolve_boolean_details(
self,
key: str,
default_value: bool,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[bool]: ...

def resolve_string_details(
self,
key: str,
default_value: str,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[str]: ...

def resolve_float_details(
self,
key: str,
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]: ...

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]: ...

def resolve_object_details(
self,
key: str,
default_value: typing.Union[dict, list],
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[typing.Union[dict, list]]: ...

from .protocol import AbstractResolver

__all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"]
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ..config import Config
from .process.connector import FlagStateConnector
from .process.connector.grpc_watcher import GrpcWatcher
from .process.flags import FlagStore
from .process.targeting import targeting

Expand All @@ -28,13 +29,19 @@ def __init__(
],
):
self.config = config
if not self.config.offline_flag_source_path:
raise ValueError(
"offline_flag_source_path must be provided when using in-process resolver"
)
self.flag_store = FlagStore(emit_provider_configuration_changed)
self.connector: FlagStateConnector = FileWatcher(
self.config, self.flag_store, emit_provider_ready, emit_provider_error
self.connector: FlagStateConnector = (
FileWatcher(
self.config, self.flag_store, emit_provider_ready, emit_provider_error
)
if self.config.offline_flag_source_path
else GrpcWatcher(
self.config,
self.flag_store,
emit_provider_ready,
emit_provider_error,
emit_provider_stale,
)
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
Expand Down Expand Up @@ -112,6 +119,7 @@ def _resolve(
raise ParseError(
"Parsed JSONLogic targeting did not return a string or bool"
)

variant, value = flag.get_variant(variant)
if not value:
raise ParseError(f"Resolved variant {variant} not in variants config.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import json
import logging
import threading
import time
import typing

import grpc

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
from openfeature.schemas.protobuf.flagd.sync.v1 import (
sync_pb2,
sync_pb2_grpc,
)

from ....config import Config
from ..connector import FlagStateConnector
from ..flags import FlagStore

logger = logging.getLogger("openfeature.contrib")


class GrpcWatcher(FlagStateConnector):
def __init__(
self,
config: Config,
flag_store: FlagStore,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
):
self.flag_store = flag_store
self.config = config

self.channel = self._generate_channel(config)
self.stub = sync_pb2_grpc.FlagSyncServiceStub(self.channel)
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
self.retry_grace_period = config.retry_grace_period
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.selector = config.selector
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_stale = emit_provider_stale

self.connected = False
self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()

def _generate_channel(self, config: Config) -> grpc.Channel:
target = f"{config.host}:{config.port}"
# Create the channel with the service config
options = [
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.stream_deadline_ms),
]
if config.tls:
channel_args = {
"options": options,
"credentials": grpc.ssl_channel_credentials(),
}
if config.cert_path:
with open(config.cert_path, "rb") as f:
channel_args["credentials"] = grpc.ssl_channel_credentials(f.read())

channel = grpc.secure_channel(target, **channel_args)

else:
channel = grpc.insecure_channel(
target,
options=options,
)

return channel

def initialize(self, context: EvaluationContext) -> None:
self.connect()

def connect(self) -> None:
self.active = True

# Run monitoring in a separate thread
self.monitor_thread = threading.Thread(
target=self.monitor, daemon=True, name="FlagdGrpcSyncServiceMonitorThread"
)
self.monitor_thread.start()
## block until ready or deadline reached
timeout = self.deadline + time.time()
while not self.connected and time.time() < timeout:
time.sleep(0.05)
logger.debug("Finished blocking gRPC state initialization")

if not self.connected:
raise ProviderNotReadyError(
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
)

def monitor(self) -> None:
self.channel.subscribe(self._state_change_callback, try_to_connect=True)

def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if new_state == grpc.ChannelConnectivity.READY:
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.listen,
daemon=True,
name="FlagdGrpcSyncWorkerThread",
)
self.thread.start()

if self.timer and self.timer.is_alive():
logger.debug("gRPC error timer expired")
self.timer.cancel()

elif new_state == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
# this is the failed reconnect attempt so we are going into stale
self.emit_provider_stale(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
)
)
self.start_time = time.time()
# adding a timer, so we can emit the error event after time
self.timer = threading.Timer(self.retry_grace_period, self.emit_error)

logger.debug("gRPC error timer started")
self.timer.start()
self.connected = False

def emit_error(self) -> None:
logger.debug("gRPC error emitted")
self.emit_provider_error(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
error_code=ErrorCode.GENERAL,
)
)

def shutdown(self) -> None:
self.active = False
self.channel.close()

def listen(self) -> None:
call_args = (
{"timeout": self.streamline_deadline_seconds}
if self.streamline_deadline_seconds > 0
else {}
)
request_args = {"selector": self.selector} if self.selector is not None else {}

while self.active:
try:
request = sync_pb2.SyncFlagsRequest(**request_args)

logger.debug("Setting up gRPC sync flags connection")
for flag_rsp in self.stub.SyncFlags(
request, wait_for_ready=True, **call_args
):
flag_str = flag_rsp.flag_configuration
logger.debug(
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
)
self.flag_store.update(json.loads(flag_str))

if not self.connected:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
)
self.connected = True

if not self.active:
logger.debug("Terminating gRPC sync thread")
return
except grpc.RpcError as e: # noqa: PERF203
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
except json.JSONDecodeError:
logger.exception(
f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
)
except ParseError:
logger.exception(
f"Could not parse flag data using flagd syntax: {flag_str=}"
)
Loading

0 comments on commit 01285e7

Please sign in to comment.