forked from nvisibleinc/go-ari-library
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbitmq.go
104 lines (94 loc) · 2.24 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package ari
import (
"github.com/streadway/amqp"
)
type rabbitmqConfig struct {
URL string `json:"url"`
}
type RabbitMQ struct {
config rabbitmqConfig
producerConn *amqp.Connection
consumerConn *amqp.Connection
}
func (r *RabbitMQ) InitBus(config interface{}) error {
var err error
c := config.(map[string]interface{})
for key, value := range c {
switch key {
case "url":
r.config.URL = value.(string)
}
}
r.producerConn, err = amqp.Dial(r.config.URL)
if err != nil {
return err
}
r.consumerConn, err = amqp.Dial(r.config.URL)
if err != nil {
return err
}
return nil
}
func (r *RabbitMQ) StartProducer(topic string) (chan []byte, error) {
c := make(chan []byte)
channel, err := r.producerConn.Channel()
_, err = channel.QueueDeclare(
topic, // name of queue
true, // durable
true, // delete when unused
false, // exclusive
true, // nowait
nil) // arguments
go func(channel *amqp.Channel, messages chan []byte) {
for message := range messages {
channel.Publish(
"", // exchange, for now always using the default exchange
topic,
false,
false,
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
Body: message,
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
})
}
}(channel, c)
if err != nil {
return nil, err
}
return c, nil
}
func (r *RabbitMQ) StartConsumer(topic string) (chan []byte, error) {
c := make(chan []byte)
channel, err := r.consumerConn.Channel()
if err != nil {
return nil, err
}
queue, err := channel.QueueDeclare(
topic, // name of queue
true, // durable
true, // delete when unused
false, // exclusive
true, // nowait
nil) // arguments
if err != nil {
return nil, err
}
deliveries, err := channel.Consume(queue.Name, "", false, false, true, true, nil)
if err != nil {
return nil, err
}
go func(deliveries <-chan amqp.Delivery, c chan []byte) {
for d := range deliveries {
c <- d.Body
d.Ack(false) // false does *not* mean don't acknowledge, see library docs for details
}
}(deliveries, c)
return c, nil
}
func (r *RabbitMQ) TopicExists(topic string) bool {
return true
}