-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
111 lines (87 loc) · 1.91 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package art
import (
"context"
"sync"
"github.com/gookit/goutil/maputil"
)
// GetMessage returns a *Message obtained from messagePool.
//
// Hand the message back with PutMessage once it is no longer needed;
// PutMessage resets it before returning it to the pool.
func GetMessage() *Message {
return messagePool.Get()
}
// PutMessage resets message and hands it back to messagePool for reuse.
//
// A nil message is ignored, so PutMessage is safe to call (or defer) on a
// pointer that was never populated, and nil can never end up in the pool.
//
// The caller should stop using message after this call: all of its fields
// have been wiped by reset and the same pointer may be handed out again by a
// later GetMessage.
func PutMessage(message *Message) {
	if message == nil {
		return
	}
	message.reset()
	messagePool.Put(message)
}
// messagePool is the package-level pool behind GetMessage and PutMessage.
// It is built by newPool with newMessage as its argument.
// NOTE(review): newPool is defined outside this file; presumably it calls
// newMessage whenever no pooled message is available — confirm.
var messagePool = newPool(newMessage)
// newMessage allocates a fresh Message in its ready-to-use state: empty but
// non-nil RouteParam and Metadata maps, and context.Background() as Ctx.
// Every other field keeps its zero value.
func newMessage() *Message {
	msg := new(Message)
	msg.RouteParam = map[string]any{}
	msg.Metadata = map[string]any{}
	msg.Ctx = context.Background()
	return msg
}
// Message bundles a subject, a payload, an identifier, route parameters,
// metadata and a context.
//
// Instances are normally obtained from GetMessage and released with
// PutMessage. Because the struct contains a sync.Mutex it must not be copied
// by value; pass *Message around and use Copy to duplicate one.
type Message struct {
// Subject is the subject the message is sent or received on (see the
// RouteParam example below for its shape).
Subject string
// Bytes presumably holds the raw, encoded payload — confirm against callers.
Bytes []byte
// Body presumably holds the decoded payload — confirm against callers.
Body any
// identifier is the message ID. It is generated lazily by MsgId and can be
// overridden with SetMsgId.
identifier string
// Mutex is not locked by any method in this file; callers that share a
// Message between goroutines are expected to lock it themselves.
Mutex sync.Mutex
// RouteParam holds the values captured from the subject.
// These parameters represent resources or identifiers.
//
// Example:
//
// define mux subject = "/users/{id}"
// send or recv subject = "/users/1017"
//
// get route param:
// key : value => id : 1017
RouteParam maputil.Data
// Metadata carries additional key/value data attached to the message.
// It is emptied by reset and copied entry by entry by Copy.
Metadata maputil.Data
// RawInfra presumably references the underlying transport-specific message
// or handle — confirm against callers.
RawInfra any
// Ctx is the context associated with the message. newMessage and reset set
// it to context.Background(); UpdateContext replaces it.
Ctx context.Context
}
// UpdateContext applies each update function, in order, to the message's
// context, stores the result in msg.Ctx and returns the final context.
//
// The returned context is never nil:
//   - if msg.Ctx is nil (e.g. a Message built with a struct literal instead of
//     GetMessage) it is seeded with context.Background() first, because
//     helpers such as context.WithValue panic on a nil parent;
//   - nil update functions are skipped instead of panicking;
//   - an update that returns nil is ignored and the previous context is kept.
//
// Calling UpdateContext with no arguments simply returns the current context.
func (msg *Message) UpdateContext(updates ...func(ctx context.Context) context.Context) context.Context {
	if msg.Ctx == nil {
		msg.Ctx = context.Background()
	}
	for _, update := range updates {
		if update == nil {
			continue
		}
		if next := update(msg.Ctx); next != nil {
			msg.Ctx = next
		}
	}
	return msg.Ctx
}
// MsgId returns the message identifier.
//
// When no identifier has been set yet, one is created with GenerateUlid,
// stored on the message and returned, so later calls yield the same value.
func (msg *Message) MsgId() string {
	if msg.identifier != "" {
		return msg.identifier
	}
	msg.identifier = GenerateUlid()
	return msg.identifier
}
// SetMsgId sets the message identifier to msgId, replacing any existing one.
// Passing "" clears it, so the next MsgId call generates a new identifier.
func (msg *Message) SetMsgId(msgId string) {
msg.identifier = msgId
}
// reset returns the message to the state produced by newMessage so it can be
// reused: scalar and payload fields are zeroed, Ctx becomes
// context.Background(), and RouteParam/Metadata end up empty and non-nil.
//
// Existing maps are emptied in place so the pooled message reuses them. A map
// that a caller replaced with nil is re-created; otherwise the next user of
// the pooled message would panic on its first write to it.
//
// Mutex is left untouched.
func (msg *Message) reset() {
	msg.Subject = ""
	msg.Bytes = nil
	msg.Body = nil
	msg.identifier = ""
	msg.RawInfra = nil
	msg.Ctx = context.Background()

	if msg.RouteParam == nil {
		msg.RouteParam = map[string]any{}
	} else {
		for key := range msg.RouteParam {
			delete(msg.RouteParam, key)
		}
	}

	if msg.Metadata == nil {
		msg.Metadata = map[string]any{}
	} else {
		for key := range msg.Metadata {
			delete(msg.Metadata, key)
		}
	}
}
// Copy returns a duplicate of msg built on a message taken from GetMessage.
//
// The copy is shallow: RouteParam and Metadata get their own map entries, but
// the values stored in them are shared, Bytes refers to the same backing
// array, and Body, RawInfra and Ctx are the same values as in msg. The
// identifier is carried over as-is; Mutex is not copied.
//
// Since the duplicate comes from GetMessage, release it with PutMessage when
// it is no longer needed.
func (msg *Message) Copy() *Message {
	dup := GetMessage()

	for name, value := range msg.RouteParam {
		dup.RouteParam.Set(name, value)
	}
	for name, value := range msg.Metadata {
		dup.Metadata.Set(name, value)
	}

	dup.Subject, dup.identifier = msg.Subject, msg.identifier
	dup.Bytes, dup.Body = msg.Bytes, msg.Body
	dup.RawInfra, dup.Ctx = msg.RawInfra, msg.Ctx
	return dup
}