-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.js
103 lines (91 loc) · 2.03 KB
/
streams.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
const {Future} = require("./future");
const {Readable, Transform, Writable} = require("stream");
const {LengthPrefixedFrameIngress, LengthPrefixedFrameEgress} = require("./streams/network-length-frame");
function promisePiped( from, to ){
const future = new Future();
function remove() {
from.removeListener("end", accept);
from.removeListener("error", reject);
to.removeListener("error", reject);
}
function reject(e){
remove();
future.reject(e);
}
function accept(v){
remove();
future.accept(v);
}
from.on("error", reject);
to.on("error", reject);
to.on("finish", accept);
from.pipe(to);
return future.promised;
}
class EchoOnReceive extends Transform {
constructor( log = console ){
super();
this.log = log;
}
_transform( chunk, encoding, cb ) {
this.log.log("Chunk", chunk);
cb(null,chunk);
}
_final(cb){
this.log.log("Final called");
cb();
}
}
/**
* A readable which will provide a given buffer on the first read attempt then appear closed on all further attempts.
*/
class MemoryReadable extends Readable {
/**
*
* @param source {Buffer} bytes to be provided
* @param props additional properties to be passed to Readable
*/
constructor(source, props) {
super(props);
this.bytes = source;
this.pushed = false;
}
_read( size ){
if( !this.pushed ) {
this.pushed = true;
this.push(this.bytes);
} else {
this.readable = false;
this.push(null);
}
}
}
/**
* Accumulates bytes written to the sink. This sink is never finished or finalized.
*/
class MemoryWritable extends Writable {
/**
* Initializes an empty buffer with the provided properties.
*
* @param props properties to provide to the writable
*/
constructor(props) {
super(props);
this.bytes = Buffer.alloc(0);
}
_write(chunk, encoding, callback) {
try {
this.bytes = Buffer.concat([this.bytes, chunk]);
callback(null);
}catch(e){
callback(e);
}
}
}
module.exports = {
promisePiped,
EchoOnReceive,
LengthPrefixedFrameIngress, LengthPrefixedFrameEgress,
MemoryReadable,
MemoryWritable
};