-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublish.go
56 lines (50 loc) · 1.33 KB
/
publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package rabbit
import (
"encoding/json"
amqp "github.com/rabbitmq/amqp091-go"
"log/slog"
)
// publisher sends messages to a RabbitMQ exchange. Its connection is
// established lazily: init calls reconnect whenever conn is nil or closed.
type publisher struct {
// reconnect returns a connection to use; init calls it when conn is nil
// or reports itself closed.
reconnect func() (*amqp.Connection, error)
// conn is the connection most recently returned by reconnect.
conn *amqp.Connection
// ch is the channel opened on conn; the exchange is declared on it and
// messages are published through it.
ch *amqp.Channel
// opt supplies the exchange name, type and durability used when declaring
// the exchange, and the routing key used when a publish call passes an
// empty key.
opt *Options
// logger receives a warning when encoding or JSON serialization fails.
logger *slog.Logger
}
// init makes sure the publisher has a connection and a channel before a
// publish is attempted, and returns the publisher itself on success.
//
// The connection is re-established through p.reconnect when it is missing or
// reports itself closed. A channel is then opened, and the configured
// exchange declared on it, whenever the publisher has no channel or the
// connection was just replaced.
//
// On any failure the returned Publisher is nil and the error is returned.
// p.ch is only assigned after the exchange has been declared, so a failed
// attempt is retried on the next call instead of leaving a nil or half
// initialised channel behind.
func (p *publisher) init() (Publisher, error) {
	if p.conn == nil || p.conn.IsClosed() {
		conn, err := p.reconnect()
		if err != nil {
			return nil, err
		}
		p.conn = conn
		// A channel belongs to the connection it was opened on, so the
		// previous one cannot be reused with the new connection.
		p.ch = nil
	}
	if p.ch != nil {
		return p, nil
	}

	ch, err := p.conn.Channel()
	if err != nil {
		return nil, err
	}
	if err := ch.ExchangeDeclare(p.opt.Exchange, string(p.opt.Type), p.opt.Durable, false, false, false, nil); err != nil {
		// Best-effort cleanup: the declare error is the one worth
		// reporting, and the channel may already have been closed.
		_ = ch.Close()
		return nil, err
	}
	p.ch = ch
	return p, nil
}
// Publishing sends data to the configured exchange.
//
// key is the routing key; when it is empty the routing key from the
// publisher options is used instead. mineType is the MIME type of data: it
// is written to the message's ContentType property and, as before, also to
// its Type property so that consumers reading Type keep working.
//
// The connection and channel are (re)initialised through init first; an
// initialisation or publish error is returned unchanged.
func (p *publisher) Publishing(key, mineType string, data []byte) error {
	if _, err := p.init(); err != nil {
		return err
	}
	if key == "" {
		key = p.opt.RoutingKey
	}
	msg := amqp.Publishing{
		ContentType: mineType,
		Type:        mineType,
		Body:        data,
	}
	return p.ch.Publish(p.opt.Exchange, key, false, false, msg)
}
// Publish encodes data with encoder and publishes the resulting payload
// under the routing key the encoder produced, using the encoder's MIME type.
// An encoding failure is logged as a warning and returned to the caller.
func (p *publisher) Publish(encoder Encoder, data interface{}) error {
	routingKey, payload, encodeErr := encoder.Encode(data)
	if encodeErr == nil {
		return p.Publishing(routingKey, encoder.MimeType(), payload)
	}

	p.logger.Warn("encode data error", "error", encodeErr)
	return encodeErr
}
// PublishJson marshals v to JSON and publishes it under key with the
// "application/json" MIME type. A marshalling failure is logged as a warning
// and returned to the caller.
func (p *publisher) PublishJson(key string, v interface{}) error {
	const jsonMime = "application/json"

	body, marshalErr := json.Marshal(v)
	if marshalErr == nil {
		return p.Publishing(key, jsonMime, body)
	}

	p.logger.Warn("serialize data error", "error", marshalErr)
	return marshalErr
}