Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SSE Client #234

Merged
merged 9 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test_examples.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: Test Examples

## For anyone looking to change this (Internal to DevCycle) - the project is here: https://app.devcycle.com/o/org_U9F8YMaTChTEndWw/p/git-hub-actions-integration-tests/features/6642210af1c941418857b237
on:
pull_request:
branches: [ main ]
Expand All @@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
env:
DEVCYCLE_SERVER_SDK_KEY: ${{ secrets.DEVCYCLE_SERVER_SDK_KEY }}
DEVCYCLE_VARIABLE_KEY: test-boolean-variable
DEVCYCLE_VARIABLE_KEY: go-example-tests
steps:
- uses: actions/checkout@v4

Expand Down
27 changes: 19 additions & 8 deletions api/model_event.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/*
* DevCycle Bucketing API
*
* Documents the DevCycle Bucketing API which provides and API interface to User Bucketing and for generated SDKs.
*
* API version: 1.0.0
* Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
*/
package api

import (
Expand All @@ -21,6 +13,25 @@ const (
EventType_CustomEvent = "customEvent"
)

type ClientEvent struct {
EventType ClientEventType `json:"eventType"`
EventData interface{} `json:"eventData"`
Status string `json:"status"`
Error error `json:"error"`
}

type ClientEventType string

const (
ClientEventType_Initialized ClientEventType = "initialized"
ClientEventType_Error ClientEventType = "error"
ClientEventType_ConfigUpdated ClientEventType = "configUpdated"
ClientEventType_RealtimeUpdates ClientEventType = "realtimeUpdates"
ClientEventType_InternalSSEFailure ClientEventType = "internalSSEFailure"
ClientEventType_InternalNewConfigAvailable ClientEventType = "internalNewConfigAvailable"
ClientEventType_InternalSSEConnected ClientEventType = "internalSSEConnected"
)

type Event struct {
Comment on lines +16 to +34
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using this for internal (and exposed) events to send in the notification channels.

RealtimeUpdates is used by the SDKProxy to rebroadcast the SSE messages received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add that context to the code as a comment

Type_ string `json:"type"`
Target string `json:"target,omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions api/model_sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package api

type MinimalConfig struct {
SSE *SSEHost `json:"sse,omitempty"`
}

type SSEHost struct {
Hostname string `json:"hostname,omitempty"`
Path string `json:"path,omitempty"`
}
Comment on lines +3 to +10
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSE config properties need to be parsed from the config itself - but not the rest of the config. This is the minimal config that we need before we pass the raw data to the bucketing library which handles the rest.

3 changes: 1 addition & 2 deletions bucketing/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ func (eq *EventQueue) FlushEventQueue(clientUUID, configEtag, rayId, lastModifie
records = append(records, eq.userEventQueue.BuildBatchRecords()...)
eq.aggEventQueue = make(AggregateEventQueue)
eq.userEventQueue = make(UserEventQueue)
eq.userEventQueueCount = 0

for _, record := range records {
var payload *api.FlushPayload
Expand All @@ -286,7 +285,7 @@ func (eq *EventQueue) FlushEventQueue(clientUUID, configEtag, rayId, lastModifie
}
eq.pendingPayloads[payload.PayloadId] = *payload
}

eq.userEventQueueCount = 0
eq.updateFailedPayloads()

eq.eventsFlushed.Add(int32(len(eq.pendingPayloads)))
Expand Down
1 change: 1 addition & 0 deletions bucketing/model_config_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type configBody struct {
etag string
rayId string
lastModified string
SSE api.SSEHost `json:"sse,omitempty"`
variableIdMap map[string]*Variable
variableKeyMap map[string]*Variable
variableIdToFeatureMap map[string]*ConfigFeature
Expand Down
66 changes: 39 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Client struct {
localBucketing LocalBucketing
platformData *PlatformData
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully.
isInitialized bool
internalOnInitializedChannel chan bool
isInitialized bool
internalClientEventChannel chan api.ClientEvent
Comment on lines +55 to +56
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only has the initialized event sent to it - but for continuity I moved it to be a general client event.

}

type LocalBucketing interface {
Expand Down Expand Up @@ -84,8 +84,11 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
util.Errorf("%v", err)
return nil, err
}
if options == nil {
return nil, errors.New("missing options! Call NewClient with valid options")
}
if !sdkKeyIsValid(sdkKey) {
return nil, fmt.Errorf("Invalid sdk key. Call NewClient with a valid sdk key.")
return nil, fmt.Errorf("invalid sdk key %s. Call NewClient with a valid sdk key", sdkKey)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually super helpful - the stupid sdkkey on github actions is way too easy to hit enter on after copying - which then gets added as \n to the string, thus is an invalid sdk key - but you have no idea because it's not shown.

}
options.CheckDefaults()
cfg := NewConfiguration(options)
Expand All @@ -99,15 +102,14 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
} else {
c.platformData = GeneratePlatformData()
}
c.internalClientEventChannel = make(chan api.ClientEvent, 1)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use this init channel for both cloud and local - had to move it outside of the local bucketing setup.


if c.DevCycleOptions.Logger != nil {
util.SetLogger(c.DevCycleOptions.Logger)
}
if c.IsLocalBucketing() {
util.Infof("Using Native Bucketing")

c.internalOnInitializedChannel = make(chan bool, 1)

err := c.setLBClient(sdkKey, options)
if err != nil {
return c, fmt.Errorf("Error setting up local bucketing: %w", err)
Expand All @@ -119,29 +121,27 @@ func NewClient(sdkKey string, options *Options) (*Client, error) {
return c, fmt.Errorf("Error initializing event queue: %w", err)
}

c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

c.configManager.StartPolling(options.ConfigPollingIntervalMS)
c.configManager, err = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg)

if c.DevCycleOptions.OnInitializedChannel != nil {
// TODO: Pass this error back via a channel internally
if err != nil {
return nil, fmt.Errorf("Error initializing config manager: %w", err)
}
if c.DevCycleOptions.ClientEventHandler != nil {
go func() {
_ = c.configManager.initialFetch()
c.handleInitialization()
}()
} else {
err := c.configManager.initialFetch()
err = c.configManager.initialFetch()
c.handleInitialization()
return c, err
}
} else {
util.Infof("Using Cloud Bucketing")
if c.DevCycleOptions.OnInitializedChannel != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
}()
if err != nil {
return c, err
}
}
return c, err
}

c.handleInitialization()
return c, nil
}

Expand All @@ -150,18 +150,26 @@ func (c *Client) IsLocalBucketing() bool {
}

func (c *Client) handleInitialization() {
c.isInitialized = true

bucketingInitMessage := "Using cloud bucketing with hostname: " + c.DevCycleOptions.BucketingAPIURI
if c.IsLocalBucketing() {
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
bucketingInitMessage = fmt.Sprintf("Client initialized with local bucketing %v", c.localBucketing.GetUUID())
}
if c.DevCycleOptions.OnInitializedChannel != nil {
initEvent := api.ClientEvent{
EventType: api.ClientEventType_Initialized,
EventData: bucketingInitMessage,
Status: "success",
Error: nil,
}
c.internalClientEventChannel <- initEvent
c.isInitialized = true

if c.DevCycleOptions.ClientEventHandler != nil {
go func() {
c.DevCycleOptions.OnInitializedChannel <- true
c.DevCycleOptions.ClientEventHandler <- initEvent
}()

}
c.internalOnInitializedChannel <- true
util.Infof(bucketingInitMessage)

}

func (c *Client) generateBucketedConfig(user User) (config *BucketedUserConfig, err error) {
Expand Down Expand Up @@ -508,7 +516,11 @@ func (c *Client) Close() (err error) {

if !c.isInitialized {
util.Infof("Awaiting client initialization before closing")
<-c.internalOnInitializedChannel
for event := range c.internalClientEventChannel {
if event.EventType == api.ClientEventType_Initialized {
break
}
}
}

if c.eventQueue != nil {
Expand Down
8 changes: 4 additions & 4 deletions client_native_bucketing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type NativeLocalBucketing struct {
clientUUID string
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

Comment on lines +40 to +43
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the uuid handler inside of the configreceiver so we can use it as part of the unit testing/eventqueue for tracking sdkConfig events

func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) {
clientUUID := uuid.New().String()

Expand All @@ -53,10 +57,6 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti
}, err
}

func (n *NativeLocalBucketing) GetUUID() string {
return n.clientUUID
}

func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error {
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue)
if err != nil {
Expand Down
Loading
Loading