Merge pull request #94 from terascope/modernize-file-reader-and-exporter
modernized file_reader and file_exporter
macgyver603 authored Aug 2, 2019
2 parents 62e905b + 2fa2eda commit df34f3a
Showing 19 changed files with 1,086 additions and 1,017 deletions.
2 changes: 1 addition & 1 deletion asset/asset.json
@@ -1,5 +1,5 @@
{
"name": "file-assets",
"version": "0.7.0",
"version": "0.7.2",
"description": "A set of processors for exporting data to files"
}
166 changes: 4 additions & 162 deletions asset/file_exporter/index.js
@@ -1,165 +1,7 @@
'use strict';

const Promise = require('bluebird');
const path = require('path');
const fs = require('fs');
const json2csv = require('json2csv').parse;
const { TSError } = require('@terascope/utils');
const { legacyProcessorShim } = require('@terascope/job-components');
const Processor = require('./processor');
const Schema = require('./schema');

Promise.promisifyAll(fs);

function newProcessor(context, opConfig) {
// This will need to be changed to the worker name since multiple workers on a node would result
// in write conflicts between workers
const worker = context.sysconfig._nodeName;
const filePrefix = opConfig.file_prefix;
let filePerSlice = opConfig.file_per_slice;
const filenameBase = path.join(opConfig.path, `${filePrefix}${worker}`);
let fileNum = 0;

// Used as a guard against repeating the header mid-file
let firstSlice = true;

// `file_per_slice` needs to be forced to `true` if the format is JSON to provide a sensible
// output
if (opConfig.format === 'json') {
filePerSlice = true;
}

// Set the options for the parser
const csvOptions = {};
// Only need to set `fields` if there is a custom list since the library will, by default,
// use the record's top-level attributes. This might be a problem if records are missing
// attributes
if (opConfig.fields.length !== 0) {
csvOptions.fields = opConfig.fields;
}

csvOptions.header = opConfig.include_header;
csvOptions.eol = opConfig.line_delimiter;

// Assumes a custom delimiter will be used only if the `csv` output format is chosen
if (opConfig.format === 'csv') {
csvOptions.delimiter = opConfig.field_delimiter;
} else if (opConfig.format === 'tsv') {
csvOptions.delimiter = '\t';
}

// Determines the filename based on the settings
function getFilename() {
if (filePerSlice) {
// Increment the file number tracker by one and use the previous number
fileNum += 1;
return `${filenameBase}.${fileNum - 1}`;
}
// Make sure header does not show up mid-file if the worker is writing all slices to a
// single file
if (!firstSlice) {
csvOptions.header = false;
}
firstSlice = false;
return filenameBase;
}

return (data) => {
// Converts the slice to a string, formatted based on the configuration options selected.
function buildOutputString(slice) {
switch (opConfig.format) {
case 'csv':
case 'tsv':
// null or empty slices will manifest as blank lines in the output file
if (!slice || !slice.length) return opConfig.line_delimiter;
return `${json2csv(slice, csvOptions)}${opConfig.line_delimiter}`;
case 'raw': {
let outStr = '';
slice.forEach((record) => {
outStr = `${outStr}${record.data}${opConfig.line_delimiter}`;
});
return outStr;
}
case 'ldjson': {
let outStr = '';
if (opConfig.fields.length > 0) {
slice.forEach((record) => {
outStr = `${outStr}${JSON.stringify(record, opConfig.fields)}${opConfig.line_delimiter}`;
});
} else {
slice.forEach((record) => {
outStr = `${outStr}${JSON.stringify(record)}${opConfig.line_delimiter}`;
});
}
return outStr;
}
case 'json': {
// This case assumes the data is just a single record in the slice's data array. We
// could just stringify the slice as-is, but feeding the output back into the reader
// would just nest that array into a record in that slice's array, which probably
// isn't the desired effect.
const outStr = `${JSON.stringify(slice)}${opConfig.line_delimiter}`;
return outStr;
}
// Schema validation guards against this
default:
throw new Error(`Unsupported output format "${opConfig.format}"`);
}
}
const fileName = getFilename();
return fs.appendFileAsync(fileName, buildOutputString(data))
.catch(err => Promise.reject(new TSError(err, {
reason: `Failure to append to file ${fileName}`
})));
};
}

function schema() {
return {
path: {
doc: 'Path to the directory where the files will be saved; the directory must pre-exist.',
default: null,
format: 'required_String'
},
file_prefix: {
doc: 'Optional prefix to prepend to the file name.',
default: 'export_',
format: String
},
fields: {
doc: 'List of fields to extract from the incoming records and save to the file. '
+ 'The order here determines the order of columns in the file.',
default: [],
format: Array
},
field_delimiter: {
doc: 'Delimiter to use for separating fields in the output file.',
default: ',',
format: String
},
line_delimiter: {
doc: 'Delimiter to use for records in the output file.',
default: '\n',
format: String
},
file_per_slice: {
doc: 'Determines if a new file is created for each slice.',
default: false,
format: Boolean
},
include_header: {
doc: 'Determines whether or not to include a header at the top of the file. '
+ 'The header will consist of the field names.',
default: false,
format: Boolean
},
format: {
doc: 'Specifies the output format of the file. Supported formats are json, ldjson, raw,'
+ ' csv, and tsv. For every format except json, each line of the output file will be a separate record.',
default: 'ldjson',
format: ['json', 'ldjson', 'raw', 'csv', 'tsv']
}
};
}

module.exports = {
newProcessor,
schema
};
module.exports = legacyProcessorShim(Processor, Schema);
112 changes: 112 additions & 0 deletions asset/file_exporter/processor.js
@@ -0,0 +1,112 @@
'use strict';

const {
BatchProcessor
} = require('@terascope/job-components');
const json2csv = require('json2csv').parse;
const Promise = require('bluebird');
const path = require('path');
const fse = require('fs-extra');
const { TSError } = require('@terascope/utils');

class FileBatcher extends BatchProcessor {
constructor(context, opConfig, executionConfig) {
super(context, opConfig, executionConfig);
this.worker = context.cluster.worker.id;
this.filePrefix = opConfig.file_prefix;
// `file_per_slice` is forced on for the `json` format so each file holds a single
// valid JSON document
this.filePerSlice = opConfig.file_per_slice || opConfig.format === 'json';
// Used for incrementing file name with `file_per_slice`
this.sliceCount = 0;
this.firstSlice = true;
// Set the options for the parser
this.csvOptions = {};
if (this.opConfig.fields.length !== 0) {
this.csvOptions.fields = this.opConfig.fields;
} else {
this.csvOptions.fields = null;
}

this.csvOptions.header = this.opConfig.include_header;
this.csvOptions.eol = this.opConfig.line_delimiter;

// Assumes a custom delimiter will be used only if the `csv` output format is chosen
if (this.opConfig.format === 'csv') {
this.csvOptions.delimiter = this.opConfig.field_delimiter;
} else if (opConfig.format === 'tsv') {
this.csvOptions.delimiter = '\t';
}
}

getName() {
const fileName = path.join(this.opConfig.path, `${this.filePrefix}${this.worker}`);
if (this.filePerSlice) {
this.sliceCount += 1;
// Slice count is only used in the file name with `file_per_slice`
return `${fileName}.${this.sliceCount - 1}`;
}
if (!this.firstSlice) this.csvOptions.header = false;
this.firstSlice = false;
return fileName;
}

async onBatch(slice) {
const fileName = this.getName();

// Build the output string to dump to the object
// TODO externalize this into a ./lib/ for use with the `file_exporter`
let outStr = '';
switch (this.opConfig.format) {
case 'csv':
case 'tsv':
// null or empty slices will manifest as blank lines in the output file
if (!slice || !slice.length) outStr = this.opConfig.line_delimiter;
else outStr = `${json2csv(slice, this.csvOptions)}${this.opConfig.line_delimiter}`;
break;
case 'raw': {
slice.forEach((record) => {
outStr = `${outStr}${record.data}${this.opConfig.line_delimiter}`;
});
break;
}
case 'ldjson': {
if (this.opConfig.fields.length > 0) {
slice.forEach((record) => {
outStr = `${outStr}${JSON.stringify(record, this.opConfig.fields)}${this.opConfig.line_delimiter}`;
});
} else {
slice.forEach((record) => {
outStr = `${outStr}${JSON.stringify(record)}${this.opConfig.line_delimiter}`;
});
}
break;
}
case 'json': {
// This case assumes the data is just a single record in the slice's data array. We
// could just stringify the slice as-is, but feeding the output back into the reader
// would just nest that array into a record in that slice's array, which probably
// isn't the desired effect.
outStr = `${JSON.stringify(slice)}${this.opConfig.line_delimiter}`;
break;
}
// Schema validation guards against this
default:
throw new Error(`Unsupported output format "${this.opConfig.format}"`);
}

// Doesn't return a DataEntity or anything else if successful
return fse.appendFile(fileName, outStr)
.catch(err => Promise.reject(new TSError(err, {
reason: `Failure to append to file ${fileName}`
})));
}
}

module.exports = FileBatcher;
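
As a usage note, here is a minimal standalone sketch of the json2csv options assembled in the FileBatcher constructor above; the field names and records are hypothetical, not part of this commit.

'use strict';

const json2csv = require('json2csv').parse;

// Mirrors FileBatcher's csvOptions for a `tsv` export
const csvOptions = {
    fields: ['name', 'count'],
    header: true,
    eol: '\n',
    delimiter: '\t'
};

const records = [
    { name: 'foo', count: 1 },
    { name: 'bar', count: 2 }
];

// Prints a header row followed by one tab-delimited line per record
console.log(`${json2csv(records, csvOptions)}\n`);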
54 changes: 54 additions & 0 deletions asset/file_exporter/schema.js
@@ -0,0 +1,54 @@
'use strict';

const { ConvictSchema } = require('@terascope/job-components');

class Schema extends ConvictSchema {
build() {
return {
path: {
doc: 'Path to the directory where the files will be saved; the directory must pre-exist.',
default: null,
format: 'required_String'
},
file_prefix: {
doc: 'Optional prefix to prepend to the file name.',
default: 'export_',
format: String
},
field_delimiter: {
doc: 'Delimiter character between record fields. Only used with the `csv` format.',
default: ',',
format: String
},
line_delimiter: {
doc: 'Line delimiter character between records in the output file.',
default: '\n',
format: String
},
fields: {
doc: 'List of fields to extract from the incoming records and save to the file. '
+ 'The order here determines the order of columns in the file.',
default: [],
format: Array
},
file_per_slice: {
doc: 'Determines if a new file is created for each slice.',
default: false,
format: Boolean
},
include_header: {
doc: 'Determines whether or not to include a header at the top of the file. '
+ 'The header will consist of the field names.',
default: false,
format: Boolean
},
format: {
doc: 'Format of the output file. Currently supports "json", "ldjson", "raw", "tsv", and'
+ ' "csv".',
default: 'ldjson',
format: ['json', 'ldjson', 'raw', 'tsv', 'csv']
}
};
}
}

module.exports = Schema;
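
For reference, a file_exporter op configured under this schema might look like the following in a Teraslice job; the path and field names are hypothetical.

{
    "_op": "file_exporter",
    "path": "/data/export",
    "format": "csv",
    "fields": ["name", "count"],
    "file_per_slice": true,
    "include_header": true
}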
44 changes: 44 additions & 0 deletions asset/file_reader/fetcher.js
@@ -0,0 +1,44 @@
'use strict';

const { Fetcher } = require('@terascope/job-components');
const { getChunk } = require('@terascope/chunked-file-reader');
const fse = require('fs-extra');

class FileFetcher extends Fetcher {
constructor(context, opConfig, executionConfig) {
super(context, opConfig, executionConfig);
this._initialized = false;
this._shutdown = false;
}

async initialize() {
this._initialized = true;
return super.initialize();
}

async shutdown() {
this._shutdown = true;
return super.shutdown();
}

async fetch(slice) {
// Coerce the field delimiter if the format is `tsv`
if (this.opConfig.format === 'tsv') {
this.opConfig.field_delimiter = '\t';
}
const reader = async (offset, length) => {
const fd = await fse.open(slice.path, 'r');
try {
// Allocate extra headroom since the chunked reader may request a margin past the slice size
const buf = Buffer.alloc(2 * this.opConfig.size);
const { bytesRead } = await fse.read(fd, buf, 0, length, offset);
return buf.slice(0, bytesRead).toString();
} finally {
await fse.close(fd);
}
};
// Passing the slice in as the `metadata`. This will include the path, offset, and length
return getChunk(reader, slice, this.opConfig, this.logger, slice);
}
}

module.exports = FileFetcher;
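
The slice passed to fetch is expected to carry at least a path, offset, and length, per the metadata comment above. A minimal standalone sketch of the same byte-range read pattern, with a hypothetical file path:

'use strict';

const fse = require('fs-extra');

// Read `length` bytes starting at byte `offset`, as FileFetcher's reader does
async function readRange(path, offset, length) {
    const fd = await fse.open(path, 'r');
    try {
        const buf = Buffer.alloc(length);
        const { bytesRead } = await fse.read(fd, buf, 0, length, offset);
        return buf.slice(0, bytesRead).toString();
    } finally {
        await fse.close(fd);
    }
}

// e.g. readRange('/data/upload/part-0000', 0, 1024).then(console.log);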