diff --git a/client.go b/client.go index 14bb8a5..63b5b5c 100644 --- a/client.go +++ b/client.go @@ -48,8 +48,9 @@ type procedureDesc struct { } type eventDesc struct { - topic string - handler EventHandler + topic string + handler EventHandler + advHandler AdvancedEventHandler } // NewWebsocketClient creates a new websocket client connected to the specified @@ -276,7 +277,15 @@ func (c *Client) handleEvent(msg *Event) { sync := make(chan struct{}) c.acts <- func() { if event, ok := c.events[msg.Subscription]; ok { - go event.handler(msg.Arguments, msg.ArgumentsKw) + if event.handler == nil { + if event.advHandler == nil { + log.Println("No handler registered for subscription:", msg.Subscription) + } else { + go event.advHandler(msg.Arguments, msg.ArgumentsKw, msg.Details) + } + } else { + go event.handler(msg.Arguments, msg.ArgumentsKw) + } } else { log.Println("no handler registered for subscription:", msg.Subscription) } @@ -389,6 +398,9 @@ func (c *Client) waitOnListener(id ID) (msg Message, err error) { // EventHandler handles a publish event. type EventHandler func(args []interface{}, kwargs map[string]interface{}) +type AdvancedEventHandler func( + args []interface{}, kwargs map[string]interface{}, details map[string]interface{}, +) // Subscribe registers the EventHandler to be called for every message in the provided topic. func (c *Client) Subscribe(topic string, options map[string]interface{}, fn EventHandler) error { @@ -418,7 +430,40 @@ func (c *Client) Subscribe(topic string, options map[string]interface{}, fn Even // register the event handler with this subscription sync := make(chan struct{}) c.acts <- func() { - c.events[subscribed.Subscription] = &eventDesc{topic, fn} + c.events[subscribed.Subscription] = &eventDesc{topic, fn, nil} + sync <- struct{}{} + } + <-sync + } + return nil +} + +func (c *Client) AdvancedSubscribe(topic string, options map[string]interface{}, fn AdvancedEventHandler) error { + if options == nil { + options = make(map[string]interface{}) + } + id := NewID() + c.registerListener(id) + sub := &Subscribe{ + Request: id, + Options: options, + Topic: URI(topic), + } + err := c.Send(sub) + if err != nil { + return err + } + var msg Message + if msg, err = c.waitOnListener(id); err != nil { + return err + } else if e, ok := msg.(*Error); ok { + return fmt.Errorf("error subscribing to topic '%v': %v", topic, e.Error) + } else if subscribed, ok := msg.(*Subscribed); !ok { + return fmt.Errorf(formatUnexpectedMessage(msg, SUBSCRIBED)) + } else { + sync := make(chan struct{}) + c.acts <- func() { + c.events[subscribed.Subscription] = &eventDesc{topic, nil, fn} sync <- struct{}{} } <-sync