From 2cae0f5070574679ba4bb99eeade69c271364c55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos=20Gris?= Date: Tue, 15 Aug 2017 10:56:59 -0300 Subject: [PATCH] Fix issue #2. Add singer message support --- .gitignore | 5 ++++- target_csv.py | 62 +++++++++++++++++++++------------------------------ 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 72b965a..b9dbe7f 100644 --- a/.gitignore +++ b/.gitignore @@ -88,4 +88,7 @@ ENV/ # Rope project settings .ropeproject -*~ \ No newline at end of file +*~ + +# VS Code project settings +.vscode/ diff --git a/target_csv.py b/target_csv.py index 43a8d13..253b52e 100755 --- a/target_csv.py +++ b/target_csv.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import argparse import io import os @@ -11,8 +9,8 @@ import urllib from datetime import datetime import collections - import pkg_resources + from jsonschema.validators import Draft4Validator import singer @@ -35,40 +33,34 @@ def flatten(d, parent_key='', sep='__'): items.append((new_key, str(v) if type(v) is list else v)) return dict(items) -def persist_lines(delimiter, quotechar, lines): +def persist_messages(delimiter, quotechar, messages): state = None schemas = {} key_properties = {} headers = {} validators = {} - + now = datetime.now().strftime('%Y%m%dT%H%M%S') - for line in lines: + for message in messages: try: - o = json.loads(line) + o = singer.parse_message(message).asdict() except json.decoder.JSONDecodeError: - logger.error("Unable to parse:\n{}".format(line)) + logger.error("Unable to parse:\n{}".format(message)) raise - - if 'type' not in o: - raise Exception("Line is missing required key 'type': {}".format(line)) - t = o['type'] - - if t == 'RECORD': - if 'stream' not in o: - raise Exception("Line is missing required key 'stream': {}".format(line)) + message_type = o['type'] + if message_type == 'RECORD': if o['stream'] not in schemas: - raise Exception("A record for stream {} was encountered before a corresponding schema".format(o['stream'])) + raise Exception("A record for stream {}" + "was encountered before a corresponding schema".format(o['stream'])) - schema = schemas[o['stream']] validators[o['stream']].validate(o['record']) filename = o['stream'] + '-' + now + '.csv' file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0 flattened_record = flatten(o['record']) - + if o['stream'] not in headers and not file_is_empty: with open(filename, 'r') as csvfile: reader = csv.reader(csvfile, @@ -78,35 +70,31 @@ def persist_lines(delimiter, quotechar, lines): headers[o['stream']] = first_line if first_line else flattened_record.keys() else: headers[o['stream']] = flattened_record.keys() - + with open(filename, 'a') as csvfile: - writer = csv.DictWriter(csvfile, + writer = csv.DictWriter(csvfile, headers[o['stream']], extrasaction='ignore', delimiter=delimiter, quotechar=quotechar) if file_is_empty: writer.writeheader() - - writer.writerow(flattened_record) + + writer.writerow(flattened_record) state = None - elif t == 'STATE': + elif message_type == 'STATE': logger.debug('Setting state to {}'.format(o['value'])) state = o['value'] - elif t == 'SCHEMA': - if 'stream' not in o: - raise Exception("Line is missing required key 'stream': {}".format(line)) + elif message_type == 'SCHEMA': stream = o['stream'] schemas[stream] = o['schema'] validators[stream] = Draft4Validator(o['schema']) - if 'key_properties' not in o: - raise Exception("key_properties field is required") key_properties[stream] = o['key_properties'] else: raise Exception("Unknown message type {} in message {}" .format(o['type'], o)) - + return state @@ -135,8 +123,8 @@ def main(): args = parser.parse_args() if args.config: - with open(args.config) as input: - config = json.load(input) + with open(args.config) as input_json: + config = json.load(input_json) else: config = {} @@ -146,11 +134,11 @@ def main(): 'the config parameter "disable_collection" to true') threading.Thread(target=send_usage_stats).start() - input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') - state = persist_lines(config.get('delimiter', ','), - config.get('quotechar', '"'), - input) - + input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') + state = persist_messages(config.get('delimiter', ','), + config.get('quotechar', '"'), + input_messages) + emit_state(state) logger.debug("Exiting normally")