Skip to content

Commit

Permalink
Fix issue singer-io#2. Add singer message support
Browse files Browse the repository at this point in the history
  • Loading branch information
gris committed Aug 15, 2017
1 parent 2e76485 commit 2cae0f5
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 38 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,7 @@ ENV/
# Rope project settings
.ropeproject

*~
*~

# VS Code project settings
.vscode/
62 changes: 25 additions & 37 deletions target_csv.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/env python3

import argparse
import io
import os
Expand All @@ -11,8 +9,8 @@
import urllib
from datetime import datetime
import collections

import pkg_resources

from jsonschema.validators import Draft4Validator
import singer

Expand All @@ -35,40 +33,34 @@ def flatten(d, parent_key='', sep='__'):
items.append((new_key, str(v) if type(v) is list else v))
return dict(items)

def persist_lines(delimiter, quotechar, lines):
def persist_messages(delimiter, quotechar, messages):
state = None
schemas = {}
key_properties = {}
headers = {}
validators = {}

now = datetime.now().strftime('%Y%m%dT%H%M%S')

for line in lines:
for message in messages:
try:
o = json.loads(line)
o = singer.parse_message(message).asdict()
except json.decoder.JSONDecodeError:
logger.error("Unable to parse:\n{}".format(line))
logger.error("Unable to parse:\n{}".format(message))
raise

if 'type' not in o:
raise Exception("Line is missing required key 'type': {}".format(line))
t = o['type']

if t == 'RECORD':
if 'stream' not in o:
raise Exception("Line is missing required key 'stream': {}".format(line))
message_type = o['type']
if message_type == 'RECORD':
if o['stream'] not in schemas:
raise Exception("A record for stream {} was encountered before a corresponding schema".format(o['stream']))
raise Exception("A record for stream {}"
"was encountered before a corresponding schema".format(o['stream']))

schema = schemas[o['stream']]
validators[o['stream']].validate(o['record'])

filename = o['stream'] + '-' + now + '.csv'
file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0

flattened_record = flatten(o['record'])

if o['stream'] not in headers and not file_is_empty:
with open(filename, 'r') as csvfile:
reader = csv.reader(csvfile,
Expand All @@ -78,35 +70,31 @@ def persist_lines(delimiter, quotechar, lines):
headers[o['stream']] = first_line if first_line else flattened_record.keys()
else:
headers[o['stream']] = flattened_record.keys()

with open(filename, 'a') as csvfile:
writer = csv.DictWriter(csvfile,
writer = csv.DictWriter(csvfile,
headers[o['stream']],
extrasaction='ignore',
delimiter=delimiter,
quotechar=quotechar)
if file_is_empty:
writer.writeheader()
writer.writerow(flattened_record)

writer.writerow(flattened_record)

state = None
elif t == 'STATE':
elif message_type == 'STATE':
logger.debug('Setting state to {}'.format(o['value']))
state = o['value']
elif t == 'SCHEMA':
if 'stream' not in o:
raise Exception("Line is missing required key 'stream': {}".format(line))
elif message_type == 'SCHEMA':
stream = o['stream']
schemas[stream] = o['schema']
validators[stream] = Draft4Validator(o['schema'])
if 'key_properties' not in o:
raise Exception("key_properties field is required")
key_properties[stream] = o['key_properties']
else:
raise Exception("Unknown message type {} in message {}"
.format(o['type'], o))

return state


Expand Down Expand Up @@ -135,8 +123,8 @@ def main():
args = parser.parse_args()

if args.config:
with open(args.config) as input:
config = json.load(input)
with open(args.config) as input_json:
config = json.load(input_json)
else:
config = {}

Expand All @@ -146,11 +134,11 @@ def main():
'the config parameter "disable_collection" to true')
threading.Thread(target=send_usage_stats).start()

input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
state = persist_lines(config.get('delimiter', ','),
config.get('quotechar', '"'),
input)
input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
state = persist_messages(config.get('delimiter', ','),
config.get('quotechar', '"'),
input_messages)

emit_state(state)
logger.debug("Exiting normally")

Expand Down

0 comments on commit 2cae0f5

Please sign in to comment.