-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
130 lines (102 loc) · 2.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
'use strict'
var EE = require('events').EventEmitter
var clone = require('clone')
var fastparallel = require('fastparallel')
// Key prefix for persisted jobs: each job is stored under JOBS + job.id, and
// the pending queue is rebuilt at start-up by scanning the key range that
// begins with this prefix.
var JOBS = 'JOBS::'
/**
 * Create a persistent job queue that hands queued cocktail jobs to workers.
 *
 * Pending jobs are stored in `db` under `JOBS + id`. On creation the stored
 * jobs are streamed back into memory; until that stream ends, calls to
 * `enqueue` and `free` are only recorded and are replayed once loading is
 * done, after which `'ready'` is emitted.
 *
 * Events emitted on the returned object:
 *   - 'ready'      after the stored jobs are loaded and deferred calls replayed
 *   - 'error'      when one of the replayed deferred calls reported an error
 *   - <worker>     `{ jobs: [...] }` when `free(worker)` found jobs to execute
 *
 * @param {Object} db - Store exposing `createValueStream(opts)`,
 *   `put(key, value, opts, cb)` and `batch(ops, cb)` (the calls match the
 *   LevelUP API; presumably a LevelUP instance - confirm against the caller).
 * @param {Object} defs - Definitions. `defs.workers[name].cocktails` is a list
 *   of `{ cocktail, pump }` entries the worker can serve;
 *   `defs.cocktails[name].activations` is cloned onto each dispatched job.
 * @returns {EventEmitter} Emitter with `enqueue(job, cb)`, `free(worker, cb)`
 *   and `queue` (the live array of pending jobs).
 */
function cocktailControl (db, defs) {
  var that = new EE()
  var parallel = fastparallel()

  // While the stored jobs are still loading, calls are queued up instead of
  // being executed (see the 'end' handler below).
  that.enqueue = deferredEnqueue
  that.free = deferredFree

  var jobs = []
  var count = 0 // next id to hand out
  var deferred = []

  that.queue = jobs

  var stream = db.createValueStream({
    gt: JOBS,
    lt: JOBS + '\xff',
    valueEncoding: 'json'
  })

  stream.on('data', function (job) {
    // The counter must end up strictly above every stored id, otherwise the
    // next enqueue would reuse an id and overwrite that job's record. Taking
    // the maximum (rather than the last value seen) matters because string
    // keys are not guaranteed to arrive in numeric id order (e.g. 'JOBS::10'
    // typically sorts before 'JOBS::2').
    if (typeof job.id === 'number' && job.id >= count) {
      count = job.id + 1
    }
    jobs.push(job)
  })

  stream.on('end', function () {
    // Loading finished: switch to the real implementations and replay
    // everything that was requested in the meantime.
    that.enqueue = enqueue
    that.free = free

    parallel(that, function (task, cb) {
      this[task.method](task.job, function (err) {
        if (task.cb) {
          task.cb(err)
        }
        cb(err)
      })
    }, deferred, function (err) {
      if (err) {
        that.emit('error', err)
      }
      that.emit('ready')
    })
  })

  return that

  /**
   * Record an enqueue request made before the stored jobs were loaded.
   */
  function deferredEnqueue (job, cb) {
    deferred.push({ method: 'enqueue', job: job, cb: cb })
    return this
  }

  /**
   * Record a free request made before the stored jobs were loaded.
   * The worker name travels in the task's `job` slot.
   */
  function deferredFree (worker, cb) {
    deferred.push({ method: 'free', job: worker, cb: cb })
    return this
  }

  /**
   * Assign the next id to `job` (mutating it), append it to the in-memory
   * queue and persist it.
   *
   * @param {Object} job - Job to queue; receives an `id` property.
   * @param {Function} [cb] - Passed to `db.put` as its callback.
   * @returns {Object} `this`, for chaining.
   */
  function enqueue (job, cb) {
    job.id = count++
    jobs.push(job)
    db.put(JOBS + job.id, job, {
      valueEncoding: 'json'
    }, cb)
    return this
  }

  /**
   * Dispatch queued jobs to a worker that became free.
   *
   * Walks the queue in order and picks, for each job, the first of the
   * worker's `{ cocktail, pump }` entries that makes the cocktail on a pump
   * not already claimed in this round. Picked jobs get `activations` and
   * `pump` set, are deleted from the store, removed from the queue and
   * emitted as `{ jobs: [...] }` on the event named after the worker.
   *
   * @param {string} worker - Key into `defs.workers`.
   * @param {Function} [cb] - Passed to `db.batch` as its callback.
   * @returns {Object} `this`, for chaining.
   * @throws {Error} 'unknown worker' when `defs.workers` has no such entry.
   */
  function free (worker, cb) {
    var wrap = defs.workers[worker]

    if (!wrap) {
      throw new Error('unknown worker')
    }

    var cocktails = wrap.cocktails
    var executables = []

    jobs.forEach(function (job) {
      var cocktail

      for (var i = 0; i < cocktails.length; i++) {
        cocktail = cocktails[i]

        if (job.cocktail !== cocktail.cocktail) {
          continue
        }

        // Each pump can serve only one job per round.
        var isTaken = executables.some(function (taken) {
          return taken.pump === cocktail.pump
        })

        if (isTaken) {
          continue
        }

        job.activations = clone(defs.cocktails[job.cocktail].activations)
        job.pump = cocktail.pump
        executables.push(job)
        break
      }
    })

    db.batch(executables.map(function (job) {
      return {
        type: 'del',
        key: JOBS + job.id
      }
    }), cb)

    // Remove the dispatched jobs from the in-memory queue.
    executables.forEach(function (executable) {
      jobs.splice(jobs.indexOf(executable), 1)
    })

    if (executables.length > 0) {
      that.emit(worker, {
        jobs: executables
      })
    }

    return this
  }
}
module.exports = cocktailControl