-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer.go
executable file
·161 lines (147 loc) · 4.36 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package gridas
import (
"net/http"
"sync"
"time"
"labix.org/v2/mgo"
"labix.org/v2/mgo/bson"
"github.com/crbrox/gridas/config"
"github.com/crbrox/gridas/mylog"
)
//Consumer is in charge of taking up petitions from the "GetFrom" channel,
//making the actual request to the target host, saving the answer and deleting
//the petition after that.
type Consumer struct {
//GetFrom is the channel petitions are received from.
GetFrom <-chan *Petition
//Cfg is the configuration object (database and collection names, retry settings).
Cfg *config.Config
//SessionSeed is the mongo session used to reach the petition, reply and error collections.
SessionSeed *mgo.Session
//Client is the http.Client used for making requests to the target host.
Client http.Client
//n is the number of goroutines consuming petitions; it is set by Start.
n int
//endChan is closed by Stop to ask the consuming goroutines to finish.
endChan chan struct{}
//wg tracks the consuming goroutines so that their termination can be awaited
//after they have been notified that they should end.
wg sync.WaitGroup
}
//Start starts n goroutines for taking Petitions from the GetFrom channel.
//
//It returns a channel on which a single true value is delivered once every
//one of those goroutines has ended (hopefully after a Stop() method
//invocation). The channel has room for that one value, so the internal
//notifier goroutine terminates even if the caller never reads from it.
func (c *Consumer) Start(n int) <-chan bool {
	mylog.Debugf("starting consumer %+v", c)
	c.n = n
	//Capacity 1: the notification can be deposited without a reader being
	//present, so the goroutine below is never left blocked forever.
	finalDone := make(chan bool, 1)
	c.endChan = make(chan struct{})
	c.wg.Add(c.n)
	for i := 0; i < c.n; i++ {
		go c.relay()
	}
	go func() {
		//Log before blocking so the message really describes the waiting phase.
		mylog.Debug("consumer waiting for children")
		c.wg.Wait()
		finalDone <- true
		mylog.Debug("all consumer's children finished")
	}()
	return finalDone
}
//relay is the loop run by every consuming goroutine: it takes a petition from
//GetFrom and makes the request it represents, until endChan is closed or
//GetFrom itself is closed. It marks the goroutine as done in c.wg on exit.
func (c *Consumer) relay() {
	defer c.wg.Done()
	for {
		//Non-blocking check first, so that a pending stop signal wins over
		//petitions that are also ready to be received.
		select {
		case <-c.endChan:
			return
		default:
		}
		select {
		case req, ok := <-c.GetFrom:
			if !ok {
				//A closed channel would otherwise hand out nil petitions
				//forever; there is nothing left to consume.
				mylog.Debug("petitions channel closed, consumer goroutine exiting")
				return
			}
			mylog.Debugf("extracted petition %+v", req)
			c.process(req)
		case <-c.endChan:
			return
		}
	}
}
//process recreates the request that should be sent to the target host, sends
//it (with retries, see doRequest) and stores the resulting reply.
//
//The reply is always inserted in the responses collection. When the request
//failed or the status code is outside the 2xx range it is additionally
//inserted in the errors collection. Finally the petition is removed from the
//petitions collection. Any mongo error is logged as an alert and followed by
//a Refresh of the session seed; processing continues with the next step.
//
//If the request cannot be rebuilt from the petition, the error is logged and
//nothing is stored or removed.
func (c *Consumer) process(petition *Petition) {
	//Taken before any work is done; used as the creation time of the reply.
	start := bson.Now()

	db := c.SessionSeed.DB(c.Cfg.Database)
	petColl := db.C(c.Cfg.Instance + c.Cfg.PetitionsColl)
	replyColl := db.C(c.Cfg.ResponsesColl)
	errColl := db.C(c.Cfg.ErrorsColl)

	mylog.Debugf("processing petition %+v", petition)
	req, err := petition.Request()
	if err != nil {
		mylog.Alert(petition.ID, err)
		return
	}
	mylog.Debugf("restored request %+v", req)

	mylog.Debug("before making request", petition.ID)
	resp, err := c.doRequest(req, petition.ID)
	if err == nil {
		mylog.Debug("after making request", petition.ID)
		//The body must stay open until the reply has been built and stored,
		//hence the close is deferred to the end of the function.
		defer func() {
			mylog.Debug("closing response body", petition.ID)
			resp.Body.Close()
		}()
	}

	reply := newReply(resp, petition, err)
	reply.Created = start
	mylog.Debugf("created reply %+v", reply)

	//resp is only inspected when err is nil, thanks to short-circuiting.
	if err != nil || resp.StatusCode < 200 || resp.StatusCode >= 300 {
		if e := errColl.Insert(reply); e != nil {
			//Report the insertion error itself, not the request error.
			mylog.Alert("ERROR inserting erroneous reply", petition.ID, e)
			c.SessionSeed.Refresh()
		}
	}

	mylog.Debugf("before insert reply %+v", reply)
	err = replyColl.Insert(reply)
	mylog.Debugf("after insert reply %+v", reply)
	if err != nil {
		mylog.Alert("ERROR inserting reply", petition.ID, err)
		c.SessionSeed.Refresh()
	}

	mylog.Debugf("before remove petition %+v", petition)
	err = petColl.Remove(bson.M{"id": petition.ID})
	mylog.Debugf("after remove petition %+v", petition)
	if err != nil {
		mylog.Alert("ERROR removing petition", petition.ID, err)
		c.SessionSeed.Refresh()
	}
}
//Stop asks consumer to stop taking petitions. When the stop is complete,
//the fact will be notified through the channel returned by the Start() method.
//
//Calling Stop before Start, or calling it again after a previous Stop, is a
//no-op instead of a panic. It is not meant to be called concurrently from
//several goroutines.
func (c *Consumer) Stop() {
	if c.endChan == nil {
		//Start has not been called: there is nothing to stop, and closing a
		//nil channel would panic.
		mylog.Debug("consumer not started, nothing to stop")
		return
	}
	select {
	case <-c.endChan:
		//NOTE(review): nothing visible in this file sends on endChan, so a
		//successful receive is taken to mean it is already closed - confirm
		//no other file in the package sends on it.
		mylog.Debug("consumer end channel already closed")
	default:
		mylog.Debug("closing consumer end channel")
		close(c.endChan)
	}
}
//doRequest sends req with c.Client and, when the attempt fails or the target
//host answers 503, retries it up to c.Cfg.Retries times. It sleeps before
//every retry, starting with c.Cfg.RetryTime milliseconds and doubling the
//wait after each unsuccessful retry. petid is only used in log messages.
//
//It returns the outcome of the last attempt made. When the returned error is
//nil the caller owns the response and must close its body; responses that
//are discarded in favour of a retry are closed here.
//
//NOTE(review): the same *http.Request is reused for every attempt; if it
//carries a body that was consumed by a previous attempt the retry may send an
//empty or partial body - confirm how Petition.Request builds it.
func (c *Consumer) doRequest(req *http.Request, petid string) (resp *http.Response, err error) {
	resp, err = c.Client.Do(req)
	if err == nil && resp.StatusCode != 503 { //Good, not error and non challenging response
		return resp, nil
	}
	mylog.Debug("error making request", petid, err)
	retryTime := time.Duration(c.Cfg.RetryTime) * time.Millisecond
	retries := c.Cfg.Retries
	for i := 0; i < retries; i++ {
		//The previous response (typically a 503) is about to be replaced;
		//close its body so the underlying connection is not leaked.
		if resp != nil {
			resp.Body.Close()
		}
		time.Sleep(retryTime)
		mylog.Debugf("retrying request %v retry #%v after %v error %v", petid, i+1, retryTime, err)
		resp, err = c.Client.Do(req)
		if err == nil && resp.StatusCode != 503 {
			break
		}
		retryTime *= 2
	}
	return resp, err
}