forked from NeowayLabs/wabbit
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdial.go
180 lines (150 loc) · 3.57 KB
/
dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package amqptest
import (
"sync"
"time"
"github.com/PeriscopeData/wabbit"
"github.com/PeriscopeData/wabbit/amqptest/server"
"github.com/PeriscopeData/wabbit/utils"
"github.com/pborman/uuid"
)
const (
	// defaultReconnectDelay is the default reconnect delay, expressed in
	// seconds (1 second).
	// NOTE(review): nothing in the visible part of this file references this
	// constant; AutoRedial derives its delay from the attempt counter instead.
	// Confirm whether it is still needed.
	defaultReconnectDelay = 1
)
// Conn is the fake AMQP connection returned by Dial.
type Conn struct {
	// amqpuri is the URI given to Dial; it is passed to server.Connect and
	// server.Close.
	amqpuri string
	// isConnected is set to true when dialFn succeeds and back to false by
	// Close. It is accessed while holding mu.
	isConnected bool
	// ConnID identifies this connection to the fake server. A fresh value is
	// generated each time dialFn runs.
	ConnID string
	// errSpread is the error broadcaster handed to server.Connect. Channels
	// are registered on it with Add (errChan in Dial, user channels in
	// NotifyClose).
	errSpread *utils.ErrBroadcast
	// errChan is the connection's own listener on errSpread. It is read by
	// the default discard goroutine started in dialFn and by AutoRedial.
	errChan chan wabbit.Error
	// defErrDone tells the default discard goroutine to stop; AutoRedial and
	// Close send on it.
	defErrDone chan bool
	// mu guards isConnected and hasAutoRedial, and serializes Close.
	mu *sync.Mutex
	// hasAutoRedial is set to true the first time AutoRedial is called.
	hasAutoRedial bool
	// amqpServer is the handle returned by server.Connect; Close resets it to
	// nil.
	amqpServer *server.AMQPServer
	// dialFn (re)connects to the fake server. It is installed by Dial and
	// re-run by AutoRedial on reconnection.
	dialFn func() error
}
// Dial mocks dialing a connection to RabbitMQ. It returns the established
// fake connection, or the error reported by server.Connect if the connection
// cannot be made.
//
// The connect logic is stored in conn.dialFn so that AutoRedial can run it
// again when reconnecting.
func Dial(amqpuri string) (wabbit.Conn, error) {
	conn := &Conn{
		amqpuri:    amqpuri,
		errSpread:  utils.NewErrBroadcast(),
		errChan:    make(chan wabbit.Error),
		defErrDone: make(chan bool),
		mu:         &sync.Mutex{},
	}

	conn.errSpread.Add(conn.errChan)

	conn.dialFn = func() error {
		var err error
		conn.ConnID = uuid.New()

		conn.amqpServer, err = server.Connect(amqpuri, conn.ConnID, conn.errSpread)
		if err != nil {
			return err
		}

		// concurrent access with Close method
		conn.mu.Lock()
		conn.isConnected = true
		autoRedial := conn.hasAutoRedial
		conn.mu.Unlock()

		// When auto-redial is already enabled (i.e. this is a reconnection),
		// the AutoRedial goroutine owns errChan. Starting another discard
		// goroutine here would leave it running forever (nobody sends on
		// defErrDone again) and let it steal errors, or the nil shutdown
		// marker, that AutoRedial is waiting for.
		if autoRedial {
			return nil
		}

		// By default, we discard any errors. Sending something to defErrDone
		// destroys this goroutine so that another consumer can take over
		// errChan.
		go func() {
			for {
				select {
				case <-conn.errChan:
				case <-conn.defErrDone:
					conn.mu.Lock()
					if conn.hasAutoRedial {
						// AutoRedial takes over reading errChan.
						conn.mu.Unlock()
						return
					}
					conn.mu.Unlock()

					// Signalled by Close: drain errChan until it is closed
					// before exiting.
					for {
						if _, ok := <-conn.errChan; !ok {
							return
						}
					}
				}
			}
		}()

		return nil
	}

	if err := conn.dialFn(); err != nil {
		return nil, err
	}

	return conn, nil
}
// NotifyClose registers c on the connection's error broadcaster so that
// server or client errors are published to it, and hands the same channel
// back to the caller.
func (conn *Conn) NotifyClose(c chan wabbit.Error) chan wabbit.Error {
	listener := c
	conn.errSpread.Add(listener)

	return listener
}
// AutoRedial mocks automatic reconnection.
//
// It waits (in a background goroutine) for an error on the connection. A nil
// error means a graceful close and ends the goroutine. Otherwise the error is
// sent to outChan before every dial attempt, and dialFn is retried until it
// succeeds. The wait before an attempt equals the number of failed attempts so
// far, in seconds (so the first attempt is immediate), and the counter wraps
// back to 0 once it exceeds 60. After a successful redial, AutoRedial is armed
// again on the new connection and true is sent to done.
//
// Both outChan and done are written with blocking sends, so the caller must
// be receiving from them.
func (conn *Conn) AutoRedial(outChan chan wabbit.Error, done chan bool) {
	// Check and set the flag atomically: hasAutoRedial is read under mu by
	// the discard goroutine and must not be raced by concurrent callers.
	conn.mu.Lock()
	firstCall := !conn.hasAutoRedial
	conn.hasAutoRedial = true
	conn.mu.Unlock()

	if firstCall {
		// Stop the default error-discarding goroutine so that the goroutine
		// below becomes the reader of errChan. Sent outside the lock because
		// the receiver takes mu right after receiving.
		conn.defErrDone <- true
	}

	go func() {
		amqpErr := <-conn.errChan
		if amqpErr == nil {
			// Graceful connection close
			return
		}

		var attempts uint
		for {
			// send the error to client
			outChan <- amqpErr

			if attempts > 60 {
				attempts = 0
			}

			// Wait n seconds where n == attempts...
			time.Sleep(time.Duration(attempts) * time.Second)

			if connErr := conn.dialFn(); connErr == nil {
				break
			}
			attempts++
		}

		// enable AutoRedial on the new connection
		conn.AutoRedial(outChan, done)
		done <- true
	}()
}
// Close closes the fake connection.
//
// It disconnects from the fake server and then shuts down error delivery:
// with AutoRedial enabled it broadcasts a nil error (the streadway/amqp way
// of signalling a graceful shutdown) so the AutoRedial goroutine exits;
// otherwise it unregisters and closes errChan and stops the default discard
// goroutine.
//
// Close returns the error from server.Close, if any; in that case the
// connection is left untouched and Close may be retried. Calling Close on an
// already closed connection is a no-op that returns nil.
func (conn *Conn) Close() error {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	// isConnected is only false after a previous successful Close. Running
	// the teardown again would close errChan twice (panic) or broadcast a
	// second nil that nobody consumes.
	if !conn.isConnected {
		return nil
	}

	// Disconnect from the server.
	if err := server.Close(conn.amqpuri, conn.ConnID); err != nil {
		return err
	}
	conn.isConnected = false
	conn.amqpServer = nil

	// Enables AutoRedial to gracefully shut down.
	// This isn't wabbit stuff. It's the streadway/amqp way of notifying the
	// shutdown.
	if conn.hasAutoRedial {
		conn.errSpread.Write(nil)
		return nil
	}

	conn.errSpread.Delete(conn.errChan)
	close(conn.errChan)
	// Tell the default discard goroutine to drain errChan and exit.
	conn.defErrDone <- true

	return nil
}
// Channel opens a new fake channel on this connection by asking the fake
// server to create one for the connection's ConnID.
func (conn *Conn) Channel() (wabbit.Channel, error) {
	srv := conn.amqpServer
	ch, err := srv.CreateChannel(conn.ConnID, conn)

	return ch, err
}