-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
executable file
·109 lines (99 loc) · 2.33 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package minigrush
import (
"encoding/json"
"log"
"net/http"
"sync"
"time"
"github.com/crbrox/store"
)
type Consumer struct {
//Channel for getting petitions
GetFrom <-chan *Petition
//Store of petitions, for removing when done
PetitionStore store.Interface
//Store of replies, for saving responses
ReplyStore store.Interface
//http.Client for making requests to target host
Client http.Client
//number of goroutines consuming petitions
n int
//channel for asking goroutines to finish
endChan chan struct{}
//WaitGroup for goroutines after been notified the should end
wg sync.WaitGroup
}
//Start starts n goroutines for taking Petitions from the GetFrom channel.
//It returns a channel for notifying when the consumer has ended (hopefully after a Stop() method invocation).
func (c *Consumer) Start(n int) <-chan bool {
c.n = n
finalDone := make(chan bool)
c.endChan = make(chan struct{})
c.wg.Add(c.n)
for i := 0; i < c.n; i++ {
go c.relay()
}
go func() {
c.wg.Wait()
finalDone <- true
}()
return finalDone
}
//Loop of taking a petition and making the request it represents.
func (c *Consumer) relay() {
defer c.wg.Done()
SERVE:
for {
select {
case <-c.endChan:
break SERVE
default:
select {
case req := <-c.GetFrom:
c.process(req)
case <-c.endChan:
break SERVE
}
}
}
}
//process recreates the request that should be sent to the target host
//it stores the response in the store of replies.
func (c *Consumer) process(petition *Petition) {
var (
req *http.Request
resp *http.Response
reply *Reply
start = time.Now()
)
req, err := petition.Request()
if err != nil {
log.Println(petition.Id, err)
} else {
resp, err = c.Client.Do(req)
if err != nil {
log.Println(petition.Id, err)
} else {
defer resp.Body.Close()
}
}
reply = newReply(resp, petition, err)
reply.Created = start
text, err := json.MarshalIndent(reply, "", " ")
if err != nil {
log.Println(petition.Id, err)
}
err = c.ReplyStore.Put(reply.Id, text)
if err != nil {
log.Println(petition.Id, err)
}
err = c.PetitionStore.Delete(petition.Id)
if err != nil {
log.Println(petition.Id, err)
}
}
//Stop asks consumer to stop taking petitions. When the stop is complete,
//the fact will be notified through the channel returned by the Start() method.
func (c *Consumer) Stop() {
close(c.endChan)
}