-
-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathretry_conn.go
157 lines (135 loc) · 4.1 KB
/
retry_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package redisc
import (
"errors"
"time"
"github.com/gomodule/redigo/redis"
)
// RetryConn returns a connection built on top of c that transparently follows
// cluster redirections (MOVED and ASK replies) and retries commands that fail
// with a TRYAGAIN error. The connection c must be a *redisc.Conn.
//
// On the returned connection only Do, Close and Err are usable; Send, Receive
// and Flush always return an error.
//
// maxAtt is the maximum number of attempts made to execute a command
// successfully; a value <= 0 means there is no limit. tryAgainDelay is how long
// to wait before retrying after a TRYAGAIN error.
//
// A non-nil error is returned only when c is not a *redisc.Conn.
func RetryConn(c redis.Conn, maxAtt int, tryAgainDelay time.Duration) (redis.Conn, error) {
	switch cc := c.(type) {
	case *Conn:
		wrapped := &retryConn{
			c:             cc,
			maxAttempts:   maxAtt,
			tryAgainDelay: tryAgainDelay,
		}
		return wrapped, nil
	default:
		return nil, errors.New("redisc: connection is not a *Conn")
	}
}
// retryConn is the redis.Conn implementation returned by RetryConn. It
// forwards Do, Err and Close to the wrapped *Conn and rejects Send, Receive
// and Flush.
type retryConn struct {
// c is the wrapped cluster connection on which commands are executed.
c *Conn
maxAttempts int // immutable; max attempts per command, <= 0 means no limit
tryAgainDelay time.Duration // immutable; sleep before retrying after TRYAGAIN
}
// Do executes cmd with args on the wrapped connection, following cluster
// redirections and retrying TRYAGAIN errors as implemented by do.
func (rc *retryConn) Do(cmd string, args ...interface{}) (interface{}, error) {
	reply, err := rc.do(cmd, args...)
	return reply, err
}
// do executes cmd with args on the wrapped connection and loops until the
// command returns something other than a TRYAGAIN error or a redirection, or
// until the attempt budget is exhausted.
//
// Both TRYAGAIN retries and redirections count as an attempt. On a
// redirection the wrapped connection's underlying connection is closed and
// replaced with one to the target node before the next attempt. It returns
// the reply and error of the last executed command, the error from obtaining
// the replacement connection or from sending ASKING, or a "too many attempts"
// error once maxAttempts is reached.
func (rc *retryConn) do(cmd string, args ...interface{}) (interface{}, error) {
var att int
var asking bool
cluster := rc.c.cluster
// a non-positive maxAttempts disables the limit on the number of attempts
for rc.maxAttempts <= 0 || att < rc.maxAttempts {
if asking {
// the previous attempt was redirected with ASK: send ASKING on the new
// connection ahead of the command, and only for this one attempt.
if err := rc.c.Send("ASKING"); err != nil {
return nil, err
}
asking = false
}
v, err := rc.c.Do(cmd, args...)
// re is non-nil only when err is a redirection
re := ParseRedir(err)
if re == nil {
if IsTryAgain(err) {
// handle retry: wait for the configured delay, count the attempt and
// run the command again on the same connection.
time.Sleep(rc.tryAgainDelay)
att++
continue
}
// not a retry error nor a redirection, return result
return v, err
}
// handle redirection: take a snapshot of the connection state under its lock
rc.c.mu.Lock()
readOnly := rc.c.readOnly
connAddr := rc.c.boundAddr
rc.c.mu.Unlock()
if readOnly {
// check if the connection was already made to that slot, meaning that
// the redirection is because the command can't be served by the replica
// and a non-readonly connection must be made to the slot's master. If
// that's not the case, then keep the readonly flag to true, meaning that
// it will attempt a connection
// to a replica for the new slot.
cluster.mu.Lock()
slotMappings := cluster.mapping[re.NewSlot]
cluster.mu.Unlock()
if isIn(slotMappings, connAddr) {
readOnly = false
}
}
var conn redis.Conn
addr := re.Addr
asking = re.Type == "ASK"
if asking {
// if redirecting due to ASK, use the address that was provided in the
// ASK error reply.
conn, err = cluster.getConnForAddr(addr, rc.c.forceDial)
if err != nil {
return nil, err
}
// TODO(mna): does redis cluster send ASK replies that redirect to
// replicas if the source node was a replica? Assume no for now.
readOnly = false
} else {
// if redirecting due to a MOVED, the slot mapping is already updated to
// reflect the new server for that slot (done in rc.c.Do), so
// getConnForSlot will return a connection to the correct address. The
// address actually selected is returned and replaces addr.
conn, addr, err = cluster.getConnForSlot(re.NewSlot, rc.c.forceDial, readOnly)
if err != nil {
// could not get connection to that node, return that error
return nil, err
}
}
var cerr error
rc.c.mu.Lock()
// close and replace the old connection (close must come before assignments)
cerr = rc.c.closeLocked()
rc.c.rc = conn
rc.c.boundAddr = addr
rc.c.readOnly = readOnly
rc.c.mu.Unlock()
// a failure to close the old connection does not abort the command; it is
// only reported, in a separate goroutine, when a BgError handler is set.
if cerr != nil && cluster.BgError != nil {
go cluster.BgError(RetryCloseConn, cerr)
}
// the redirection counts as an attempt
att++
}
return nil, errors.New("redisc: too many attempts")
}
// Err reports the error state of the wrapped connection.
func (rc *retryConn) Err() error {
	err := rc.c.Err()
	return err
}
// Close closes the wrapped connection and returns its result.
func (rc *retryConn) Close() error {
	err := rc.c.Close()
	return err
}
// Send is not supported on a retry connection; it always returns an error and
// ignores its arguments.
func (rc *retryConn) Send(_ string, _ ...interface{}) error {
	err := errors.New("redisc: unsupported call to Send")
	return err
}
// Receive is not supported on a retry connection; it always returns a nil
// reply and an error.
func (rc *retryConn) Receive() (interface{}, error) {
	var reply interface{}
	return reply, errors.New("redisc: unsupported call to Receive")
}
// Flush is not supported on a retry connection; it always returns an error.
func (rc *retryConn) Flush() error {
	err := errors.New("redisc: unsupported call to Flush")
	return err
}
// isIn reports whether v is equal to at least one element of list. A nil or
// empty list never contains v.
func isIn(list []string, v string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == v {
			return true
		}
	}
	return false
}