Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement advanced subscription #144

Open
wants to merge 1 commit into
base: v2
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type procedureDesc struct {
}

type eventDesc struct {
topic string
handler EventHandler
topic string
handler EventHandler
advHandler AdvancedEventHandler
}

// NewWebsocketClient creates a new websocket client connected to the specified
Expand Down Expand Up @@ -276,7 +277,15 @@ func (c *Client) handleEvent(msg *Event) {
sync := make(chan struct{})
c.acts <- func() {
if event, ok := c.events[msg.Subscription]; ok {
go event.handler(msg.Arguments, msg.ArgumentsKw)
if event.handler == nil {
if event.advHandler == nil {
log.Println("No handler registered for subscription:", msg.Subscription)
} else {
go event.advHandler(msg.Arguments, msg.ArgumentsKw, msg.Details)
}
} else {
go event.handler(msg.Arguments, msg.ArgumentsKw)
}
} else {
log.Println("no handler registered for subscription:", msg.Subscription)
}
Expand Down Expand Up @@ -389,6 +398,9 @@ func (c *Client) waitOnListener(id ID) (msg Message, err error) {

// EventHandler handles a publish event.
type EventHandler func(args []interface{}, kwargs map[string]interface{})
type AdvancedEventHandler func(
args []interface{}, kwargs map[string]interface{}, details map[string]interface{},
)

// Subscribe registers the EventHandler to be called for every message in the provided topic.
func (c *Client) Subscribe(topic string, options map[string]interface{}, fn EventHandler) error {
Expand Down Expand Up @@ -418,7 +430,40 @@ func (c *Client) Subscribe(topic string, options map[string]interface{}, fn Even
// register the event handler with this subscription
sync := make(chan struct{})
c.acts <- func() {
c.events[subscribed.Subscription] = &eventDesc{topic, fn}
c.events[subscribed.Subscription] = &eventDesc{topic, fn, nil}
sync <- struct{}{}
}
<-sync
}
return nil
}

func (c *Client) AdvancedSubscribe(topic string, options map[string]interface{}, fn AdvancedEventHandler) error {
if options == nil {
options = make(map[string]interface{})
}
id := NewID()
c.registerListener(id)
sub := &Subscribe{
Request: id,
Options: options,
Topic: URI(topic),
}
err := c.Send(sub)
if err != nil {
return err
}
var msg Message
if msg, err = c.waitOnListener(id); err != nil {
return err
} else if e, ok := msg.(*Error); ok {
return fmt.Errorf("error subscribing to topic '%v': %v", topic, e.Error)
} else if subscribed, ok := msg.(*Subscribed); !ok {
return fmt.Errorf(formatUnexpectedMessage(msg, SUBSCRIBED))
} else {
sync := make(chan struct{})
c.acts <- func() {
c.events[subscribed.Subscription] = &eventDesc{topic, nil, fn}
sync <- struct{}{}
}
<-sync
Expand Down