Skip to content

Commit

Permalink
fixed generic callback function for Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
andot committed May 17, 2021
1 parent cfb85d8 commit 796cf79
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
47 changes: 32 additions & 15 deletions rpc/plugins/push/prosumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func NewProsumer(client *core.Client, id ...string) *Prosumer {
return p
}

func (p *Prosumer) onError(err error) {
if p.OnError != nil {
p.OnError(err)
}
}

func (p *Prosumer) onSubscribe(topic string) {
if p.OnSubscribe != nil {
p.OnSubscribe(topic)
}
}

func (p *Prosumer) onUnsubscribe(topic string) {
if p.OnUnsubscribe != nil {
p.OnUnsubscribe(topic)
}
}

func (p *Prosumer) Client() *core.Client {
return p.client
}
Expand Down Expand Up @@ -99,14 +117,19 @@ func (p *Prosumer) call(callback Callback, message Message) {
default:
v := reflect.ValueOf(callback)
t := v.Type()
switch t.NumIn() {
case 1:
if data, err := io.Convert(message.Data, t.In(0)); err != nil {
v.Call([]reflect.Value{reflect.ValueOf(data)})
if n := t.NumIn(); n >= 1 {
data, err := io.Convert(message.Data, t.In(0))
if err != nil {
p.onError(err)
return
}
case 2:
if data, err := io.Convert(message.Data, t.In(0)); err != nil {
switch n {
case 1:
v.Call([]reflect.Value{reflect.ValueOf(data)})
case 2:
v.Call([]reflect.Value{reflect.ValueOf(data), reflect.ValueOf(message.From)})
default:
panic("invalid callback: " + t.String())
}
}
}
Expand All @@ -120,9 +143,7 @@ func (p *Prosumer) message() {
if p.RetryInterval != 0 {
<-time.After(p.RetryInterval)
}
if p.OnError != nil {
p.OnError(err)
}
p.onError(err)
}
continue
}
Expand All @@ -138,9 +159,7 @@ func (p *Prosumer) Subscribe(topic string, callback Callback) (result bool, err
p.callbacks.Store(topic, callback)
result, err = p.proxy.subscribe(topic)
go p.message()
if p.OnSubscribe != nil {
p.OnSubscribe(topic)
}
p.onSubscribe(topic)
}
return
}
Expand All @@ -149,9 +168,7 @@ func (p *Prosumer) Unsubscribe(topic string) (result bool, err error) {
if p.ID() != "" {
result, err = p.proxy.unsubscribe(topic)
p.callbacks.Delete(topic)
if p.OnUnsubscribe != nil {
p.OnUnsubscribe(topic)
}
p.onUnsubscribe(topic)
}
return
}
Expand Down
11 changes: 7 additions & 4 deletions rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,19 +779,22 @@ func TestPush(t *testing.T) {
time.Sleep(time.Millisecond * 5)

client1 := rpc.NewClient("tcp://127.0.0.1/")
client1.Use(log.Plugin.IOHandler)
//client1.Use(log.Plugin.IOHandler)
prosumer1 := push.NewProsumer(client1, "1")
prosumer1.OnError = func(e error) {
fmt.Println(e.Error())
}
prosumer1.OnSubscribe = func(topic string) {
fmt.Println(topic, "is subscribed.")
}
prosumer1.OnUnsubscribe = func(topic string) {
fmt.Println(topic, "is unsubscribed.")
}
client2 := rpc.NewClient("tcp://127.0.0.1/")
client2.Use(log.Plugin.IOHandler)
//client2.Use(log.Plugin.IOHandler)
prosumer2 := push.NewProsumer(client2, "2")
prosumer1.Subscribe("test", func(data string) {
fmt.Println(data)
prosumer1.Subscribe("test", func(data int, from string) {
fmt.Printf("%v from %v\n", data, from)
})
prosumer1.Subscribe("test2", func(message push.Message) {
fmt.Println(message)
Expand Down

0 comments on commit 796cf79

Please sign in to comment.