Skip to content

Commit

Permalink
Add push plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
andot committed May 16, 2021
1 parent 1e07f3c commit 3d5f4b7
Show file tree
Hide file tree
Showing 7 changed files with 637 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/json-iterator/go v1.1.11
github.com/modern-go/reflect2 v1.0.1
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
github.com/stretchr/testify v1.7.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
294 changes: 294 additions & 0 deletions rpc/plugins/push/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
/*--------------------------------------------------------*\
| |
| hprose |
| |
| Official WebSite: https://hprose.com |
| |
| rpc/plugins/push/broker.go |
| |
| LastModified: May 16, 2021 |
| Author: Ma Bingyao <[email protected]> |
| |
\*________________________________________________________*/

package push

import (
"context"
"sync"
"time"

"github.com/hprose/hprose-golang/v3/rpc/core"
cmap "github.com/orcaman/concurrent-map"
)

type Broker struct {
*core.Service
messages sync.Map // map[string]map[string]messageCache
responders cmap.ConcurrentMap // map[string]chan map[string][]Message
signals cmap.ConcurrentMap // map[string]chan bool
MessageQueueMaxLength int
Timeout time.Duration
HeartBeat time.Duration
OnSubscribe func(ctx context.Context, id string, topic string)
OnUnsubscribe func(ctx context.Context, id string, topic string, messages []Message)
}

func NewBroker(service *core.Service) *Broker {
broker := &Broker{
Service: service,
responders: cmap.New(),
signals: cmap.New(),
MessageQueueMaxLength: 10,
Timeout: time.Minute * 2,
HeartBeat: time.Second * 10,
}
service.Use(broker.handler).
AddFunction(broker.subscribe, "+").
AddFunction(broker.unsubscribe, "-").
AddFunction(broker.message, "<").
AddFunction(broker.Unicast, ">").
AddFunction(broker.Multicast, ">?").
AddFunction(broker.Broadcast, ">*").
AddFunction(broker.Exists, "?").
AddFunction(broker.IdList, "|")
return broker
}

func (b *Broker) send(ctx context.Context, id string, responder chan map[string][]Message) bool {
var topics *sync.Map
if value, ok := b.messages.Load(id); ok {
topics = value.(*sync.Map)
}
if topics == nil {
responder <- nil
return true
}
var size int
result := make(map[string][]Message)
topics.Range(func(key, value interface{}) bool {
size++
topic := key.(string)
cache := value.(*MessageCache)
if cache == nil {
result[topic] = nil
topics.Delete(topic)
} else {
messages := cache.Take()
if len(messages) > 0 {
result[topic] = messages
}
}
return true
})
if size == 0 {
responder <- nil
return true
}
if len(result) == 0 {
return false
}
responder <- result
go b.doHeartBeat(ctx, id)
return true
}

func (b *Broker) doHeartBeat(ctx context.Context, id string) {
if b.HeartBeat <= 0 {
return
}
signal := make(chan bool, 1)
b.signals.Upsert(id, signal, func(exist bool, valueInMap interface{}, newValue interface{}) interface{} {
if exist {
close(valueInMap.(chan bool))
}
return newValue
})
ctx, cancel := context.WithTimeout(ctx, b.HeartBeat)
defer cancel()
select {
case <-ctx.Done():
if topics, ok := b.messages.Load(id); ok {
topics := topics.(*sync.Map)
topics.Range(func(key, value interface{}) bool {
b.offline(ctx, topics, id, key.(string))
return true
})
}
case <-signal:
}
}

func (b *Broker) getID(ctx context.Context) (id string) {
if id = core.GetServiceContext(ctx).RequestHeaders().GetString("id"); id == "" {
panic("client unique id not found")
}
return
}

func (b *Broker) subscribe(ctx context.Context, topic string) bool {
id := b.getID(ctx)
t, ok := b.messages.Load(id)
if !ok {
t, _ = b.messages.LoadOrStore(id, new(sync.Map))
}
topics := t.(*sync.Map)
if _, ok := topics.Load(topic); ok {
return false
}
_, loaded := topics.LoadOrStore(topic, new(MessageCache))
if !loaded && b.OnSubscribe != nil {
b.OnSubscribe(ctx, id, topic)

}
return !loaded
}

func (b *Broker) response(ctx context.Context, id string) {
if responder, ok := b.responders.Pop(id); ok {
responder := responder.(chan map[string][]Message)
if !b.send(ctx, id, responder) {
if !b.responders.SetIfAbsent(id, responder) {
responder <- nil
}
}
}
}

func (b *Broker) offline(ctx context.Context, topics *sync.Map, id string, topic string) bool {
if messages, ok := topics.Load(topic); ok {
topics.Delete(topic)
if b.OnUnsubscribe != nil {
b.OnUnsubscribe(ctx, id, topic, messages.(*MessageCache).Take())
}
b.response(ctx, id)
return true
}
return false
}

