Skip to content
This repository has been archived by the owner on Jan 18, 2019. It is now read-only.

Commit

Permalink
Merge pull request #26 from terascope/plugin_adapter
Browse files Browse the repository at this point in the history
Plugin adapter
  • Loading branch information
kstaken authored Jan 8, 2019
2 parents baa794a + dfa907f commit 6fc1cb5
Show file tree
Hide file tree
Showing 34 changed files with 358 additions and 219 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ts-transforms",
"version": "0.5.1",
"version": "0.6.0",
"description": "An ETL framework built upon xlucene-evaluator",
"srcMain": "src/index.ts",
"main": "dist/index.js",
Expand Down
37 changes: 29 additions & 8 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ const command = yargs
.alias('T', 'types-file')
.alias('r', 'rules')
.alias('d', 'data')
.alias('p', 'performance')
.alias('perf', 'performance')
.alias('m', 'match')
.alias('p', 'plugins')
.help('h')
.alias('h', 'help')
.describe('r', 'path to load the rules file')
Expand All @@ -25,7 +26,7 @@ const command = yargs
.describe('T', 'specify type configs from file')
.describe('p', 'output the time it took to run the data')
.demandOption(['r'])
.version('0.4.0')
.version('0.5.0')
.argv;

const filePath = command.rules;
Expand All @@ -39,7 +40,7 @@ interface ESData {

try {
if (command.t) {
const segments = command.t.split(',');
const segments = formatList(command.t);
segments.forEach((segment: string) => {
const pieces = segment.split(':');
typesConfig[pieces[0].trim()] = pieces[1].trim();
Expand Down Expand Up @@ -77,6 +78,10 @@ async function dataLoader(dataPath: string): Promise<object[]> {
});
}

/**
 * Splits a comma-separated command-line value into its individual entries.
 *
 * Every entry is trimmed of surrounding whitespace. Entries that are empty
 * after trimming (caused by an empty input, a doubled comma, or a trailing
 * comma) are dropped so callers never receive a blank segment.
 *
 * @param input raw comma-separated string
 * @returns the trimmed, non-empty entries in their original order
 */
function formatList(input: string): string[] {
    return input
        .split(',')
        .map((str) => str.trim())
        .filter((str) => str.length > 0);
}

function getPipedData() {
return new Promise((resolve, reject) => {
let strResults = '';
Expand Down Expand Up @@ -141,24 +146,40 @@ async function getData(dataPath: string) {
async function initCommand() {
try {
const opConfig: WatcherConfig = {
file_path: path.resolve(dir, filePath),
selector_config: typesConfig,
rules: formatList(filePath),
types: typesConfig,
type
};
let plugins = [];
if (command.p) {
try {
const pluginList = formatList(command.p);
plugins = pluginList.map((pluginPath) => {
const module = require(path.resolve(dir, pluginPath));
const results = module.default || module;
return results;
});

} catch (err) {
// @ts-ignore
console.error('could not load plugins');
process.exit(1);
}
}
const manager = new PhaseManager(opConfig, logger);

await manager.init();
await manager.init(plugins);

const data = await getData(dataPath);

if (command.p) {
if (command.perf) {
process.stderr.write('\n');
// tslint:disable-next-line
console.time('execution-time');
}
const results = manager.run(data);
// tslint:disable-next-line
if (command.p) console.timeEnd('execution-time');
if (command.perf) console.timeEnd('execution-time');
process.stdout.write(`${JSON.stringify(results, null, 4)} \n`);
} catch (err) {
console.error(err);
Expand Down
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import ExtractionPhase from './phase_manager/extraction_phase';
import PostProcessPhase from './phase_manager/post_process_phase';
import ValidationPhase from './phase_manager/validation_phase';
import Loader from './loader';
import { OperationsManager } from './operations';

export {
PhaseManager,
SelectionPhase,
ExtractionPhase,
PostProcessPhase,
ValidationPhase,
Loader
Loader,
OperationsManager
};
57 changes: 30 additions & 27 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@

import PhaseBase from './operations/lib/base';
import { DataEntity } from '@terascope/job-components';

export enum NotifyType { matcher = 'matcher', transform = 'transform' }

export interface OperationConfig {
tag?: string;
selector?: string;
selector_config?: object | undefined;
types?: object | undefined;
source_field?: string;
start?: string;
end?: string;
Expand All @@ -19,18 +20,31 @@ export interface OperationConfig {
registration_selector?:string;
mutate?: boolean;
other_match_required?: boolean;
length?: number;
fields?: string[];
delimiter?: string;
}
// TODO: fix registrationSelector above
export interface Refs {
refs: string;
validation?: string;
post_process?: string;

export interface SelectorTypes {
[field: string]: string;
}

export interface StringRefs extends Refs {
length?: number;
target_field?: string;
source_field: string;
/** A class that can be constructed with no arguments to produce a PluginClassType. */
export type PluginClassConstructor = { new (): PluginClassType };

/** Contract for a plugin instance: `init` returns the operations the plugin provides. */
export interface PluginClassType {
// returns a dictionary of operation classes keyed by operation name
init: () => OperationsDict;
}

/** An array of plugin classes (constructors, not instances). */
export type PluginList = PluginClassConstructor[];

/**
 * Constructor signature of an operation class: built from an OperationConfig
 * and an optional SelectorTypes mapping, yielding an Operation.
 */
export type BaseOperationClass = { new (config: OperationConfig, types?: SelectorTypes): Operation };

/** Dictionary of operation classes keyed by operation name. */
export interface OperationsDict {
[op: string]: BaseOperationClass;
}

/** Minimal contract of an operation instance: a single `run` step over one record. */
export interface Operation {
// takes one DataEntity and returns a DataEntity or null
// NOTE(review): presumably null means the record is dropped — confirm against callers
run(data: DataEntity): null | DataEntity;
}

export interface ConfigResults {
Expand All @@ -43,26 +57,15 @@ export interface NormalizedConfig {
registrationSelector: string;
}

export interface OperationsDictionary {
[key: string]: PhaseBase[];
export interface OperationsPipline {
[key: string]: Operation[];
}

export interface WatcherConfig {
type: string;
file_path: string | undefined;
connection?: string | undefined;
index?: string | undefined;
selector_config?: object | undefined;
actions?: object[];
}

export interface JoinConfig {
selector?: string;
operation: string;
fields: string[];
target_field: string;
delimiter?: string;
remove_source?: boolean;
rules: string[];
plugins?: string[];
types?: SelectorTypes;
}

export type injectFn = (config: OperationConfig, list: OperationConfig[]) => void;
Expand Down
12 changes: 5 additions & 7 deletions src/loader/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import fs from 'fs';
import readline from 'readline';
import { WatcherConfig, OperationConfig } from '../interfaces';
import _ from 'lodash';

export default class Loader {
private opConfig: WatcherConfig;
Expand All @@ -11,13 +12,10 @@ export default class Loader {
}

public async load():Promise<OperationConfig[]> {
return this.fileLoader();
const results = await Promise.all(this.opConfig.rules.map((ruleFile) => this.fileLoader(ruleFile)));
return _.flatten(results);
}

// private async esLoader() {
// //TODO: implement me
// }

private parseConfig(config: string): OperationConfig {
if (config.charAt(0) !== '{') {
return { selector: config as string };
Expand All @@ -28,12 +26,12 @@ export default class Loader {
return results;
}

private async fileLoader(): Promise<OperationConfig[]> {
private async fileLoader(ruleFile: string): Promise<OperationConfig[]> {
const parseConfig = this.parseConfig.bind(this);
const results: OperationConfig[] = [];

const rl = readline.createInterface({
input: fs.createReadStream(this.opConfig.file_path as string),
input: fs.createReadStream(ruleFile),
crlfDelay: Infinity
});
// TODO: error handling here
Expand Down
75 changes: 52 additions & 23 deletions src/operations/index.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,81 @@

import _ from 'lodash';
import OperationBase from './lib/base';
import Join from './lib/ops/join';
import Selector from './lib/ops/selector';
import Extraction from './lib/ops/extraction';
import Geolocation from './lib/validations/geolocation';
import String from './lib/validations/string';
import Number from './lib/validations/number';
import Boolean from './lib/validations/boolean';
import StringValidation from './lib/validations/string';
import NumberValidation from './lib/validations/number';
import BooleanValidation from './lib/validations/boolean';
import Url from './lib/validations/url';
import Email from './lib/validations/email';
import Ip from './lib/validations/ip';
import Base64Decode from './lib/ops/base64decode';
import UrlDecode from './lib/ops/urldecode';
import HexDecode from './lib/ops/hexdecode';
import RequiredExtractions from './lib/validations/required_extractions';
import { OperationsDict, PluginClassType, BaseOperationClass, PluginList } from '../interfaces';

const opNames = {
join: Join,
selector: Selector,
extraction: Extraction,
geolocation: Geolocation,
string: String,
boolean: Boolean,
number: Number,
url: Url,
email: Email,
ip: Ip,
base64decode: Base64Decode,
urldecode: UrlDecode,
hexdecode: HexDecode,
requiredExtractions: RequiredExtractions
};
/**
 * Plugin that exposes the operation classes imported by this module.
 */
class CorePlugins implements PluginClassType {
    /**
     * Builds a new name-to-class dictionary of the built-in operations.
     *
     * @returns a fresh object on every call, keyed by operation name
     */
    init(): OperationsDict {
        const builtIns: OperationsDict = {
            join: Join,
            selector: Selector,
            extraction: Extraction,
            geolocation: Geolocation,
            string: StringValidation,
            boolean: BooleanValidation,
            number: NumberValidation,
            url: Url,
            email: Email,
            ip: Ip,
            base64decode: Base64Decode,
            urldecode: UrlDecode,
            hexdecode: HexDecode,
            requiredExtractions: RequiredExtractions
        };

        return builtIns;
    }
}

/**
 * Registry of operation classes, assembled from the supplied plugins plus
 * the built-in CorePlugins.
 */
class OperationsManager {
    operations: OperationsDict;

    /**
     * Instantiates every plugin class, calls its `init`, and merges the
     * returned dictionaries into `operations`.
     *
     * @param pluginList plugin classes to register; the array is not modified
     */
    constructor(pluginList: PluginList = []) {
        // Copy instead of pushing so the caller's array is left untouched.
        // CorePlugins stays last, so on a name collision the core operation
        // still overrides a plugin's, exactly as before.
        const allPlugins: PluginList = [...pluginList, CorePlugins];

        this.operations = allPlugins.reduce<OperationsDict>((operations, PluginClass) => {
            const plugin = new PluginClass();
            _.assign(operations, plugin.init());
            return operations;
        }, {});
    }

    /**
     * Looks up an operation class by name.
     *
     * @param name operation name as registered by a plugin
     * @returns the matching operation class
     * @throws Error when no operation is registered under `name`
     */
    getTransform(name: string): BaseOperationClass {
        // Only own keys count: inherited Object.prototype members such as
        // "constructor" or "toString" must not be returned as operations.
        const isRegistered = Object.prototype.hasOwnProperty.call(this.operations, name);
        const op = isRegistered ? this.operations[name] : undefined;
        if (!op) throw new Error(`could not find transform module ${name}`);
        return op;
    }
}

export {
OperationBase,
Join,
Selector,
Extraction,
Geolocation,
String,
Number,
Boolean,
StringValidation,
NumberValidation,
BooleanValidation,
Url,
Email,
Ip,
Base64Decode,
UrlDecode,
HexDecode,
RequiredExtractions,
opNames
OperationsManager
};
8 changes: 4 additions & 4 deletions src/operations/lib/ops/join.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@

import { DataEntity } from '@terascope/job-components';
import { JoinConfig } from '../../../interfaces';
import { OperationConfig } from '../../../interfaces';
import _ from 'lodash';
import OperationBase from '../base';

export default class Join extends OperationBase {
private delimiter: string;
private fields: string[];

constructor(config: JoinConfig) {
constructor(config: OperationConfig) {
super(config);
this.delimiter = config.delimiter !== undefined ? config.delimiter : '';
this.fields = config.fields;
this.fields = config.fields as string[];
}
// source work differently here so we do not use the inherited validate
protected validate(config: JoinConfig) {
protected validate(config: OperationConfig) {
const { target_field: tField, remove_source } = config;
if (!tField || typeof tField !== 'string' || tField.length === 0) {
throw new Error(`could not find target_field for ${this.constructor.name} validation or it is improperly formatted, config: ${JSON.stringify(config)}`);
Expand Down
6 changes: 3 additions & 3 deletions src/operations/lib/ops/selector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
import { DocumentMatcher } from 'xlucene-evaluator';
import { DataEntity } from '@terascope/job-components';
import OperationBase from '../base';
import { OperationConfig } from '../../../interfaces';
import { OperationConfig, SelectorTypes } from '../../../interfaces';

export default class Selector extends OperationBase {
private documentMatcher: DocumentMatcher;
public selector: string;
private isMatchAll: boolean;

constructor(config: OperationConfig, typeConfigs?: object) {
constructor(config: OperationConfig, types?: SelectorTypes) {
super(config);
let luceneQuery = config.selector as string;
if (typeof luceneQuery !== 'string') throw new Error('selector must be a string');
this.selector = luceneQuery;
this.isMatchAll = luceneQuery === '*';
if (this.isMatchAll) luceneQuery = '';
this.documentMatcher = new DocumentMatcher(luceneQuery, typeConfigs);
this.documentMatcher = new DocumentMatcher(luceneQuery, types);
}

addMetaData(doc: DataEntity, selector: string) {
Expand Down
Loading

0 comments on commit 6fc1cb5

Please sign in to comment.