-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: SSE Client #234
feat: SSE Client #234
Changes from all commits
94e54c3
c6cb4e7
f36ef79
88e6b28
8bd7750
c3b7789
bacea40
3db8d55
63f08b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package api | ||
|
||
type MinimalConfig struct { | ||
SSE *SSEHost `json:"sse,omitempty"` | ||
} | ||
|
||
type SSEHost struct { | ||
Hostname string `json:"hostname,omitempty"` | ||
Path string `json:"path,omitempty"` | ||
} | ||
Comment on lines
+3
to
+10
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SSE config properties need to be parsed from the config itself - but not the rest of the config. This is the minimal config that we need before we pass the raw data to the bucketing library which handles the rest. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,8 +52,8 @@ type Client struct { | |
localBucketing LocalBucketing | ||
platformData *PlatformData | ||
// Set to true when the client has been initialized, regardless of whether the config has loaded successfully. | ||
isInitialized bool | ||
internalOnInitializedChannel chan bool | ||
isInitialized bool | ||
internalClientEventChannel chan api.ClientEvent | ||
Comment on lines
+55
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This only has the initialized event sent to it - but for continuity I moved it to be a general client event. |
||
} | ||
|
||
type LocalBucketing interface { | ||
|
@@ -84,8 +84,11 @@ func NewClient(sdkKey string, options *Options) (*Client, error) { | |
util.Errorf("%v", err) | ||
return nil, err | ||
} | ||
if options == nil { | ||
return nil, errors.New("missing options! Call NewClient with valid options") | ||
} | ||
if !sdkKeyIsValid(sdkKey) { | ||
return nil, fmt.Errorf("Invalid sdk key. Call NewClient with a valid sdk key.") | ||
return nil, fmt.Errorf("invalid sdk key %s. Call NewClient with a valid sdk key", sdkKey) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was actually super helpful - the stupid sdkkey on github actions is way too easy to hit enter on after copying - which then gets added as |
||
} | ||
options.CheckDefaults() | ||
cfg := NewConfiguration(options) | ||
|
@@ -99,15 +102,14 @@ func NewClient(sdkKey string, options *Options) (*Client, error) { | |
} else { | ||
c.platformData = GeneratePlatformData() | ||
} | ||
c.internalClientEventChannel = make(chan api.ClientEvent, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use this init channel for both cloud and local - had to move it outside of the local bucketing setup. |
||
|
||
if c.DevCycleOptions.Logger != nil { | ||
util.SetLogger(c.DevCycleOptions.Logger) | ||
} | ||
if c.IsLocalBucketing() { | ||
util.Infof("Using Native Bucketing") | ||
|
||
c.internalOnInitializedChannel = make(chan bool, 1) | ||
|
||
err := c.setLBClient(sdkKey, options) | ||
if err != nil { | ||
return c, fmt.Errorf("Error setting up local bucketing: %w", err) | ||
|
@@ -119,29 +121,27 @@ func NewClient(sdkKey string, options *Options) (*Client, error) { | |
return c, fmt.Errorf("Error initializing event queue: %w", err) | ||
} | ||
|
||
c.configManager = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg) | ||
|
||
c.configManager.StartPolling(options.ConfigPollingIntervalMS) | ||
c.configManager, err = NewEnvironmentConfigManager(sdkKey, c.localBucketing, c.eventQueue, options, c.cfg) | ||
|
||
if c.DevCycleOptions.OnInitializedChannel != nil { | ||
// TODO: Pass this error back via a channel internally | ||
if err != nil { | ||
return nil, fmt.Errorf("Error initializing config manager: %w", err) | ||
} | ||
if c.DevCycleOptions.ClientEventHandler != nil { | ||
go func() { | ||
_ = c.configManager.initialFetch() | ||
c.handleInitialization() | ||
}() | ||
} else { | ||
err := c.configManager.initialFetch() | ||
err = c.configManager.initialFetch() | ||
c.handleInitialization() | ||
return c, err | ||
} | ||
} else { | ||
util.Infof("Using Cloud Bucketing") | ||
if c.DevCycleOptions.OnInitializedChannel != nil { | ||
go func() { | ||
c.DevCycleOptions.OnInitializedChannel <- true | ||
}() | ||
if err != nil { | ||
return c, err | ||
} | ||
} | ||
return c, err | ||
} | ||
|
||
c.handleInitialization() | ||
return c, nil | ||
} | ||
|
||
|
@@ -150,18 +150,26 @@ func (c *Client) IsLocalBucketing() bool { | |
} | ||
|
||
func (c *Client) handleInitialization() { | ||
c.isInitialized = true | ||
|
||
bucketingInitMessage := "Using cloud bucketing with hostname: " + c.DevCycleOptions.BucketingAPIURI | ||
if c.IsLocalBucketing() { | ||
util.Infof("Client initialized with local bucketing %v", c.localBucketing.GetUUID()) | ||
bucketingInitMessage = fmt.Sprintf("Client initialized with local bucketing %v", c.localBucketing.GetUUID()) | ||
} | ||
if c.DevCycleOptions.OnInitializedChannel != nil { | ||
initEvent := api.ClientEvent{ | ||
EventType: api.ClientEventType_Initialized, | ||
EventData: bucketingInitMessage, | ||
Status: "success", | ||
Error: nil, | ||
} | ||
c.internalClientEventChannel <- initEvent | ||
c.isInitialized = true | ||
|
||
if c.DevCycleOptions.ClientEventHandler != nil { | ||
go func() { | ||
c.DevCycleOptions.OnInitializedChannel <- true | ||
c.DevCycleOptions.ClientEventHandler <- initEvent | ||
}() | ||
|
||
} | ||
c.internalOnInitializedChannel <- true | ||
util.Infof(bucketingInitMessage) | ||
|
||
} | ||
|
||
func (c *Client) generateBucketedConfig(user User) (config *BucketedUserConfig, err error) { | ||
|
@@ -508,7 +516,11 @@ func (c *Client) Close() (err error) { | |
|
||
if !c.isInitialized { | ||
util.Infof("Awaiting client initialization before closing") | ||
<-c.internalOnInitializedChannel | ||
for event := range c.internalClientEventChannel { | ||
if event.EventType == api.ClientEventType_Initialized { | ||
break | ||
} | ||
} | ||
} | ||
|
||
if c.eventQueue != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,10 @@ type NativeLocalBucketing struct { | |
clientUUID string | ||
} | ||
|
||
func (n *NativeLocalBucketing) GetUUID() string { | ||
return n.clientUUID | ||
} | ||
|
||
Comment on lines
+40
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved the uuid handler inside of the configreceiver so we can use it as part of the unit testing/eventqueue for tracking |
||
func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, options *Options) (*NativeLocalBucketing, error) { | ||
clientUUID := uuid.New().String() | ||
|
||
|
@@ -53,10 +57,6 @@ func NewNativeLocalBucketing(sdkKey string, platformData *api.PlatformData, opti | |
}, err | ||
} | ||
|
||
func (n *NativeLocalBucketing) GetUUID() string { | ||
return n.clientUUID | ||
} | ||
|
||
func (n *NativeLocalBucketing) StoreConfig(configJSON []byte, eTag, rayId, lastModified string) error { | ||
err := bucketing.SetConfig(configJSON, n.sdkKey, eTag, rayId, lastModified, n.eventQueue) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using this for internal (and exposed) events to send in the notification channels.
RealtimeUpdates is used by the SDKProxy to rebroadcast the SSE messages received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add that context to the code as a comment