-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathindexer.js
39 lines (30 loc) · 1000 Bytes
/
indexer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
"use strict";
// External dependencies
const fs = require('fs'),
  path = require('path'),
  transform = require('stream-transform'),
  parse = require('csv-parse'),
  gdelt_formatter = require('./gdelt/formatter.js'),
  esIndexer = require('./es.js'),
  concurrency = 10000;
/**
 * Streams a tab-delimited GDELT file and indexes every record into the
 * `elastic_gdelt` index (type `event`).
 *
 * Stages: file -> csv-parse (tab delimiter) -> gdelt formatter ->
 * esIndexer#indexDoc -> process.stdout (results handed to indexDoc's
 * callback are piped to stdout). Both transforms run with up to
 * `concurrency` handlers in flight.
 *
 * @param {string} filename - file name joined onto this module's directory.
 * @returns {Promise<void>} resolves once the indexing transform has ended,
 *   i.e. every record has gone through indexDoc; rejects with the first
 *   error raised by the file read, the CSV parser, the formatter or the
 *   indexer. Callers that ignore the promise get an unhandled rejection on
 *   failure (previously an uncaught 'error' event).
 */
function indexGdeltFile(filename) {
  const input = fs.createReadStream(path.join(__dirname, filename));
  const indexer = new esIndexer({_index: 'elastic_gdelt', _type: 'event'});
  const parser = parse({delimiter: '\t'});

  const formatter = transform((record, callback) => {
    let formatted;
    try {
      formatted = gdelt_formatter(record);
    } catch (err) {
      // Report a malformed row as a stream error instead of letting the
      // exception escape the transform handler.
      return callback(err);
    }
    callback(null, formatted);
  }, {parallel: concurrency});

  const elastic_indexer = transform((record, callback) => {
    indexer.indexDoc(record, callback);
  }, {parallel: concurrency});

  return new Promise((resolve, reject) => {
    // .pipe() does not forward errors between streams, so every stage needs
    // its own listener; an unlistened 'error' (e.g. ENOENT) would crash.
    const fail = (err) => {
      input.destroy(); // stop reading and release the file descriptor
      reject(err);
    };
    for (const stream of [input, parser, formatter, elastic_indexer]) {
      stream.on('error', fail);
    }

    input.on('end', () => {
      console.log("Finished reading CSV file.");
    });

    // The readable side of the last transform ends only after its flush,
    // which stream-transform delays until in-flight handlers have called back.
    elastic_indexer.on('end', resolve);

    input.pipe(parser)
      .pipe(formatter)
      .pipe(elastic_indexer)
      .pipe(process.stdout);
  });
}
module.exports = {
indexGdeltFile
}