forked from dcharbonnier/Haraka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunkemitter.js
75 lines (68 loc) · 2.21 KB
/
chunkemitter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
'use strict';
const EventEmitter = require('events').EventEmitter;
class ChunkEmitter extends EventEmitter {
constructor (buffer_size) {
super();
this.buffer_size = parseInt(buffer_size) || (64 * 1024);
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
}
fill (input) {
if (typeof input === 'string') {
input = Buffer.from(input);
}
// Optimization: don't allocate a new buffer until
// the input we've had so far is bigger than our
// buffer size.
if (!this.buf) {
// We haven't allocated a buffer yet
this.bufs.push(input);
this.bufs_size += input.length;
if ((input.length + this.bufs_size) > this.buffer_size) {
this.buf = Buffer.alloc(this.buffer_size);
const in_new = Buffer.concat(this.bufs, this.bufs_size);
input = in_new;
// Reset
this.bufs = [];
this.bufs_size = 0;
}
else {
return;
}
}
while (input.length > 0) {
let remaining = this.buffer_size - this.pos;
if (remaining === 0) {
this.emit('data', this.buf); //.slice(0));
this.buf = Buffer.alloc(this.buffer_size);
this.pos = 0;
remaining = this.buffer_size;
}
const to_write = ((remaining > input.length) ? input.length : remaining);
input.copy(this.buf, this.pos, 0, to_write);
this.pos += to_write;
input = input.slice(to_write);
}
}
end (cb) {
let emitted = false;
if (this.bufs_size > 0) {
this.emit('data', Buffer.concat(this.bufs, this.bufs_size));
emitted = true;
}
else if (this.pos > 0) {
this.emit('data', this.buf.slice(0, this.pos));
emitted = true;
}
// Reset
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
if (cb && typeof cb === 'function') cb();
return emitted;
}
}
module.exports = ChunkEmitter;