Skip to content

Commit

Permalink
feat: Support supplying a custom sync provider for in-process flagd (#…
Browse files Browse the repository at this point in the history
…598)

Signed-off-by: Maks Osowski <[email protected]>
  • Loading branch information
cupofcat authored Dec 19, 2024
1 parent 629a082 commit bfa642a
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 95 deletions.
31 changes: 29 additions & 2 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,36 @@ openfeature.SetProvider(provider)
The provider will attempt to detect file changes, but this is a best-effort attempt as file system events differ between operating systems.
This mode is useful for local development, tests and offline applications.

#### Custom sync provider

In-process resolver can also be configured with a custom sync provider to change how the in-process resolver fetches flags.
The custom sync provider must implement the [sync.ISync interface](https://github.com/open-feature/flagd/blob/main/core/pkg/sync/isync.go). Optional URI can be provided for the custom sync provider.

```go
var syncProvider sync.ISync = MyAwesomeSyncProvider{}

provider := flagd.NewProvider(
flagd.WithInProcessResolver(),
flagd.WithCustomSyncProvider(syncProvider))
openfeature.SetProvider(provider)
```

```go
var syncProvider sync.ISync = MyAwesomeSyncProvider{}
var syncProviderUri string = "myawesome://sync.uri"

provider := flagd.NewProvider(
flagd.WithInProcessResolver(),
flagd.WithCustomSyncProviderAndUri(syncProvider, syncProviderUri))
openfeature.SetProvider(provider)
```

> [!IMPORTANT]
> Note that you can only use a single flag source (either gRPC or offline file) for the in-process resolver.
> If both sources are configured, offline mode will be selected.
> Note that the in-process resolver can only use a single flag source.
> If multiple sources are configured then only one would be selected based on the following order of preference:
> 1. Custom sync provider
> 2. Offline file
> 3. gRPC
## Configuration options

Expand Down
8 changes: 6 additions & 2 deletions providers/flagd/pkg/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package flagd

import (
"fmt"
"github.com/go-logr/logr"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"os"
"strconv"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
)

type ResolverType string
Expand Down Expand Up @@ -52,6 +54,8 @@ type providerConfiguration struct {
Selector string
SocketPath string
TLSEnabled bool
CustomSyncProvider sync.ISync
CustomSyncProviderUri string

log logr.Logger
}
Expand Down
42 changes: 33 additions & 9 deletions providers/flagd/pkg/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@ package flagd
import (
"context"
"fmt"

parallel "sync"

"github.com/go-logr/logr"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/logger"
"github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
rpcService "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/rpc"
of "github.com/open-feature/go-sdk/openfeature"
"sync"
)

const (
defaultCustomSyncProviderUri = "syncprovider://custom"
)

type Provider struct {
Expand All @@ -18,7 +25,7 @@ type Provider struct {
providerConfiguration *providerConfiguration
service IService
status of.State
mtx sync.RWMutex
mtx parallel.RWMutex

eventStream chan of.Event
}
Expand Down Expand Up @@ -71,12 +78,14 @@ func NewProvider(opts ...ProviderOption) *Provider {
provider.providerConfiguration.EventStreamConnectionMaxAttempts)
} else {
service = process.NewInProcessService(process.Configuration{
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TargetUri: provider.providerConfiguration.TargetUri,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
Host: provider.providerConfiguration.Host,
Port: provider.providerConfiguration.Port,
Selector: provider.providerConfiguration.Selector,
TargetUri: provider.providerConfiguration.TargetUri,
TLSEnabled: provider.providerConfiguration.TLSEnabled,
OfflineFlagSource: provider.providerConfiguration.OfflineFlagSourcePath,
CustomSyncProvider: provider.providerConfiguration.CustomSyncProvider,
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
})
}

Expand Down Expand Up @@ -324,3 +333,18 @@ func FromEnv() ProviderOption {
p.providerConfiguration.updateFromEnvVar()
}
}

// WithCustomSyncProvider provides a custom implementation of the sync.ISync interface used by the inProcess Service
// This is only useful with inProcess resolver type
func WithCustomSyncProvider(customSyncProvider sync.ISync) ProviderOption {
return WithCustomSyncProviderAndUri(customSyncProvider, defaultCustomSyncProviderUri)
}

// WithCustomSyncProvider provides a custom implementation of the sync.ISync interface used by the inProcess Service
// This is only useful with inProcess resolver type
func WithCustomSyncProviderAndUri(customSyncProvider sync.ISync, customSyncProviderUri string) ProviderOption {
return func(p *Provider) {
p.providerConfiguration.CustomSyncProvider = customSyncProvider
p.providerConfiguration.CustomSyncProviderUri = customSyncProviderUri
}
}
215 changes: 141 additions & 74 deletions providers/flagd/pkg/provider_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,67 @@
package flagd

