-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbalance.js
206 lines (175 loc) · 5.55 KB
/
balance.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/**
* Parts of this code are from the original module and under the following
* copyright.
*
* /* MIT License. Copyright (c) 2015-2018, Richard Rodger and other contributors.
* the rest of the code follows the following copyright
* /* Copyright (c) 2024 WizardTales GmbH, MIT License
*/
import visigoth from '@wzrdtales/visigoth';
import { uid } from 'uid';
import { pattern } from './index.js';
import Promise from 'bluebird';
/**
 * Client-side balancer: keeps a registry of targets per pattern key and uses
 * a visigoth instance per key to choose which target receives a message.
 *
 * State layout: `this.targets[key]` is either a bare `{}` placeholder (a
 * client exists but no target was added yet) or
 * `{ targets: { [id]: target }, visigoth }`.
 */
export default class BalanceClient {
  targets = {};

  static defaults = {
    debug: {
      client_updates: false
    }
  };

  static errors = {
    'no-target': 'No targets have been registered for message <%=msg%>',
    'no-current-target': 'No targets are currently active for message <%=msg%>'
  };

  /**
   * @param {object} [options] - Overrides for `BalanceClient.defaults`.
   * @param {object} [options.debug] - Debug flags; `client_updates` logs
   *   every add/remove of a target to the console.
   * @param {object} [options.circuitBreaker] - Handed to `visigoth(...)` for
   *   every new target group; its `retryTimeout` is also the delay (ms)
   *   between retries in `consume`.
   */
  constructor (options = {}) {
    const merged = Object.assign({}, BalanceClient.defaults, options);
    // Merge `debug` one level deep: the instance must not share (and be able
    // to mutate) the static defaults object, and `{ debug: undefined }` must
    // not wipe the flags that are read unconditionally later on.
    merged.debug = { ...BalanceClient.defaults.debug, ...options?.debug };
    merged.circuitBreaker = merged.circuitBreaker || {
      closingTimeout: 1000,
      retryTimeout: 100
    };
    this.options = merged;
  }

