forked from ryanskidmore/parallel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_helper.go
68 lines (61 loc) · 1.85 KB
/
worker_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package parallel
import (
"errors"
"sync"
)
// WorkerHelper contains the components that assist the
// execution of a worker: the Worker it was created for
// and a WaitGroup used to signal completion.
type WorkerHelper struct {
// worker is the Worker passed to newWorkerHelper; the data
// methods reach the data channels via worker.p.dataChannels.
worker *Worker
// wg is the WaitGroup that Done decrements.
wg *sync.WaitGroup
}
// newWorkerHelper builds the WorkerHelper for w, pairing it with a
// freshly allocated, zero-valued WaitGroup.
func newWorkerHelper(w *Worker) *WorkerHelper {
	helper := new(WorkerHelper)
	helper.worker = w
	helper.wg = new(sync.WaitGroup)
	return helper
}
// Done signals to the worker helper that this worker
// is complete by calling Done on the helper's WaitGroup,
// which decrements its counter. This must be run in each
// thread (goroutine) if you are calling Wait().
//
// NOTE(review): Wait() is not defined in this part of the
// file; presumably it blocks on the same WaitGroup — confirm.
func (wh *WorkerHelper) Done() {
wh.wg.Done()
}
// PublishData publishes data through the DataChannel registered
// under name. This is a non-blocking operation: the send is performed
// on a new goroutine, which stays parked until the value is received
// (or the channel's buffer has room for it).
//
// It returns an error if no DataChannel is registered under name.
func (wh *WorkerHelper) PublishData(name string, data interface{}) error {
	// Resolve the channel exactly once, on the caller's goroutine. The
	// sending goroutine captures the channel itself and never reads the
	// map, so it cannot race with later map changes or end up sending
	// on a nil channel (which would block forever) if the entry is gone
	// by the time it runs.
	ch, exists := wh.worker.p.dataChannels[name]
	if !exists {
		// Message text is kept unchanged for callers that match on it.
		return errors.New("Data channel does not exist")
	}
	go func() {
		ch <- data
	}()
	return nil
}
// ConsumeData consumes one item from the DataChannel registered
// under name. This is a blocking operation: it waits until an item is
// available or the channel is closed.
//
// It returns an error if no DataChannel is registered under name, or
// if the channel has been closed and drained.
func (wh *WorkerHelper) ConsumeData(name string) (interface{}, error) {
	// Single lookup: the channel that was validated is the one that is
	// read. A second map read could return a nil channel if the entry
	// changed in between, and receiving from nil blocks forever.
	ch, exists := wh.worker.p.dataChannels[name]
	if !exists {
		return nil, errors.New("Data channel does not exist")
	}
	data, open := <-ch
	if !open {
		return nil, errors.New("Data channel closed")
	}
	return data, nil
}
// ConsumeDataInBatches consumes data from the DataChannel registered
// under name in batches. This is a blocking operation and only returns
// successfully once size items have been received.
//
// It returns an error if no DataChannel is registered under name, if
// size is negative, or if the channel is closed before the batch is
// full. In the closed case the returned slice holds exactly the items
// that were received before the close (possibly none), so they are not
// lost and are not padded with nil placeholders.
func (wh *WorkerHelper) ConsumeDataInBatches(name string, size int) ([]interface{}, error) {
	// Look the channel up once instead of on every loop iteration.
	ch, exists := wh.worker.p.dataChannels[name]
	if !exists {
		return nil, errors.New("Data channel does not exist")
	}
	// make panics on a negative length; report it as an error instead.
	if size < 0 {
		return nil, errors.New("Batch size must not be negative")
	}
	// Length 0 with capacity size: len(dataBatch) always equals the
	// number of items actually consumed.
	dataBatch := make([]interface{}, 0, size)
	for len(dataBatch) < size {
		data, open := <-ch
		if !open {
			return dataBatch, errors.New("Data channel closed")
		}
		dataBatch = append(dataBatch, data)
	}
	return dataBatch, nil
}