import (
"testing"

"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/cache"
"github.com/open-feature/go-sdk-contrib/providers/flagd/internal/mock"
process "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg/service/in_process"
of "github.com/open-feature/go-sdk/openfeature"
"go.uber.org/mock/gomock"
"testing"
)

func TestNewProvider(t *testing.T) {
customSyncProvider := process.NewDoNothingCustomSyncProvider()

tests := []struct {
name string
expectedResolver ResolverType
expectPort uint16
expectHost string
expectTargetUri string
expectCacheType cache.Type
expectCertPath string
expectMaxRetries int
expectCacheSize int
expectOtelIntercept bool
expectSocketPath string
expectTlsEnabled bool
options []ProviderOption
name string
expectedResolver ResolverType
expectPort uint16
expectHost string
expectTargetUri string
expectCacheType cache.Type
expectCertPath string
expectMaxRetries int
expectCacheSize int
expectOtelIntercept bool
expectSocketPath string
expectTlsEnabled bool
expectCustomSyncProvider sync.ISync
expectCustomSyncProviderUri string
options []ProviderOption
}{
{
name: "default construction",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default construction",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
},
{
name: "with options",
expectedResolver: inProcess,
expectPort: 9090,
expectHost: "myHost",
expectTargetUri: "",
expectCacheType: cache.LRUValue,
expectCertPath: "/path",
expectMaxRetries: 2,
expectCacheSize: 2500,
expectOtelIntercept: true,
expectSocketPath: "/socket",
expectTlsEnabled: true,
name: "with options",
expectedResolver: inProcess,
expectPort: 9090,
expectHost: "myHost",
expectTargetUri: "",
expectCacheType: cache.LRUValue,
expectCertPath: "/path",
expectMaxRetries: 2,
expectCacheSize: 2500,
expectOtelIntercept: true,
expectSocketPath: "/socket",
expectTlsEnabled: true,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
WithSocketPath("/socket"),
Expand All @@ -63,57 +74,103 @@ func TestNewProvider(t *testing.T) {
},
},
{
name: "default port handling with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default port handling with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
},
},
{
name: "default port handling with in-process resolver",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "default port handling with in-process resolver",
expectedResolver: rpc,
expectPort: defaultRpcPort,
expectHost: defaultHost,
expectTargetUri: "",
expectCacheType: defaultCache,
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithRPCResolver(),
},
},
{
name: "with target uri with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "envoy://localhost:9211/test.service",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
name: "with target uri with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "envoy://localhost:9211/test.service",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: nil,
expectCustomSyncProviderUri: "",
options: []ProviderOption{
WithInProcessResolver(),
WithTargetUri("envoy://localhost:9211/test.service"),
},
},
{
name: "with custom sync provider and uri with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: customSyncProvider,
expectCustomSyncProviderUri: "testsyncer://custom.uri",
options: []ProviderOption{
WithInProcessResolver(),
WithCustomSyncProviderAndUri(customSyncProvider, "testsyncer://custom.uri"),
},
},
{
name: "with custom sync provider with in-process resolver",
expectedResolver: inProcess,
expectPort: defaultInProcessPort,
expectHost: defaultHost,
expectCacheType: defaultCache,
expectTargetUri: "",
expectCertPath: "",
expectMaxRetries: defaultMaxEventStreamRetries,
expectCacheSize: defaultMaxCacheSize,
expectOtelIntercept: false,
expectSocketPath: "",
expectTlsEnabled: false,
expectCustomSyncProvider: customSyncProvider,
expectCustomSyncProviderUri: defaultCustomSyncProviderUri,
options: []ProviderOption{
WithInProcessResolver(),
WithCustomSyncProvider(customSyncProvider),
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -172,6 +229,16 @@ func TestNewProvider(t *testing.T) {
test.expectTargetUri, config.TargetUri)
}

if config.CustomSyncProvider != test.expectCustomSyncProvider {
t.Errorf("incorrect configuration CustomSyncProvider, expected %v, got %v",
test.expectCustomSyncProvider, config.CustomSyncProvider)
}

if config.CustomSyncProviderUri != test.expectCustomSyncProviderUri {
t.Errorf("incorrect configuration CustomSyncProviderUri, expected %v, got %v",
test.expectCustomSyncProviderUri, config.CustomSyncProviderUri)
}

// this line will fail linting if this provider is no longer compatible with the openfeature sdk
var _ of.FeatureProvider = flagdProvider
})
Expand Down
Loading

0 comments on commit bfa642a

Please sign in to comment.