  /**
   * Normalizes the `pin` / `pins` of a client config into a list of patterns.
   * A truthy `pin` wins and may be a single value or an array; otherwise
   * `pins` is used. When neither is set the single (falsy) `pin` is passed to
   * `pattern`, which is what `addClient` has always done.
   *
   * @param {object} config - Client config carrying `pin` or `pins`.
   * @returns {Array} One `pattern(...)` result per pin.
   */
  #pinPatterns (config) {
    const { pin, pins } = config;
    const list = pin ? (Array.isArray(pin) ? pin : [pin]) : pins ?? [pin];
    return list.map(pattern);
  }

  /**
   * Builds a registration function bound to `config`.
   *
   * @param {object} config - Stored on every target added via the handle.
   * @returns {function(*, Function): void} `(pat, func)` registering `func`
   *   as a target for `pattern(pat)`.
   */
  makeHandle (config) {
    return (pat, func) => this.addTarget(config, pattern(pat), func);
  }

  /**
   * Removes the target `config.id` from the group stored under `pg`.
   * Unknown groups or ids are ignored.
   *
   * @param {string} pg - Key into `this.targets`.
   * @param {*} pat - Pattern, only used for debug output.
   * @param {object} config - Its `id` selects the target to remove.
   */
  removeTarget (pg, pat, config) {
    const actionId = config.id;
    // Keep a placeholder for unknown groups, but never one that carries
    // `targets` without a `visigoth`: such a half-built state used to make
    // the next addTarget for this key fail.
    const targetState = (this.targets[pg] ??= {});
    const target = targetState.targets?.[actionId];
    const found = Boolean(target);

    if (found) {
      targetState.visigoth.remove(target.e);
      delete targetState.targets[actionId];
    }

    if (this.options.debug.client_updates) {
      console.log('remove', pat, actionId, found);
    }
  }

  /**
   * Registers a client and returns its `send` function.
   *
   * @param {object} msg - Client config with `pin` (value or array) or `pins`.
   * @returns {{config: object, send: Function}} `send(msg, meta)` returns a
   *   promise for the target's reply, or the plain object
   *   `{ err: 'no-target', msg }` when no target was ever added for the group.
   */
  addClient (msg) {
    const pg = this.#pinPatterns(msg).join(':::');

    if (!this.targets[pg]) {
      this.targets[pg] = {};
    }

    const me = this;
    // A regular function on purpose: the caller's `this` is forwarded to the
    // target action as its call context.
    const send = function (msg, meta) {
      const targetState = me.targets[pg];

      if (targetState?.visigoth) {
        return Promise.fromCallback((reply) =>
          me.consume(this, msg, targetState, reply, meta)
        );
      }

      return { err: 'no-target', msg };
    };

    return {
      config: msg,
      send
    };
  }

  /**
   * Removes the target `msg.config.id` from the group derived from the
   * config's pins (same key derivation as `addClient`).
   *
   * @param {{config: object}} msg - Wrapper around the client config.
   */
  removeClient (msg) {
    const pins = this.#pinPatterns(msg.config);
    const pg = pins.join(':::');

    this.targets[pg] = this.targets[pg] || {};
    pins.forEach((pin) => this.removeTarget(pg, pin, msg.config));
  }

  /**
   * Sends `msg` to a target chosen by `targetState.visigoth`, retrying on
   * controlled overload/timeout errors.
   *
   * @param {*} mc - Call context (`this`) for the target action.
   * @param {*} msg - Message passed to the action.
   * @param {object} targetState - Group state; must carry `visigoth`.
   * @param {Function} done - Node-style callback. Called with
   *   `(null, result)` on success, or one of `{ err: 'no-current-target', msg }`,
   *   `{ err: 'all-targets-overloaded', msg }`, `{ err, msg }` on failure.
   * @param {object} meta - Passed through to the action; `meta.start` is only
   *   read when `process.env.METRICS` is set.
   */
  consume (mc, msg, targetState, done, meta) {
    const maxAttempts = 10;
    const me = this;
    let trys = 0;

    function tryCall () {
      targetState.visigoth.choose(function (err, target, errored, stats) {
        if (err) {
          return done({ err: 'no-current-target', msg });
        }

        target.action
          .call(mc, msg, meta)
          .then(function (result) {
            // Timing is opt-in: new Date() and even hrtime are surprisingly
            // expensive and the default strategy does not rate targets at
            // all. We simply go round robin and fail fast. The service
            // itself decides whether to accept a request or to answer
            // retry_later_err_overload; it is then circuit broken and will
            // not be retried too quickly. A system with overly sensitive
            // overload metrics can be tuned, and a truly overloaded system
            // will start answering that no targets are available. If the
            // overload metrics are in sync with autoscaling, everything
            // should usually be fine.
            if (process.env.METRICS) {
              stats.responseTime = new Date() - meta.start;
            }

            done(null, result);
          })
          .catch((err) => {
            // `err` may be anything a target rejects with, including
            // undefined or null, hence the optional chaining.
            const isOverload =
              err?.details?.message === 'retry_later_err_overload' ||
              err?.message === 'timeout';

            if (!isOverload) {
              return done({ err, msg });
            }

            // Only controlled overload errors count against the target.
            errored();

            if (++trys < maxAttempts) {
              return setTimeout(
                () => tryCall(),
                me.options.circuitBreaker.retryTimeout
              );
            }

            if (process.env.TRACES?.includes('MW:D')) {
              console.error(err);
            }

            return done({ err: 'all-targets-overloaded', msg });
          });
      });
    }

    tryCall();
  }

  /**
   * Adds `action` as a target for `pat`, creating the group (and its
   * visigoth instance) on first use. An action whose `id` is already
   * registered in the group is not added again.
   *
   * @param {object} config - Stored on the target.
   * @param {*} pat - Key into `this.targets`.
   * @param {Function} action - Called as `action.call(mc, msg, meta)` and
   *   expected to return a promise; must carry an `id` property.
   */
  addTarget (config, pat, action) {
    let targetState = this.targets[pat];

    // Placeholders and half-built states lack `targets` and/or `visigoth`;
    // both are required below, so build a complete state in either case.
    if (!targetState?.targets || !targetState.visigoth) {
      targetState = {
        targets: {},
        visigoth: visigoth(this.options.circuitBreaker)
      };
    }

    this.targets[pat] = targetState;

    const add = !targetState.targets[action.id];

    if (add) {
      const target = {
        action,
        id: action.id,
        config
      };
      // The handle returned by visigoth is kept so removeTarget can hand it
      // back to `visigoth.remove`.
      const { e } = targetState.visigoth.add(target);
      target.e = e;
      targetState.targets[action.id] = target;
    }

    if (this.options.debug.client_updates) {
      console.log('add', pat, action.id, add);
    }
  }
}