Skip to content

Commit

Permalink
feat(flagd): migrate to new provider mode file and update e2e tests (#…
Browse files Browse the repository at this point in the history
…121)

* feat(flagd-rpc): add caching with tests

Signed-off-by: Simon Schrottner <[email protected]>

* fixup: using new test-harness

Signed-off-by: Simon Schrottner <[email protected]>

* fixup(flagd): remove merge conflict error as stated by warber

Signed-off-by: Simon Schrottner <[email protected]>

* feat(flagd): add graceful attempts

Signed-off-by: Simon Schrottner <[email protected]>

* feat(flagd): add graceful attempts

Signed-off-by: Simon Schrottner <[email protected]>

* fixup: rename method

Signed-off-by: Simon Schrottner <[email protected]>

* fixup: naming linting

Signed-off-by: Simon Schrottner <[email protected]>

* feat: better reconnect gherkins

Signed-off-by: Simon Schrottner <[email protected]>

---------

Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli authored Feb 6, 2025
1 parent 02dcfc0 commit eed1ee0
Show file tree
Hide file tree
Showing 38 changed files with 996 additions and 1,128 deletions.
20 changes: 17 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ We use `pytest` for our unit testing, making use of `parametrized` to inject cas

### Integration tests

These are planned once the SDK has been stabilized and a Flagd provider implemented. At that point, we will utilize the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance.
The Flagd provider utilizes the [gherkin integration tests](https://github.com/open-feature/test-harness/blob/main/features/evaluation.feature) to validate against a live, seeded Flagd instance.

To run the integration tests you need to have a container runtime, like docker, ranger, etc. installed.

```bash
hatch run test
```

### Type checking

Expand All @@ -52,6 +58,13 @@ Navigate to the repository folder
cd python-sdk-contrib
```

Checkout submodules

```bash
git submodule update --init --recursive
```


Add your fork as an origin

```bash
Expand All @@ -62,15 +75,16 @@ Ensure your development environment is all set up by building and testing

```bash
cd <package>
hatch run test
hatch build
hatch test
```

To start working on a new feature or bugfix, create a new branch and start working on it.

```bash
git checkout -b feat/NAME_OF_FEATURE
# Make your changes
git commit
git commit -s -m "feat: my feature"
git push fork feat/NAME_OF_FEATURE
```

Expand Down
4 changes: 2 additions & 2 deletions providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ classifiers = [
keywords = []
dependencies = [
"openfeature-sdk>=0.6.0",
"grpcio>=1.68.0",
"protobuf>=4.25.2",
"grpcio>=1.68.1",
"protobuf>=4.29.2",
"mmh3>=4.1.0",
"panzi-json-logic>=1.0.1",
"semver>=3,<4",
Expand Down
14 changes: 13 additions & 1 deletion providers/openfeature-provider-flagd/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
[pytest]
markers =
rpc: tests for rpc mode.
in-process: tests for rpc mode.
in-process: tests for in-process mode.
file: tests for file mode.
unavailable: tests for unavailable providers.
customCert: Supports custom certs.
unixsocket: Supports unixsockets.
targetURI: Supports targetURI.
grace: Supports grace attempts.
targeting: Supports targeting.
fractional: Supports fractional.
string: Supports string.
semver: Supports semver.
reconnect: Supports reconnect.
events: Supports events.
sync: Supports sync.
caching: Supports caching.
offline: Supports offline.
os.linux: linux mark.
stream: Supports streams.
bdd_features_base_dir = tests/features
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
class ResolverType(Enum):
RPC = "rpc"
IN_PROCESS = "in-process"
FILE = "file"


class CacheType(Enum):
Expand Down Expand Up @@ -158,6 +159,17 @@ def __init__( # noqa: PLR0913
else offline_flag_source_path
)

if (
self.offline_flag_source_path is not None
and self.resolver is ResolverType.IN_PROCESS
):
self.resolver = ResolverType.FILE

if self.resolver is ResolverType.FILE and self.offline_flag_source_path is None:
raise AttributeError(
"Resolver Type 'FILE' requires a offlineFlagSourcePath"
)

self.offline_poll_interval_ms: int = (
int(
env_or_default(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails
from openfeature.provider import AbstractProvider
from openfeature.provider.metadata import Metadata
from openfeature.provider.provider import AbstractProvider

from .config import CacheType, Config, ResolverType
from .resolvers import AbstractResolver, GrpcResolver, InProcessResolver
Expand All @@ -43,14 +43,14 @@ def __init__( # noqa: PLR0913
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
deadline: typing.Optional[int] = None,
deadline_ms: typing.Optional[int] = None,
timeout: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
cache: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
Expand All @@ -62,16 +62,16 @@ def __init__( # noqa: PLR0913
:param host: the host to make requests to
:param port: the port the flagd service is available on
:param tls: enable/disable secure TLS connectivity
:param deadline: the maximum to wait before a request times out
:param deadline_ms: the maximum to wait before a request times out
:param timeout: the maximum time to wait before a request times out
:param retry_backoff_ms: the number of milliseconds to backoff
:param offline_flag_source_path: the path to the flag source file
:param stream_deadline_ms: the maximum time to wait before a request times out
:param keep_alive_time: the number of milliseconds to keep alive
:param resolver_type: the type of resolver to use
"""
if deadline is None and timeout is not None:
deadline = timeout * 1000
if deadline_ms is None and timeout is not None:
deadline_ms = timeout * 1000
warnings.warn(
"'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds",
DeprecationWarning,
Expand All @@ -82,15 +82,15 @@ def __init__( # noqa: PLR0913
host=host,
port=port,
tls=tls,
deadline_ms=deadline,
deadline_ms=deadline_ms,
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_period=retry_grace_period,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
keep_alive_time=keep_alive_time,
cache=cache_type,
cache=cache,
max_cache_size=max_cache_size,
cert_path=cert_path,
)
Expand All @@ -106,8 +106,17 @@ def setup_resolver(self) -> AbstractResolver:
self.emit_provider_stale,
self.emit_provider_configuration_changed,
)
elif self.config.resolver == ResolverType.IN_PROCESS:
return InProcessResolver(self.config, self)
elif (
self.config.resolver == ResolverType.IN_PROCESS
or self.config.resolver == ResolverType.FILE
):
return InProcessResolver(
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_stale,
self.emit_provider_configuration_changed,
)
else:
raise ValueError(
f"`resolver_type` parameter invalid: {self.config.resolver}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ def initialize(self, evaluation_context: EvaluationContext) -> None:

def shutdown(self) -> None:
self.active = False
self.channel.unsubscribe(self._state_change_callback)
self.channel.close()
if self.timer and self.timer.is_alive():
logger.debug("gRPC error timer cancelled due to shutdown")
self.timer.cancel()
if self.cache:
self.cache.clear()

Expand Down Expand Up @@ -179,21 +183,22 @@ def listen(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)
call_args["wait_for_ready"] = True
request = evaluation_pb2.EventStreamRequest()

# defining a never ending loop to recreate the stream
while self.active:
try:
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
for message in self.stub.EventStream(
request, wait_for_ready=True, **call_args
):
if message.type == "provider_ready":
self.connected = True
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
)
self.connected = True
elif message.type == "configuration_change":
data = MessageToDict(message)["data"]
self.handle_changed_flags(data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
import typing

from openfeature.contrib.provider.flagd.resolvers.process.connector.file_watcher import (
FileWatcher,
)
from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import FlagNotFoundError, ParseError
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
from openfeature.provider import AbstractProvider

from ..config import Config
from .process.file_watcher import FileWatcherFlagStore
from .process.connector import FlagStateConnector
from .process.flags import FlagStore
from .process.targeting import targeting

T = typing.TypeVar("T")


class InProcessResolver:
def __init__(self, config: Config, provider: AbstractProvider):
def __init__(
self,
config: Config,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
emit_provider_configuration_changed: typing.Callable[
[ProviderEventDetails], None
],
):
self.config = config
self.provider = provider
if not self.config.offline_flag_source_path:
raise ValueError(
"offline_flag_source_path must be provided when using in-process resolver"
)
self.flag_store = FileWatcherFlagStore(
self.config.offline_flag_source_path,
self.provider,
self.config.retry_backoff_ms * 0.001,
self.flag_store = FlagStore(emit_provider_configuration_changed)
self.connector: FlagStateConnector = FileWatcher(
self.config, self.flag_store, emit_provider_ready, emit_provider_error
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
pass
self.connector.initialize(evaluation_context)

def shutdown(self) -> None:
self.flag_store.shutdown()
self.connector.shutdown()

def resolve_boolean_details(
self,
Expand All @@ -54,7 +65,10 @@ def resolve_float_details(
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]:
return self._resolve(key, default_value, evaluation_context)
result = self._resolve(key, default_value, evaluation_context)
if isinstance(result.value, int):
result.value = float(result.value)
return result

def resolve_integer_details(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import typing

from openfeature.evaluation_context import EvaluationContext


class FlagStateConnector(typing.Protocol):
def initialize(
self, evaluation_context: EvaluationContext
) -> None: ... # pragma: no cover

def shutdown(self) -> None: ... # pragma: no cover
Loading

0 comments on commit eed1ee0

Please sign in to comment.