// Package fanout implements a fan-out pipeline that spreads messages from a
// master queue across a fixed pool of workers (fan_out_complex.go).
package fanout

import (
	"context"
	"sync"
	"sync/atomic"

	"go.uber.org/zap" // https://github.com/uber-go/zap: blazing fast, structured, leveled logging in Go
)
// log is a package-level development logger; the construction error is ignored for brevity.
var log, _ = zap.NewDevelopment()
// Settings of the pipeline: each of the MaxWorkers workers gets its own
// MaxQueueSize buffer, and the master queue can hold one full worker queue
// per worker.
const (
	MaxWorkers      = 16
	MaxQueueSize    = 512
	MasterQueueSize = MaxQueueSize * MaxWorkers
)
// IDispatcher handles messages for a single worker: Before runs once when the
// worker's drain loop starts, Process handles each message, and After runs
// once when the loop exits.
type IDispatcher interface {
	Before(context.Context) error
	After() error
	Process(interface{}) error
}
// worker drains its own buffered channel with a dedicated dispatcher.
type worker struct {
	index      uint32
	mutex      *sync.Mutex
	running    uint32 // 1 while the drain loop is active; read and written atomically
	chain      chan interface{}
	debug      bool
	idle       uint32
	dispatcher IDispatcher // per-worker dispatcher; you must supply your own factory via DispatcherBuilder
}
// Pipeline of workers fed from one master channel.
type Pipeline struct {
	workers map[int]*worker
	chain   chan interface{}
}
// DispatcherBuilder creates a fresh IDispatcher for each worker.
type DispatcherBuilder func() IDispatcher
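
// Illustrative only: a minimal IDispatcher implementation and a matching
// DispatcherBuilder. countingDispatcher and NewCountingDispatcher are
// hypothetical names, not part of the original file; they sketch the factory
// shape NewPipeline expects.
type countingDispatcher struct {
	processed uint64
}

func (d *countingDispatcher) Before(ctx context.Context) error { return nil }

func (d *countingDispatcher) Process(msg interface{}) error {
	d.processed++ // each worker owns its own dispatcher, so no locking is needed here
	return nil
}

func (d *countingDispatcher) After() error {
	log.Info("dispatcher done", zap.Uint64("processed", d.processed))
	return nil
}

// NewCountingDispatcher satisfies DispatcherBuilder.
func NewCountingDispatcher() IDispatcher { return &countingDispatcher{} }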
// Start runs the fan-out loop until ctx is cancelled or the master channel closes.
func (p *Pipeline) Start(ctx context.Context) {
	go func(pipe *Pipeline) {
		for {
			// Pick a worker index from the current master-queue backlog. The
			// modulo keeps the index in [0, MaxWorkers), so no further bounds
			// check is needed.
			expectationWorkers := len(pipe.chain) % MaxWorkers
			select {
			case <-ctx.Done():
				return
			case val, ok := <-pipe.chain:
				if !ok {
					return
				}
				go pipe.workers[expectationWorkers].stream(val)
			}
		}
	}(p)
}
// Dispatch enqueues a message on the master channel; it blocks while the channel is full.
func (p *Pipeline) Dispatch(msg interface{}) {
	p.chain <- msg
}
// NewPipeline creates a Pipeline with MaxWorkers workers, each built from the
// given dispatcher factory. idle bounds how many empty polls a worker makes
// before its drain loop exits; debug enables lifecycle logging.
func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline {
	ch := make(chan interface{}, MasterQueueSize)
	wk := make(map[int]*worker)
	for i := 0; i < MaxWorkers; i++ {
		wk[i] = &worker{
			index:      uint32(i + 1),
			chain:      make(chan interface{}, MaxQueueSize),
			mutex:      new(sync.Mutex),
			debug:      debug,
			idle:       idle,
			dispatcher: d(), // build the real dispatcher for this worker
		}
	}
	return &Pipeline{workers: wk, chain: ch}
}
// stream enqueues val on the worker's channel and, if no drain loop is
// running, starts one. The mutex serializes drain loops so at most one runs
// per worker; the running flag is read atomically as a fast-path check.
func (c *worker) stream(val interface{}) {
	c.chain <- val
	if atomic.LoadUint32(&c.running) == 1 {
		return
	}
	c.mutex.Lock()
	atomic.StoreUint32(&c.running, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer func(w *worker, cancel context.CancelFunc) {
		if w.debug {
			log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle))
		}
		if w.dispatcher != nil {
			if err := w.dispatcher.After(); err != nil {
				log.Error("can not finish track issue", zap.Error(err))
			}
		}
		cancel()
		// Clear the flag before releasing the mutex so a waiting caller
		// cannot observe a stale "running" state.
		atomic.StoreUint32(&w.running, 0)
		w.mutex.Unlock()
	}(c, cancel)
	if c.dispatcher != nil {
		if err := c.dispatcher.Before(ctx); err != nil {
			log.Error("can not start worker", zap.Error(err))
		}
	}
	var idle uint32 = 0 // goroutine-local, so plain arithmetic suffices
	for {
		select {
		case msg := <-c.chain:
			idle = 0
			if msg != nil && c.dispatcher != nil {
				if err := c.dispatcher.Process(msg); err != nil {
					log.Error("can not process message", zap.Any("msg", msg), zap.Error(err))
				}
			}
		default:
			// Busy-poll: count empty passes and leave once the worker has
			// been idle for more than c.idle iterations.
			idle++
			if idle > c.idle {
				return
			}
		}
	}
}
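
// Usage sketch (illustrative, not part of the original file): build a
// pipeline from the hypothetical NewCountingDispatcher factory above,
// start it, and fan a batch of messages out to the workers.
func exampleUsage() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p := NewPipeline(NewCountingDispatcher, 100, true)
	p.Start(ctx)
	for i := 0; i < 1000; i++ {
		p.Dispatch(i)
	}
}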