func (b *Broker) unsubscribe(ctx context.Context, topic string) bool {
id := b.getID(ctx)
if topics, ok := b.messages.Load(id); ok {
return b.offline(ctx, topics.(*sync.Map), id, topic)
}
return false
}

func (b *Broker) message(ctx context.Context) map[string][]Message {
id := b.getID(ctx)
if responder, ok := b.responders.Pop(id); ok {
responder.(chan map[string][]Message) <- nil
}
if signal, ok := b.signals.Pop(id); ok {
close(signal.(chan bool))
}
responder := make(chan map[string][]Message, 1)
if !b.send(ctx, id, responder) {
b.responders.Upsert(id, responder, func(exist bool, valueInMap interface{}, newValue interface{}) interface{} {
if exist {
valueInMap.(chan map[string][]Message) <- nil
}
return newValue
})
if b.Timeout > 0 {
ctx, cancel := context.WithTimeout(ctx, b.Timeout)
defer cancel()
select {
case <-ctx.Done():
go b.doHeartBeat(ctx, id)
return map[string][]Message{}
case result := <-responder:
return result
}
}
}
return <-responder
}

func (b *Broker) Unicast(ctx context.Context, data interface{}, topic string, id string, from string) bool {
if topics, ok := b.messages.Load(id); ok {
if cache, ok := topics.(*sync.Map).Load(topic); ok && cache != nil {
cache.(*MessageCache).Append(Message{Data: data, From: from})
b.response(ctx, id)
return true
}
}
return false
}

func (b *Broker) Multicast(ctx context.Context, data interface{}, topic string, ids []string, from string) map[string]bool {
result := make(map[string]bool)
for _, id := range ids {
result[id] = b.Unicast(ctx, data, topic, id, from)
}
return result
}

func (b *Broker) Broadcast(ctx context.Context, data interface{}, topic string, from string) map[string]bool {
result := make(map[string]bool)
b.messages.Range(func(key, value interface{}) bool {
id := key.(string)
topics := value.(*sync.Map)
if cache, ok := topics.Load(topic); ok && cache != nil {
cache.(*MessageCache).Append(Message{Data: data, From: from})
b.response(ctx, id)
result[id] = true
}
result[id] = false
return true
})
return result
}

func (b *Broker) Deny(ctx context.Context, id string, topic string) {
if topics, ok := b.messages.Load(id); ok {
topics := topics.(*sync.Map)
if topic != "" {
if cache, ok := topics.Load(topic); ok && cache != nil {
topics.Store(topic, nil)
}
} else {
topics.Range(func(key, _ interface{}) bool {
topics.Store(key, nil)
return false
})
}
b.response(ctx, id)
}
}

func (b *Broker) Exists(topic string, id string) bool {
if topics, ok := b.messages.Load(id); ok {
if cache, ok := topics.(*sync.Map).Load(topic); ok {
return cache != nil
}
}
return false
}

func (b *Broker) IdList(topic string) (idlist []string) {
b.messages.Range(func(key, value interface{}) bool {
id := key.(string)
topics := value.(*sync.Map)
if cache, ok := topics.Load(topic); ok && cache != nil {
idlist = append(idlist, id)
}
return true
})
return
}

func (b *Broker) handler(ctx context.Context, name string, args []interface{}, next core.NextInvokeHandler) (result []interface{}, err error) {
serviceContext := core.GetServiceContext(ctx)
var from string
if id := serviceContext.RequestHeaders().GetString("id"); id != "" {
from = id
}
switch name {
case ">", ">?", ">*":
args = append(args, from)
}
serviceContext.Items().Set("producer", producer{b, from})
return next(ctx, name, args)
}
48 changes: 48 additions & 0 deletions rpc/plugins/push/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*--------------------------------------------------------*\
| |
| hprose |
| |
| Official WebSite: https://hprose.com |
| |
| rpc/plugins/push/message.go |
| |
| LastModified: May 16, 2021 |
| Author: Ma Bingyao <[email protected]> |
| |
\*________________________________________________________*/

package push

import (
"sync"

"github.com/hprose/hprose-golang/v3/encoding"
)

type Message struct {
Data interface{} `json:"data"`
From string `json:"from"`
}

type MessageCache struct {
m []Message
l sync.Mutex
}

func (m *MessageCache) Append(message Message) {
m.l.Lock()
defer m.l.Unlock()
m.m = append(m.m, message)
}

func (m *MessageCache) Take() (result []Message) {
m.l.Lock()
defer m.l.Unlock()
result = m.m
m.m = nil
return
}

func init() {
encoding.RegisterAlias((*Message)(nil), "@")
}
Loading

0 comments on commit 3d5f4b7

Please sign in to comment.