-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathTransactionStream.js
100 lines (83 loc) · 2.04 KB
/
TransactionStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
'use strict';
var $ = require('highland');
var CypherStream = require('./CypherStream');
var Duplex = require('stream').Duplex;
var normalize = require('./util/normalize-query-statement');
// var R = require('ramda');
// var tap = R.tap;
// var log = tap(console.log.bind(console));
/**
 * Object-mode Duplex stream bound to a single transaction.
 *
 * Writable side: every chunk written is passed through `normalize` and each
 * resulting statement is executed by a new `CypherStream` on the transaction.
 * Readable side: the flattened output of those CypherStreams is pushed out.
 *
 * Events emitted by this class (in addition to the standard stream events):
 *  - 'committed': the transaction commit completed.
 *  - 'comitted':  legacy misspelling of the above, still emitted so existing
 *                 listeners keep working.
 *  - 'error':     a result stream, the commit or the rollback failed.
 */
class TransactionStream extends Duplex {
  /**
   * @param {object} session - Must expose `beginTransaction()`; it is called
   *   once, immediately, and the returned transaction is kept as `this.tx`.
   * @param {*} options - Forwarded unchanged to every `CypherStream`.
   */
  constructor(session, options) {
    super({ objectMode: true });
    this.session = session;

    // Lifecycle flags, initialised explicitly so guards never read `undefined`.
    this.committed = false;
    this.rolledBack = false;

    this.tx = session.beginTransaction();

    // Entry point for statements received through `_write`.
    this.statements = $();

    // One CypherStream (wrapped as a highland stream) per normalized statement.
    this.writes = this.statements.fork()
      .flatMap(normalize)
      .map(statement => {
        // A statement flagged with `commit` also schedules the commit.
        if (statement.commit) {
          this.commit();
        }
        return $(new CypherStream(this.tx, statement, options));
      });

    // Flattened results are pushed to the readable side; stream errors are
    // re-emitted on this Duplex.
    this.results = this.writes.fork()
      .flatten()
      .doto(x => this.push(x))
      .errors(error => this.emit('error', error));

    this.writes.resume();
    this.results.resume();
  }

  /**
   * Writable implementation: forwards the chunk to the statement stream.
   *
   * NOTE(review): the guards throw synchronously, which is kept for backward
   * compatibility. Node's stream documentation recommends reporting such
   * failures through `callback(error)` instead; consider migrating.
   *
   * @param {*} chunk - Statement(s) accepted by `normalize`.
   * @param {string} encoding - Unused (object mode).
   * @param {Function} callback - Invoked once the chunk has been handed over.
   * @throws {Error} If the stream was already rolled back or committed.
   */
  _write(chunk, encoding, callback) {
    if (this.rolledBack) {
      throw new Error('Cannot write after rollback.');
    }
    if (this.committed) {
      throw new Error('Cannot write after commit.');
    }
    this.statements.write(chunk);
    callback();
  }

  /** Readable implementation: intentionally empty, data is pushed from `results`. */
  _read() { }

  /**
   * Ends the statement stream and, once `writes` has ended, commits the
   * transaction. Calling it again after the first call is a no-op.
   * The readable side is ended with `push(null)` on success and on failure.
   */
  commit() {
    if (this.committed) {
      return;
    }
    this.committed = true;

    this.writes.on('end', () => {
      this.tx.commit()
        .subscribe({
          onCompleted: () => {
            // Legacy (misspelled) event first, for listeners written against
            // the original name, then the correctly spelled one.
            this.emit('comitted');
            this.emit('committed');
            this.push(null);
          },
          onError: error => {
            this.emit('error', error);
            this.push(null);
          }
        });
    });

    this.statements.end();
  }

  /**
   * Ends the internal streams and rolls the transaction back. Calling it
   * again after the first call is a no-op. The readable side is ended with
   * `push(null)` once the rollback completes or fails.
   */
  rollback() {
    if (this.rolledBack) {
      return;
    }
    this.rolledBack = true;

    this.statements.end();
    this.results.end();
    this.writes.end();

    this.tx.rollback()
      .subscribe({
        onCompleted: () => {
          this.push(null);
        },
        onError: error => {
          this.emit('error', error);
          this.push(null);
        }
      });
  }
}
module.exports = TransactionStream;