forked from golang-queue/nats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnats.go
140 lines (120 loc) · 2.8 KB
/
nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package nats
import (
"context"
"encoding/json"
"sync"
"sync/atomic" //nolint:typecheck,nolintlint
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/job"
nats "github.com/nats-io/nats.go"
)
// Compile-time assertion that *Worker satisfies the core.Worker interface.
var _ core.Worker = (*Worker)(nil)
// Worker is a queue worker backed by a NATS connection. It publishes jobs to
// a subject and receives them back through a queue subscription.
type Worker struct {
// client is the NATS connection used for publishing and subscribing.
client *nats.Conn
// stop is closed by Shutdown to tell the subscription callback to stop
// handing messages over to tasks.
stop chan struct{}
// exit is closed by the subscription callback once it has observed stop.
exit chan struct{}
// stopFlag is set to 1 atomically when Shutdown begins; Queue checks it.
stopFlag int32
// stopOnce guards the teardown sequence in Shutdown.
stopOnce sync.Once
// startOnce guards the subscription setup in startConsumer.
startOnce sync.Once
// opts holds the configuration built by newOptions.
opts options
// subscription is the queue subscription created by startConsumer.
subscription *nats.Subscription
// tasks carries messages from the subscription callback to Request.
tasks chan *nats.Msg
}
// NewWorker builds a Worker from the given options, connects to the NATS
// server at the configured address and starts the queue subscription.
// Connection and subscription failures are reported through the configured
// logger's Fatal method.
func NewWorker(opts ...Option) *Worker {
	worker := &Worker{
		opts:  newOptions(opts...),
		stop:  make(chan struct{}),
		exit:  make(chan struct{}),
		tasks: make(chan *nats.Msg),
	}

	conn, connErr := nats.Connect(worker.opts.addr)
	worker.client = conn
	if connErr != nil {
		worker.opts.logger.Fatal("can't connect to nats:", connErr)
	}

	subErr := worker.startConsumer()
	if subErr != nil {
		worker.opts.logger.Fatal("can't start consumer:", subErr)
	}

	return worker
}
// startConsumer creates the queue subscription on the configured subject and
// queue group. The setup runs at most once (guarded by startOnce); later calls
// return nil without subscribing again.
//
// Each delivered message is offered to w.tasks. If w.stop is closed before the
// message is taken, the message is published back to the subject and w.exit is
// closed.
func (w *Worker) startConsumer() (err error) {
	w.startOnce.Do(func() {
		// deliver is the subscription callback invoked for every message.
		deliver := func(msg *nats.Msg) {
			select {
			case <-w.stop:
				// The worker has been shut down: re-queue the task so it is
				// not lost, then signal that the callback is done.
				if msg != nil {
					w.opts.logger.Info("re-queue the current task")
					pubErr := w.client.Publish(w.opts.subj, msg.Data)
					if pubErr != nil {
						w.opts.logger.Errorf("error to re-queue the current task: %s", pubErr.Error())
					}
				}
				close(w.exit)
			case w.tasks <- msg:
				// Handed over to a pending Request call.
			}
		}

		w.subscription, err = w.client.QueueSubscribe(w.opts.subj, w.opts.queue, deliver)
	})

	return err
}
// Run executes the task by passing it, together with ctx, to the configured
// run function and returns that function's error.
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
	run := w.opts.runFunc
	return run(ctx, task)
}
// Shutdown stops the worker. The first call unsubscribes, closes the stop
// channel, waits up to 50ms for the subscription callback to signal exit,
// closes the NATS connection and the tasks channel, and returns nil. Every
// later call returns queue.ErrQueueShutdown.
func (w *Worker) Shutdown() error {
	if swapped := atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1); !swapped {
		return queue.ErrQueueShutdown
	}

	teardown := func() {
		// Unsubscribe only if the consumer was actually started.
		if sub := w.subscription; sub != nil {
			_ = sub.Unsubscribe()
		}

		close(w.stop)

		// Give an in-flight callback a short grace period to re-queue its
		// message before the connection goes away.
		grace := time.After(50 * time.Millisecond)
		select {
		case <-w.exit:
		case <-grace:
		}

		w.client.Close()
		close(w.tasks)
	}
	w.stopOnce.Do(teardown)

	return nil
}
// Queue publishes the job's bytes to the configured subject. It returns
// queue.ErrQueueShutdown once Shutdown has been called; otherwise it returns
// the result of the publish.
func (w *Worker) Queue(job core.QueuedMessage) error {
	stopped := atomic.LoadInt32(&w.stopFlag) == 1
	if stopped {
		return queue.ErrQueueShutdown
	}

	return w.client.Publish(w.opts.subj, job.Bytes())
}
// Request waits for the next task delivered by the subscription and decodes
// its payload into a job.Message.
//
// It returns:
//   - the decoded message and a nil error when a task arrives;
//   - queue.ErrQueueHasBeenClosed when the tasks channel has been closed
//     (Shutdown closes it);
//   - the error from startConsumer or json.Unmarshal when the consumer cannot
//     be started or the payload cannot be decoded;
//   - queue.ErrNoTaskInQueue when no task arrives before the sixth one-second
//     tick.
func (w *Worker) Request() (core.QueuedMessage, error) {
	// startOnce makes this a no-op when the consumer is already running.
	if err := w.startConsumer(); err != nil {
		return nil, err
	}

	// Number of idle ticks tolerated; the wait is abandoned on the tick after
	// these have elapsed.
	const maxIdleTicks = 5

	// A single ticker replaces a fresh timer per loop iteration and is
	// released as soon as the call returns.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for idle := 0; ; idle++ {
		select {
		case task, ok := <-w.tasks:
			if !ok {
				return nil, queue.ErrQueueHasBeenClosed
			}

			var data job.Message
			// Surface malformed payloads instead of handing the caller a
			// zero-value message with a nil error.
			if err := json.Unmarshal(task.Data, &data); err != nil {
				return nil, err
			}
			return &data, nil
		case <-ticker.C:
			if idle == maxIdleTicks {
				return nil, queue.ErrNoTaskInQueue
			}
		}
	}
}