From 3d5f4b7e8bdc8e33066e2387e846664ec20d380a Mon Sep 17 00:00:00 2001 From: andot Date: Sat, 15 May 2021 11:56:24 +0800 Subject: [PATCH] Add push plugin. --- go.mod | 1 + go.sum | 2 + rpc/plugins/push/broker.go | 294 +++++++++++++++++++++++++++++++++++ rpc/plugins/push/message.go | 48 ++++++ rpc/plugins/push/producer.go | 79 ++++++++++ rpc/plugins/push/prosumer.go | 166 ++++++++++++++++++++ rpc/rpc_test.go | 47 ++++++ 7 files changed, 637 insertions(+) create mode 100644 rpc/plugins/push/broker.go create mode 100644 rpc/plugins/push/message.go create mode 100644 rpc/plugins/push/producer.go create mode 100644 rpc/plugins/push/prosumer.go diff --git a/go.mod b/go.mod index 9176ce5..1f47104 100644 --- a/go.mod +++ b/go.mod @@ -8,5 +8,6 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/json-iterator/go v1.1.11 github.com/modern-go/reflect2 v1.0.1 + github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index bc810ea..04a7333 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/rpc/plugins/push/broker.go b/rpc/plugins/push/broker.go new file mode 100644 index 0000000..d1bbae0 --- /dev/null +++ b/rpc/plugins/push/broker.go @@ -0,0 +1,294 @@ +/*--------------------------------------------------------*\ +| | +| hprose | +| | +| Official WebSite: https://hprose.com | +| | +| rpc/plugins/push/broker.go | +| | +| LastModified: May 16, 2021 | +| Author: Ma Bingyao | +| | +\*________________________________________________________*/ + +package push + +import ( + "context" + "sync" + "time" + + "github.com/hprose/hprose-golang/v3/rpc/core" + cmap "github.com/orcaman/concurrent-map" +) + +type Broker struct { + *core.Service + messages sync.Map // map[string]map[string]messageCache + responders cmap.ConcurrentMap // map[string]chan map[string][]Message + signals cmap.ConcurrentMap // map[string]chan bool + MessageQueueMaxLength int + Timeout time.Duration + HeartBeat time.Duration + OnSubscribe func(ctx context.Context, id string, topic string) + OnUnsubscribe func(ctx context.Context, id string, topic string, messages []Message) +} + +func NewBroker(service *core.Service) *Broker { + broker := &Broker{ + Service: service, + responders: cmap.New(), + signals: cmap.New(), + MessageQueueMaxLength: 10, + Timeout: time.Minute * 2, + HeartBeat: time.Second * 10, + } + service.Use(broker.handler). + AddFunction(broker.subscribe, "+"). + AddFunction(broker.unsubscribe, "-"). + AddFunction(broker.message, "<"). + AddFunction(broker.Unicast, ">"). + AddFunction(broker.Multicast, ">?"). + AddFunction(broker.Broadcast, ">*"). + AddFunction(broker.Exists, "?"). + AddFunction(broker.IdList, "|") + return broker +} + +func (b *Broker) send(ctx context.Context, id string, responder chan map[string][]Message) bool { + var topics *sync.Map + if value, ok := b.messages.Load(id); ok { + topics = value.(*sync.Map) + } + if topics == nil { + responder <- nil + return true + } + var size int + result := make(map[string][]Message) + topics.Range(func(key, value interface{}) bool { + size++ + topic := key.(string) + cache := value.(*MessageCache) + if cache == nil { + result[topic] = nil + topics.Delete(topic) + } else { + messages := cache.Take() + if len(messages) > 0 { + result[topic] = messages + } + } + return true + }) + if size == 0 { + responder <- nil + return true + } + if len(result) == 0 { + return false + } + responder <- result + go b.doHeartBeat(ctx, id) + return true +} + +func (b *Broker) doHeartBeat(ctx context.Context, id string) { + if b.HeartBeat <= 0 { + return + } + signal := make(chan bool, 1) + b.signals.Upsert(id, signal, func(exist bool, valueInMap interface{}, newValue interface{}) interface{} { + if exist { + close(valueInMap.(chan bool)) + } + return newValue + }) + ctx, cancel := context.WithTimeout(ctx, b.HeartBeat) + defer cancel() + select { + case <-ctx.Done(): + if topics, ok := b.messages.Load(id); ok { + topics := topics.(*sync.Map) + topics.Range(func(key, value interface{}) bool { + b.offline(ctx, topics, id, key.(string)) + return true + }) + } + case <-signal: + } +} + +func (b *Broker) getID(ctx context.Context) (id string) { + if id = core.GetServiceContext(ctx).RequestHeaders().GetString("id"); id == "" { + panic("client unique id not found") + } + return +} + +func (b *Broker) subscribe(ctx context.Context, topic string) bool { + id := b.getID(ctx) + t, ok := b.messages.Load(id) + if !ok { + t, _ = b.messages.LoadOrStore(id, new(sync.Map)) + } + topics := t.(*sync.Map) + if _, ok := topics.Load(topic); ok { + return false + } + _, loaded := topics.LoadOrStore(topic, new(MessageCache)) + if !loaded && b.OnSubscribe != nil { + b.OnSubscribe(ctx, id, topic) + + } + return !loaded +} + +func (b *Broker) response(ctx context.Context, id string) { + if responder, ok := b.responders.Pop(id); ok { + responder := responder.(chan map[string][]Message) + if !b.send(ctx, id, responder) { + if !b.responders.SetIfAbsent(id, responder) { + responder <- nil + } + } + } +} + +func (b *Broker) offline(ctx context.Context, topics *sync.Map, id string, topic string) bool { + if messages, ok := topics.Load(topic); ok { + topics.Delete(topic) + if b.OnUnsubscribe != nil { + b.OnUnsubscribe(ctx, id, topic, messages.(*MessageCache).Take()) + } + b.response(ctx, id) + return true + } + return false +} + +func (b *Broker) unsubscribe(ctx context.Context, topic string) bool { + id := b.getID(ctx) + if topics, ok := b.messages.Load(id); ok { + return b.offline(ctx, topics.(*sync.Map), id, topic) + } + return false +} + +func (b *Broker) message(ctx context.Context) map[string][]Message { + id := b.getID(ctx) + if responder, ok := b.responders.Pop(id); ok { + responder.(chan map[string][]Message) <- nil + } + if signal, ok := b.signals.Pop(id); ok { + close(signal.(chan bool)) + } + responder := make(chan map[string][]Message, 1) + if !b.send(ctx, id, responder) { + b.responders.Upsert(id, responder, func(exist bool, valueInMap interface{}, newValue interface{}) interface{} { + if exist { + valueInMap.(chan map[string][]Message) <- nil + } + return newValue + }) + if b.Timeout > 0 { + ctx, cancel := context.WithTimeout(ctx, b.Timeout) + defer cancel() + select { + case <-ctx.Done(): + go b.doHeartBeat(ctx, id) + return map[string][]Message{} + case result := <-responder: + return result + } + } + } + return <-responder +} + +func (b *Broker) Unicast(ctx context.Context, data interface{}, topic string, id string, from string) bool { + if topics, ok := b.messages.Load(id); ok { + if cache, ok := topics.(*sync.Map).Load(topic); ok && cache != nil { + cache.(*MessageCache).Append(Message{Data: data, From: from}) + b.response(ctx, id) + return true + } + } + return false +} + +func (b *Broker) Multicast(ctx context.Context, data interface{}, topic string, ids []string, from string) map[string]bool { + result := make(map[string]bool) + for _, id := range ids { + result[id] = b.Unicast(ctx, data, topic, id, from) + } + return result +} + +func (b *Broker) Broadcast(ctx context.Context, data interface{}, topic string, from string) map[string]bool { + result := make(map[string]bool) + b.messages.Range(func(key, value interface{}) bool { + id := key.(string) + topics := value.(*sync.Map) + if cache, ok := topics.Load(topic); ok && cache != nil { + cache.(*MessageCache).Append(Message{Data: data, From: from}) + b.response(ctx, id) + result[id] = true + } + result[id] = false + return true + }) + return result +} + +func (b *Broker) Deny(ctx context.Context, id string, topic string) { + if topics, ok := b.messages.Load(id); ok { + topics := topics.(*sync.Map) + if topic != "" { + if cache, ok := topics.Load(topic); ok && cache != nil { + topics.Store(topic, nil) + } + } else { + topics.Range(func(key, _ interface{}) bool { + topics.Store(key, nil) + return false + }) + } + b.response(ctx, id) + } +} + +func (b *Broker) Exists(topic string, id string) bool { + if topics, ok := b.messages.Load(id); ok { + if cache, ok := topics.(*sync.Map).Load(topic); ok { + return cache != nil + } + } + return false +} + +func (b *Broker) IdList(topic string) (idlist []string) { + b.messages.Range(func(key, value interface{}) bool { + id := key.(string) + topics := value.(*sync.Map) + if cache, ok := topics.Load(topic); ok && cache != nil { + idlist = append(idlist, id) + } + return true + }) + return +} + +func (b *Broker) handler(ctx context.Context, name string, args []interface{}, next core.NextInvokeHandler) (result []interface{}, err error) { + serviceContext := core.GetServiceContext(ctx) + var from string + if id := serviceContext.RequestHeaders().GetString("id"); id != "" { + from = id + } + switch name { + case ">", ">?", ">*": + args = append(args, from) + } + serviceContext.Items().Set("producer", producer{b, from}) + return next(ctx, name, args) +} diff --git a/rpc/plugins/push/message.go b/rpc/plugins/push/message.go new file mode 100644 index 0000000..44fd36f --- /dev/null +++ b/rpc/plugins/push/message.go @@ -0,0 +1,48 @@ +/*--------------------------------------------------------*\ +| | +| hprose | +| | +| Official WebSite: https://hprose.com | +| | +| rpc/plugins/push/message.go | +| | +| LastModified: May 16, 2021 | +| Author: Ma Bingyao | +| | +\*________________________________________________________*/ + +package push + +import ( + "sync" + + "github.com/hprose/hprose-golang/v3/encoding" +) + +type Message struct { + Data interface{} `json:"data"` + From string `json:"from"` +} + +type MessageCache struct { + m []Message + l sync.Mutex +} + +func (m *MessageCache) Append(message Message) { + m.l.Lock() + defer m.l.Unlock() + m.m = append(m.m, message) +} + +func (m *MessageCache) Take() (result []Message) { + m.l.Lock() + defer m.l.Unlock() + result = m.m + m.m = nil + return +} + +func init() { + encoding.RegisterAlias((*Message)(nil), "@") +} diff --git a/rpc/plugins/push/producer.go b/rpc/plugins/push/producer.go new file mode 100644 index 0000000..d337b39 --- /dev/null +++ b/rpc/plugins/push/producer.go @@ -0,0 +1,79 @@ +/*--------------------------------------------------------*\ +| | +| hprose | +| | +| Official WebSite: https://hprose.com | +| | +| rpc/plugins/push/producer.go | +| | +| LastModified: May 11, 2021 | +| Author: Ma Bingyao | +| | +\*________________________________________________________*/ + +package push + +import "context" + +type Producer interface { + From() string + Unicast(ctx context.Context, data interface{}, topic string, id string) bool + Multicast(ctx context.Context, data interface{}, topic string, ids []string) map[string]bool + Broadcast(ctx context.Context, data interface{}, topic string) map[string]bool + Push(ctx context.Context, data interface{}, topic string, id ...string) map[string]bool + Deny(ctx context.Context, id string, topic string) + Exists(topic string, id string) bool + IdList(topic string) []string +} + +type producer struct { + broker *Broker + from string +} + +func (p producer) From() string { + return p.from +} + +func (p producer) Unicast(ctx context.Context, data interface{}, topic string, id string) bool { + return p.broker.Unicast(ctx, data, topic, id, p.from) +} + +func (p producer) Multicast(ctx context.Context, data interface{}, topic string, ids []string) map[string]bool { + return p.broker.Multicast(ctx, data, topic, ids, p.from) +} + +func (p producer) Broadcast(ctx context.Context, data interface{}, topic string) map[string]bool { + return p.broker.Broadcast(ctx, data, topic, p.from) +} + +func (p producer) Push(ctx context.Context, data interface{}, topic string, id ...string) map[string]bool { + switch len(id) { + case 0: + return p.broker.Broadcast(ctx, data, topic, p.from) + case 1: + return map[string]bool{ + id[0]: p.broker.Unicast(ctx, data, topic, id[0], p.from), + } + default: + return p.broker.Multicast(ctx, data, topic, id, p.from) + } +} + +func (p producer) Deny(ctx context.Context, id string, topic string) { + if id == "" { + id = p.from + } + p.broker.Deny(ctx, id, topic) +} + +func (p producer) Exists(topic string, id string) bool { + if id == "" { + id = p.from + } + return p.broker.Exists(topic, id) +} + +func (p producer) IdList(topic string) []string { + return p.broker.IdList(topic) +} diff --git a/rpc/plugins/push/prosumer.go b/rpc/plugins/push/prosumer.go new file mode 100644 index 0000000..a61e00d --- /dev/null +++ b/rpc/plugins/push/prosumer.go @@ -0,0 +1,166 @@ +/*--------------------------------------------------------*\ +| | +| hprose | +| | +| Official WebSite: https://hprose.com | +| | +| rpc/plugins/push/prosumer.go | +| | +| LastModified: May 16, 2021 | +| Author: Ma Bingyao | +| | +\*________________________________________________________*/ + +package push + +import ( + "sync" + "time" + + "github.com/hprose/hprose-golang/v3/rpc/core" +) + +type Prosumer struct { + client *core.Client + proxy prosumer + callbacks sync.Map // map[string]func(Message) + RetryInterval time.Duration + OnError func(error) + OnSubscribe func(topic string) + OnUnsubscribe func(topic string) +} + +type prosumer struct { + message func() (map[string][]Message, error) `name:"<"` + subscribe func(topic string) (bool, error) `name:"+"` + unsubscribe func(topic string) (bool, error) `name:"-"` + unicast func(data interface{}, topic string, id string) (bool, error) `name:">"` + multicast func(data interface{}, topic string, ids []string) (map[string]bool, error) `name:">?"` + broadcast func(data interface{}, topic string) (map[string]bool, error) `name:">*"` + exists func(topic string, id string) (bool, error) `name:"?"` + idList func(topic string) ([]string, error) `name:"|"` +} + +func NewProsumer(client *core.Client, id ...string) *Prosumer { + p := &Prosumer{ + client: client, + } + if len(id) > 0 && id[0] != "" { + p.SetID(id[0]) + } + p.client.UseService(&p.proxy) + return p +} + +func (p *Prosumer) Client() *core.Client { + return p.client +} + +func (p *Prosumer) ID() (id string) { + if id = p.client.RequestHeaders().GetString("id"); id == "" { + panic("client unique id not found") + } + return +} + +func (p *Prosumer) SetID(id string) { + p.client.RequestHeaders().Set("id", id) +} + +func (p *Prosumer) dispatch(topics map[string][]Message) { + for topic, messages := range topics { + if callback, ok := p.callbacks.Load(topic); ok { + if messages == nil { + p.callbacks.Delete(topic) + if p.OnUnsubscribe != nil { + p.OnUnsubscribe(topic) + } + } else { + for _, message := range messages { + callback.(func(Message))(message) + } + } + } + } +} + +func (p *Prosumer) message() { + for { + topics, err := p.proxy.message() + if err != nil { + if err != core.ErrTimeout { + if p.RetryInterval != 0 { + <-time.After(p.RetryInterval) + } + if p.OnError != nil { + p.OnError(err) + } + } + continue + } + if topics == nil { + return + } + go p.dispatch(topics) + } +} + +func (p *Prosumer) Subscribe(topic string, callback func(Message)) (result bool, err error) { + if p.ID() != "" { + p.callbacks.Store(topic, callback) + result, err = p.proxy.subscribe(topic) + go p.message() + if p.OnSubscribe != nil { + p.OnSubscribe(topic) + } + } + return +} + +func (p *Prosumer) Unsubscribe(topic string) (result bool, err error) { + if p.ID() != "" { + result, err = p.proxy.unsubscribe(topic) + p.callbacks.Delete(topic) + if p.OnUnsubscribe != nil { + p.OnUnsubscribe(topic) + } + } + return +} + +func (p *Prosumer) Unicast(data interface{}, topic string, id string) (bool, error) { + return p.proxy.unicast(data, topic, id) +} + +func (p *Prosumer) Multicast(data interface{}, topic string, ids []string) (map[string]bool, error) { + return p.proxy.multicast(data, topic, ids) +} + +func (p *Prosumer) Broadcast(data interface{}, topic string) (map[string]bool, error) { + return p.proxy.broadcast(data, topic) +} + +func (p *Prosumer) Push(data interface{}, topic string, id ...string) (map[string]bool, error) { + switch len(id) { + case 0: + return p.Broadcast(data, topic) + case 1: + result, err := p.Unicast(data, topic, id[0]) + return map[string]bool{ + id[0]: result, + }, err + default: + return p.Multicast(data, topic, id) + } +} + +func (p *Prosumer) Exists(topic string, id string) (bool, error) { + if id == "" { + id = p.ID() + } + return p.proxy.exists(topic, id) +} + +func (p *Prosumer) IdList(topic string) ([]string, error) { + return p.proxy.idList(topic) +} diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 8480697..3ddd514 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -31,6 +31,7 @@ import ( "github.com/hprose/hprose-golang/v3/encoding" "github.com/hprose/hprose-golang/v3/rpc" "github.com/hprose/hprose-golang/v3/rpc/plugins/log" + "github.com/hprose/hprose-golang/v3/rpc/plugins/push" "github.com/stretchr/testify/assert" ) @@ -762,3 +763,49 @@ func TestWebSocket(t *testing.T) { server.Close() } + +func TestPush(t *testing.T) { + service := push.NewBroker(rpc.NewService()) + server, err := net.Listen("tcp", "127.0.0.1:8412") + assert.NoError(t, err) + err = service.Bind(server) + assert.NoError(t, err) + + time.Sleep(time.Millisecond * 5) + + client1 := rpc.NewClient("tcp://127.0.0.1/") + client1.Use(log.Plugin.IOHandler) + prosumer1 := push.NewProsumer(client1, "1") + prosumer1.OnSubscribe = func(topic string) { + fmt.Println(topic, "is subscribed.") + } + prosumer1.OnUnsubscribe = func(topic string) { + fmt.Println(topic, "is unsubscribed.") + } + client2 := rpc.NewClient("tcp://127.0.0.1/") + client2.Use(log.Plugin.IOHandler) + prosumer2 := push.NewProsumer(client2, "2") + prosumer1.Subscribe("test", func(message push.Message) { + fmt.Println(message) + }) + prosumer1.Subscribe("test2", func(message push.Message) { + fmt.Println(message) + }) + time.Sleep(time.Millisecond * 100) + var wg sync.WaitGroup + n := 1000 + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + prosumer2.Push(i, "test", "1") + wg.Done() + }(i) + } + wg.Wait() + time.Sleep(time.Millisecond * 100) + prosumer1.Unsubscribe("test") + prosumer1.Unsubscribe("test2") + + assert.NoError(t, err) + server.Close() +}