diff --git a/README.md b/README.md
index baf0947..343e1e0 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ Because for (mostly) structured documents where we're expecting the schema to ra
 How
 ------
 
-Malort will read through a directory of .json or flat text files with delimited JSON blobs and generate relevant statistics on each key.
+Malort will read through a directory of .json or flat text files (optionally gzipped) with delimited JSON blobs and generate relevant statistics on each key. It uses the Dask library to parallelize these computations.
 
 For example, let's look at a directory with two JSON files, and one text file with newline-delimited JSON:
 ```json
@@ -109,18 +109,16 @@ Install
 API
 ---
 
-* `result = malort.analyze(path, delimiter='\n', parse_timestamps=True)`
+* `result = malort.analyze(path, parse_timestamps=True)`
 
 ```python
-Analyze a given directory of either .json or flat text files
-with delimited JSON to get relevant key statistics.
+Analyze a given directory of either .json, flat text files
+with newline-delimited JSON, or gzipped files with newline-delimited JSON to get relevant key statistics.
 
 Parameters
 ----------
 path: string
     Path to directory
-delimiter: string, default newline
-    For flat text files, the JSON blob delimiter
parse_timestamps: boolean, default True
    If True, will attempt to regex match ISO8601 formatted timestamps
 ```
@@ -158,7 +156,7 @@ Yes.
 
 How fast is it?
 ---------------
-With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster.
+With timestamp parsing turned on, I used Malort to process 2.1 GB of files (1,326,794 nested JSON blobs) in 8 minutes. There are undoubtedly ways to do it faster. Speed will depend on a number of factors, including nesting depth.
 
 Should I use the column type results verbatim?
 ----------------------------------------------
diff --git a/malort/core.py b/malort/core.py
index bff5cc0..2a275c5 100644
--- a/malort/core.py
+++ b/malort/core.py
@@ -6,11 +6,10 @@ JSON -> Postgres Column types
 
 """
-from __future__ import absolute_import
-from __future__ import print_function
-from __future__ import division
+from __future__ import absolute_import, print_function, division
 
 from collections import defaultdict
+from functools import partial
 import json
 import os
 from os.path import isfile, join, splitext
@@ -18,11 +17,13 @@ import re
 import time
 
-from malort.stats import recur_dict, dict_generator
+import dask.bag as db
+
+from malort.stats import recur_dict, combine_stats, dict_generator
 from malort.type_mappers import TypeMappers
 
 
-def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs):
+def analyze(path, parse_timestamps=True, **kwargs):
     """
     Analyze a given directory of either .json or flat text files
     with delimited JSON to get relevant key statistics.
@@ -31,19 +32,21 @@ def analyze(path, delimiter='\n', parse_timestamps=True, **kwargs):
     Parameters
     ----------
     path: string
         Path to directory
-    delimiter: string, default newline
-        For flat text files, the JSON blob delimiter
     parse_timestamps: boolean, default True
         If True, will attempt to regex match ISO8601 formatted timestamps
     kwargs:
         passed into json.loads. Here you can specify encoding, etc.
""" stats = {} start_time = time.time() - for count, blob in enumerate(dict_generator(path, delimiter, **kwargs), start=1): - recur_dict(blob, stats, parse_timestamps=parse_timestamps) + file_list = [os.path.join(path, f) for f in os.listdir(path)] + bag = db.from_filenames(file_list).map(json.loads) + recur_partial = partial(recur_dict, parse_timestamps=parse_timestamps) + stats = bag.fold(recur_partial, combine_stats, initial={}).compute() + count = stats["total_records"] + del stats["total_records"] elapsed = time.time() - start_time print('Malort run finished: {} JSON blobs analyzed in {} seconds.' diff --git a/malort/stats.py b/malort/stats.py index a6fc222..bb98cac 100644 --- a/malort/stats.py +++ b/malort/stats.py @@ -6,9 +6,7 @@ Functions to generate Malort stats """ -from __future__ import absolute_import -from __future__ import print_function -from __future__ import division +from __future__ import absolute_import, print_function, division import decimal import json @@ -25,6 +23,7 @@ \d([\.,]\d+)?)?([zZ]|([\+-])([01]\d|2[0-3]):?([0-5]\d)?) ?)?)?$""", re.VERBOSE) + def delimited(file, delimiter='\n', bufsize=4096): buf = '' while True: @@ -38,6 +37,7 @@ def delimited(file, delimiter='\n', bufsize=4096): yield line buf = lines[-1] + def catch_json_error(blob, filepath, **kwargs): """Wrapper to provide better error message for JSON reads""" try: @@ -48,6 +48,7 @@ def catch_json_error(blob, filepath, **kwargs): return parsed + def dict_generator(path, delimiter='\n', **kwargs): """ Given a directory path, return a generator that will return a dict for each @@ -80,11 +81,109 @@ def dict_generator(path, delimiter='\n', **kwargs): else: yield catch_json_error(fread.read(), filepath, **kwargs) + def get_new_mean(value, current_mean, count): """Given a value, current mean, and count, return new mean""" summed = current_mean * count return (summed + value)/(count + 1) + +def combine_means(means, counts): + """Combine ordered iter of means and counts""" + numer = sum([mean * count for mean, count in zip(means, counts) + if mean is not None and count is not None]) + denom = sum([c for c in counts if c is not None]) + return round(numer / denom, 3) + + +def combine_stats(accum, value): + """ + Combine two sets of stats into one. Used for final dask rollup of + multiple partitions of stats dicts into one unified stats dict. Best + thought of as a reduction over multiple stats object. 
+
+    Parameters
+    ----------
+    accum: dict
+        Accumulating stats dict
+    value: dict
+        New stats dict to merge with accumulator
+
+    Returns
+    -------
+    dict
+    """
+    for field_name, type_stats in value.items():
+
+        # Update total count
+        if field_name == "total_records":
+            accum["total_records"] = accum["total_records"] + type_stats
+            continue
+
+        # Combine accum stats from different branches
+        if accum.get(field_name):
+            for value_type, val_stats in type_stats.items():
+                accum_entry = accum[field_name].get(value_type)
+                if not accum_entry:
+                    # If accum doesn't already have an entry, but the
+                    # value does, take the value's entry wholesale
+                    accum[field_name][value_type] = val_stats
+                    continue
+
+                # base_key is not a statistic to be updated
+                if value_type == "base_key":
+                    continue
+
+                if accum_entry:
+                    max_ = (accum_entry.get("max"), val_stats.get("max"))
+                    min_ = (accum_entry.get("min"), val_stats.get("min"))
+                    count = (accum_entry.get("count"), val_stats.get("count"))
+                    mean = (accum_entry.get("mean"), val_stats.get("mean"))
+
+                    if any(max_):
+                        accum_entry["max"] = max(max_)
+                    if any(min_):
+                        accum_entry["min"] = min(min_)
+                    if any(count):
+                        accum_entry["count"] = sum([c for c in count
+                                                    if c is not None])
+                    if any(mean):
+                        accum_entry["mean"] = combine_means(mean, count)
+
+                    # Type specific entries
+                    if value_type == "float":
+
+                        fixed_length = (accum_entry.get("fixed_length"),
+                                        val_stats.get("fixed_length"))
+
+                        # Decimals not fixed length if prec/scale do not match
+                        a_prec, a_scale = (accum_entry.get("max_precision"),
+                                           accum_entry.get("max_scale"))
+                        v_prec, v_scale = (val_stats.get("max_precision"),
+                                           val_stats.get("max_scale"))
+                        prec_scale_eq = (a_prec == v_prec, a_scale == v_scale)
+
+                        if not all(fixed_length) or not all(prec_scale_eq):
+                            accum_entry["fixed_length"] = False
+
+                        max_prec = (accum_entry.get("max_precision"),
+                                    val_stats.get("max_precision"))
+                        accum_entry["max_precision"] = max(max_prec)
+
+                        max_scale = (accum_entry.get("max_scale"),
+                                     val_stats.get("max_scale"))
+                        accum_entry["max_scale"] = max(max_scale)
+
+                    elif value_type == "str":
+                        samples = accum_entry.get("sample", []) +\
+                                  val_stats.get("sample", [])
+                        accum_entry["sample"] = random.sample(
+                            samples, min(len(samples), 3))
+        else:
+            # Field only present in `value`: take its stats as-is
+            accum[field_name] = type_stats
+
+    return accum
+
+
 def updated_entry_stats(value, current_stats, parse_timestamps=True):
     """
     Given a value and a dict of current statistics, return a dict of new
@@ -156,7 +255,7 @@
 
     return value_type, new_stats
 
 
-def recur_dict(value, stats, parent=None, **kwargs):
+def recur_dict(stats, value, parent=None, **kwargs):
     """
     Recurse through a dict `value` and update `stats` for each field. Can
     handle nested dicts, lists of dicts, and lists of values (must be
@@ -170,9 +269,12 @@
         Parent key to get key nesting depth.
     kwargs:
         Options for update_entry_stats
     """
     parent = parent or ''
 
+    if parent == '':
+        total_records = stats.get("total_records")
+        stats["total_records"] = (total_records + 1) if total_records else 1
+
     def update_stats(current_val, nested_path, base_key):
         "Updater function"
         if nested_path not in stats:
@@ -187,14 +289,14 @@ def update_stats(current_val, nested_path, base_key):
         for k, v in value.items():
             parent_path = '.'.join([parent, k]) if parent != '' else k
             if isinstance(v, (list, dict)):
-                recur_dict(v, stats, parent_path)
+                recur_dict(stats, v, parent_path)
             else:
                 update_stats(v, parent_path, k)
 
     elif isinstance(value, list):
         for v in value:
             if isinstance(v, (list, dict)):
-                recur_dict(v, stats, parent)
+                recur_dict(stats, v, parent)
             else:
                 base_key = parent.split(".")[-1]
                 update_stats(json.dumps(value), parent, base_key)
diff --git a/malort/test_helpers.py b/malort/test_helpers.py
index a2169d8..100c75e 100644
--- a/malort/test_helpers.py
+++ b/malort/test_helpers.py
@@ -12,16 +12,20 @@
 TEST_FILES_1 = os.path.normpath(os.path.join(os.path.abspath(__file__),
                                              '..', 'tests', 'test_files'))
 TEST_FILES_2 = os.path.normpath(os.path.join(os.path.abspath(__file__),
-                                             '..', 'tests', 'test_files_delimited'))
+                                             '..', 'tests', 'test_files_newline_delimited'))
 TEST_FILES_3 = os.path.normpath(os.path.join(os.path.abspath(__file__),
                                              '..', 'tests', 'test_files_nested'))
-
+TEST_FILES_4 = os.path.normpath(os.path.join(os.path.abspath(__file__),
+                                             '..', 'tests', 'test_files_mult_type'))
 
 class TestHelpers(unittest.TestCase):
 
     def assert_stats(self, result, expected):
         """Test helper for testing stats results"""
         for key, value in result.items():
+            if key == 'total_records':
+                self.assertEqual(expected['total_records'], value)
+                continue
             for typek, typev in value.items():
                 if typek == 'str':
                     for k, v in typev.items():
diff --git a/malort/tests/test_files/test1.json b/malort/tests/test_files/test1.json
index 4cce51e..dbed660 100644
--- a/malort/tests/test_files/test1.json
+++ b/malort/tests/test_files/test1.json
@@ -1,5 +1 @@
-{"intfield": 5,
- "floatfield": 2.345,
- "datefield": "2014-09-26 17:00:00",
- "charfield": "fixedlength",
- "varcharfield": "var"}
\ No newline at end of file
+{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"}
diff --git a/malort/tests/test_files/test2.json b/malort/tests/test_files/test2.json
index a1acbac..96a6b2c 100644
--- a/malort/tests/test_files/test2.json
+++ b/malort/tests/test_files/test2.json
@@ -1,5 +1 @@
-{"intfield": 10,
- "floatfield": 4.7891,
- "datefield": "2014-09-26 17:00:00",
- "charfield": "fixedlength",
- "varcharfield": "varyin"}
\ No newline at end of file
+{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"}
diff --git a/malort/tests/test_files/test3.json b/malort/tests/test_files/test3.json
new file mode 100644
index 0000000..9bc437e
--- /dev/null
+++ b/malort/tests/test_files/test3.json
@@ -0,0 +1 @@
+{"intfield": 15,"floatfield": 3.0012,"charfield": "fixedlength","varcharfield": "varyingle","datefield": "2014-09-26 17:00:00"}
diff --git a/malort/tests/test_files/test4.json b/malort/tests/test_files/test4.json
new file mode 100644
index 0000000..06d31d5
--- /dev/null
+++ b/malort/tests/test_files/test4.json
@@ -0,0 +1 @@
+{"intfield": 20,"floatfield": 10.8392,"charfield": "fixedlength","varcharfield": "varyinglengt","datefield": "2014-09-26 17:00:00"}
diff --git a/malort/tests/test_files_delimited/test_pipe_delimited_1 b/malort/tests/test_files_delimited/test_pipe_delimited_1
deleted file mode 100644
index 95a7598..0000000
--- a/malort/tests/test_files_delimited/test_pipe_delimited_1
+++ /dev/null
@@ -1 +0,0 @@
-{"foo": 10,"bar": 2.0,"baz": "fixed","qux": 10}|{"foo": "foo","bar": true,"baz": 2,"qux": "varyin"}
\ No newline at end of file
diff --git a/malort/tests/test_files_delimited/test_pipe_delimited_2 b/malort/tests/test_files_delimited/test_pipe_delimited_2
deleted file mode 100644
index 883191a..0000000
--- a/malort/tests/test_files_delimited/test_pipe_delimited_2
+++ /dev/null
@@ -1 +0,0 @@
-{"foo": 1000,"bar": "bar","baz": "fixed","qux": "var"}|{"foo": "foo","bar": 4.0,"baz": 1,"qux": "varyingle"}
\ No newline at end of file
diff --git a/malort/tests/test_files_mult_type/test_mult_delimited_1 b/malort/tests/test_files_mult_type/test_mult_delimited_1
new file mode 100644
index 0000000..53f36d6
--- /dev/null
+++ b/malort/tests/test_files_mult_type/test_mult_delimited_1
@@ -0,0 +1,2 @@
+{"foo": 10,"bar": 2.0,"baz": "fixed","qux": 10}
+{"foo": "foo","bar": true,"baz": 2,"qux": "varyin"}
\ No newline at end of file
diff --git a/malort/tests/test_files_mult_type/test_mult_delimited_2 b/malort/tests/test_files_mult_type/test_mult_delimited_2
new file mode 100644
index 0000000..92e14a8
--- /dev/null
+++ b/malort/tests/test_files_mult_type/test_mult_delimited_2
@@ -0,0 +1,2 @@
+{"foo": 1000,"bar": "bar","baz": "fixed","qux": "var"}
+{"foo": "foo","bar": 4.0,"baz": 1,"qux": "varyingle"}
\ No newline at end of file
diff --git a/malort/tests/test_files/test3delimited b/malort/tests/test_files_newline_delimited/test_nl_delimited_1
similarity index 100%
rename from malort/tests/test_files/test3delimited
rename to malort/tests/test_files_newline_delimited/test_nl_delimited_1
diff --git a/malort/tests/test_files_newline_delimited/test_nl_delimited_2 b/malort/tests/test_files_newline_delimited/test_nl_delimited_2
new file mode 100644
index 0000000..24484a0
--- /dev/null
+++ b/malort/tests/test_files_newline_delimited/test_nl_delimited_2
@@ -0,0 +1,2 @@
+{"intfield": 5,"floatfield": 2.345,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "var"}
+{"intfield": 10,"floatfield": 4.7891,"datefield": "2014-09-26 17:00:00","charfield": "fixedlength","varcharfield": "varyin"}
\ No newline at end of file
diff --git a/malort/tests/test_malort_core.py b/malort/tests/test_malort_core.py
index 7443201..9303b29 100644
--- a/malort/tests/test_malort_core.py
+++ b/malort/tests/test_malort_core.py
@@ -13,18 +13,16 @@
 
 import malort as mt
 from malort.test_helpers import (TestHelpers, TEST_FILES_1, TEST_FILES_2,
-                                 TEST_FILES_3)
+                                 TEST_FILES_3, TEST_FILES_4)
 
 
 class TestCore(TestHelpers):
 
-    def test_files_1(self):
-        mtresult = mt.analyze(TEST_FILES_1)
-        expected = {
+    expected_1_and_2 = {
         'charfield': {'str': {'count': 4, 'max': 11, 'mean': 11.0, 'min': 11,
                               'sample': ['fixedlength']},
                       'base_key': 'charfield'},
-        'floatfield': {'float': {'count': 4, 'max': 10.8392, 'mean': 5.243,
+        'floatfield': {'float': {'count': 4, 'max': 10.8392, 'mean': 5.244,
                                  'min': 2.345, 'max_precision': 6,
                                  'max_scale': 4, 'fixed_length': False},
                        'base_key': 'floatfield'},
@@ -36,13 +34,39 @@ def test_files_1(self):
                                    'sample': ['var', 'varyin', 'varyingle',
                                               'varyinglengt']},
                           'base_key': 'varcharfield'},
-        'datefield': {'datetime': {'count': 4}, 'base_key': 'datefield'}
-    }
-        self.assert_stats(mtresult.stats, expected)
+        'datefield': {'datetime': {'count': 4}, 'base_key': 'datefield'},
+    }
+
+    def test_files_1(self):
+        mtresult = mt.analyze(TEST_FILES_1)
+        self.assertEqual(mtresult.count, 4)
+        self.assert_stats(mtresult.stats, self.expected_1_and_2)
         self.assertDictEqual(mtresult.get_conflicting_types(), {})
 
     def test_files_2(self):
-        mtresult = mt.analyze(TEST_FILES_2, '|')
+        mtresult = mt.analyze(TEST_FILES_2)
+        self.assertEqual(mtresult.count, 4)
+        self.assert_stats(mtresult.stats, self.expected_1_and_2)
+        self.assertDictEqual(mtresult.get_conflicting_types(), {})
+
+    def test_files_3(self):
+        mtresult = mt.analyze(TEST_FILES_3)
+        expected = {'baz.qux': {'base_key': 'qux',
+                                'str': {'count': 3,
+                                        'max': 5,
+                                        'mean': 3.667,
+                                        'min': 3,
+                                        'sample': ['One', 'Two', 'Three']}},
+                    'foo.bar': {'base_key': 'bar',
+                                'int': {'count': 3, 'max': 30, 'mean': 20.0,
+                                        'min': 10}},
+                    'qux': {'base_key': 'qux', 'bool': {'count': 1}}}
+        self.assert_stats(mtresult.stats, expected)
+        self.assertEqual(mtresult.count, 3)
+        self.assert_stats(mtresult.get_conflicting_types(), expected)
+
+    def test_files_4(self):
+        mtresult = mt.analyze(TEST_FILES_4)
         expected = {
             'bar': {'bool': {'count': 1},
                     'float': {'count': 2, 'max': 4.0, 'mean': 3.0, 'min': 2.0,
@@ -64,22 +88,9 @@ def test_files_2(self):
                               'sample': ['var', 'varyin', 'varyingle']},
                     'base_key': 'qux'}
         }
-        self.assert_stats(mtresult.stats, expected)
-        self.assert_stats(mtresult.get_conflicting_types(), expected)
 
-    def test_files_3(self):
-        mtresult = mt.analyze(TEST_FILES_3)
-        expected = {'baz.qux': {'base_key': 'qux',
-                                'str': {'count': 3,
-                                        'max': 5,
-                                        'mean': 3.667,
-                                        'min': 3,
-                                        'sample': ['One', 'Two', 'Three']}},
-                    'foo.bar': {'base_key': 'bar',
-                                'int': {'count': 3, 'max': 30, 'mean': 20.0,
-                                        'min': 10}},
-                    'qux': {'base_key': 'qux', 'bool': {'count': 1}}}
         self.assert_stats(mtresult.stats, expected)
+        self.assertEqual(mtresult.count, 4)
         self.assert_stats(mtresult.get_conflicting_types(), expected)
 
     def test_gen_redshift_jsonpaths(self):
diff --git a/malort/tests/test_malort_mappers.py b/malort/tests/test_malort_mappers.py
index 108507d..c398677 100644
--- a/malort/tests/test_malort_mappers.py
+++ b/malort/tests/test_malort_mappers.py
@@ -9,7 +9,7 @@
 
 import malort as mt
 from malort.test_helpers import (TestHelpers, TEST_FILES_1, TEST_FILES_2,
-                                 TEST_FILES_3)
+                                 TEST_FILES_3, TEST_FILES_4)
 
 
 class TestTypeMappers(unittest.TestCase):
@@ -27,7 +27,7 @@ def test_files_1(self):
         self.assertDictEqual(types, expected)
 
     def test_files_2(self):
-        mtresult = mt.analyze(TEST_FILES_2, '|')
+        mtresult = mt.analyze(TEST_FILES_4)
         types = mtresult.get_redshift_types()
         for k, v in types.items():
             self.assertEqual(v, 'Multiple types detected.')
diff --git a/malort/tests/test_malort_stats.py b/malort/tests/test_malort_stats.py
index 04c5e90..b2fcd96 100644
--- a/malort/tests/test_malort_stats.py
+++ b/malort/tests/test_malort_stats.py
@@ -24,10 +24,6 @@ def test_json_files_newline(self):
         gen = mt.stats.dict_generator(TEST_FILES_1)
         self.assertEquals(len([d for d in gen]), 4)
 
-    def test_json_files_pipe(self):
-        gen = mt.stats.dict_generator(TEST_FILES_2, '|')
-        self.assertEquals(len([d for d in gen]), 4)
-
 
 class TestUpdateEntryStats(TestHelpers):
 
@@ -121,13 +117,14 @@ def test_recur_simple(self):
             'key4': {'bool': {'count': 1}, 'base_key': 'key4'},
             'key5': {'str': {'count': 1, 'max': 23, 'mean': 23.0, 'min': 23,
                              'sample': ['["one", "two", "three"]']},
-                     'base_key': 'key5'}
+                     'base_key': 'key5'},
+            'total_records': 1
         }
 
-        stats = mt.stats.recur_dict(simple1, {})
+        stats = mt.stats.recur_dict({}, simple1)
         self.assertDictEqual(stats, expected)
 
-        updated_stats = mt.stats.recur_dict({'key1': 2}, stats)
+        updated_stats = mt.stats.recur_dict(stats, {'key1': 2})
         self.assertDictEqual(updated_stats['key1'],
                              {'int': {'count': 2, 'max': 2, 'mean': 1.5,
                                       'min': 1},
                               'base_key': 'key1'})
@@ -175,9 +172,10 @@ def test_recur_depth_one(self):
                                    'max_scale': 1,
                                    'mean': 8.0,
                                    'min': 8.0}},
-            'key5.key4': {'base_key': 'key4', 'bool': {'count': 1}}}
+            'key5.key4': {'base_key': 'key4', 'bool': {'count': 1}},
+            'total_records': 1}
 
-        stats = mt.stats.recur_dict(depth_one, {})
+        stats = mt.stats.recur_dict({}, depth_one)
         self.assert_stats(stats, expected)
 
     @property
@@ -239,7 +237,8 @@ def depth_two_expected(self):
                                         'max_scale': 1,
                                         'mean': 2.0,
                                         'min': 2.0}},
-            'key5.key6.key4': {'base_key': 'key4', 'bool': {'count': 1}}}
+            'key5.key6.key4': {'base_key': 'key4', 'bool': {'count': 1}},
+            'total_records': 1}
 
     def test_recur_depth_two(self):
         depth_two = {
@@ -252,7 +251,7 @@ def test_recur_depth_two(self):
             }
         }
 
-        stats = mt.stats.recur_dict(depth_two, {})
+        stats = mt.stats.recur_dict({}, depth_two)
         self.assert_stats(stats, self.depth_two_expected)
 
     def test_recur_with_array(self):
@@ -264,7 +263,7 @@ def test_recur_with_array(self):
             ]
         }
 
-        stats = mt.stats.recur_dict(with_list, {})
+        stats = mt.stats.recur_dict({}, with_list)
         self.assert_stats(stats, self.depth_two_expected)
 
     def test_recur_with_val_array(self):
@@ -274,7 +273,7 @@ def test_recur_with_val_array(self):
             "key3": [{"key2": ["foo", "bar"]}]
         }
 
-        stats = mt.stats.recur_dict(with_list, {})
+        stats = mt.stats.recur_dict({}, with_list)
         expected = {'key1': {'base_key': 'key1',
                              'int': {'count': 1, 'max': 1, 'mean': 1.0,
                                      'min': 1}},
@@ -289,7 +288,8 @@ def test_recur_with_val_array(self):
                                      'max': 14,
                                      'mean': 14.0,
                                      'min': 14,
-                                     'sample': ['["foo", "bar"]']}}}
+                                     'sample': ['["foo", "bar"]']}},
+                    'total_records': 1}
         self.assert_stats(stats, expected)
 
     def test_raises_with_list_of_unknown(self):
@@ -299,4 +299,118 @@ def test_raises_with_list_of_unknown(self):
         }
 
         with pytest.raises(TypeError):
-            mt.stats.recur_dict(with_values, {})
+            mt.stats.recur_dict({}, with_values)
+
+
+class TestStatsCombiner(TestHelpers):
+
+    def test_simple_stat_agg(self):
+        accum = {
+            'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1},
+                     'base_key': 'key1'},
+            'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'total_records': 3
+        }
+        value = {
+            'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4},
+                     'base_key': 'key1'},
+            'key2': {'str': {'count': 9, 'max': 5, 'mean': 6.0, 'min': 0,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'total_records': 1
+        }
+        combined = mt.stats.combine_stats(accum, value)
+        expected = {
+            'key1': {'int': {'count': 2, 'max': 4, 'mean': 1.0, 'min': 1},
+                     'base_key': 'key1'},
+            'key2': {'str': {'count': 10, 'max': 5, 'mean': 5.7, 'min': 0,
+                             'sample': ['Foo', 'Foo']},
+                     'base_key': 'key2'},
+            'total_records': 4
+        }
+        self.assert_stats(combined, expected)
+
+    def test_value_missing_key(self):
+        accum = {
+            'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1},
+                     'base_key': 'key1',
+                     'float': {'count': 1, 'fixed_length': True, 'max': 4.0,
+                               'max_precision': 2, 'max_scale': 1, 'mean': 4.0,
+                               'min': 4.0}},
+            'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'total_records': 1
+        }
+        value = {
+            'key1': {'int': {'count': 1, 'max': 4, 'mean': 1.0, 'min': 4},
+                     'base_key': 'key1',
+                     'float': {'count': 12, 'fixed_length': False, 'max': 2.0,
+                               'max_precision': 10, 'max_scale': 0,
+                               'mean': 10.0, 'min': 1.0}},
+            'total_records': 10
+        }
+        combined = mt.stats.combine_stats(accum, value)
+        expected = {
+            'key1': {'int': {'count': 2, 'max': 4, 'mean': 1.0, 'min': 1},
+                     'base_key': 'key1',
+                     'float': {'count': 13, 'fixed_length': False, 'max': 4.0,
+                               'max_precision': 10, 'max_scale': 1,
+                               'mean': 9.538, 'min': 1.0}},
+            'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'total_records': 11
+        }
+        self.assert_stats(combined, expected)
+
+    def test_accum_missing_key(self):
+        accum = {
+            'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1},
+                     'base_key': 'key1'},
+            'total_records': 2
+        }
+
+        value = {
+            'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'key1': {'str': {'count': 1, 'max': 2, 'mean': 2.0, 'min': 2,
+                             'sample': ['Foo']}},
+            'total_records': 2
+        }
+        combined = mt.stats.combine_stats(accum, value)
+        expected = {
+            'key1': {'int': {'count': 1, 'max': 1, 'mean': 1.0, 'min': 1},
+                     'str': {'count': 1, 'max': 2, 'mean': 2.0, 'min': 2,
+                             'sample': ['Foo']},
+                     'base_key': 'key1'},
+            'key2': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': ['Foo']},
+                     'base_key': 'key2'},
+            'total_records': 4
+        }
+        self.assert_stats(combined, expected)
+
+    def test_mult_sample(self):
+        samples = ["foo", "bar", "baz", "qux", "Foo", "Bar"]
+        accum = {
+            'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': samples[0:4]},
+                     'base_key': 'key2'},
+            'total_records': 1
+        }
+
+        value = {
+            'key1': {'str': {'count': 1, 'max': 3, 'mean': 3.0, 'min': 3,
+                             'sample': samples[4:]},
+                     'base_key': 'key2'},
+            'total_records': 1
+        }
+
+        combined = mt.stats.combine_stats(accum, value)
+        sample_key = combined['key1']['str']['sample']
+        assert len(set(sample_key).difference(set(samples))) == 0
+        assert combined['total_records'] == 2
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..aed19c8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+dask==0.7.0
+dill==0.2.4
+numpy==1.9.2
+pandas==0.16.2
+python-dateutil==2.4.2
+pytz==2015.4
+six==1.9.0
+toolz==0.7.2
+wheel==0.24.0
diff --git a/setup.py b/setup.py
index 066d82d..88bfea2 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
 
 setup(
     name='malort',
-    version='0.0.3',
+    version='0.0.4',
     description='JSON to Postgres Column Types',
     author='Rob Story',
     author_email='wrobstory@gmail.com',
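Reviewer note: the heart of this change is the reduction contract the new `analyze` relies on. Within each dask partition, `recur_dict(stats, blob)` folds blobs into a single stats dict, and `combine_stats(accum, value)` merges the per-partition results. Below is a minimal sketch of that contract, not part of the patch: `functools.reduce` stands in for `dask.bag.fold`, and the partitions and keys (`part1`, `"qty"`) are invented for illustration.

```python
from functools import partial, reduce

from malort.stats import combine_stats, recur_dict

# Two invented "partitions" of already-parsed JSON blobs
part1 = [{"qty": 1}, {"qty": 2}]
part2 = [{"qty": "three"}]

fold = partial(recur_dict, parse_timestamps=True)

# Within a partition: fold blobs down to one stats dict
stats1 = reduce(fold, part1, {})
stats2 = reduce(fold, part2, {})

# Across partitions: merge the per-partition dicts -- roughly what
# bag.fold(fold, combine_stats, initial={}).compute() does in analyze()
merged = combine_stats(stats1, stats2)

print(merged["total_records"])       # 3
print(sorted(merged["qty"].keys()))  # ['base_key', 'int', 'str']
```

Note that `mtresult.count` in the updated tests comes from this `total_records` entry, which `analyze` reads and then deletes from the stats dict before returning.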