diff --git a/conda.recipe/meta.yaml b/conda.recipe/meta.yaml
index e6898683..61c20acb 100644
--- a/conda.recipe/meta.yaml
+++ b/conda.recipe/meta.yaml
@@ -33,6 +33,9 @@ requirements:
 
 test:
   requires:
+    - avro  # [py27]
+    - avro-python3  # [not py27]
+    - fastavro
     - pytest
     - h5py
     - pytables >=3.0.0
diff --git a/etc/requirements.txt b/etc/requirements.txt
index 0748cb8b..93fc20d6 100644
--- a/etc/requirements.txt
+++ b/etc/requirements.txt
@@ -1,7 +1,7 @@
 datashape >= 0.5.0
 numpy >= 1.7
 pandas >= 0.15.0
-toolz >= 0.7.3
+toolz >= 0.7.4
 multipledispatch >= 0.4.7
 networkx >= 1.0
 dask >= 0.11.1
diff --git a/etc/requirements_avro.txt b/etc/requirements_avro.txt
new file mode 100644
index 00000000..baf45049
--- /dev/null
+++ b/etc/requirements_avro.txt
@@ -0,0 +1,3 @@
+fastavro==0.12.1
+avro==1.8.1; python_version < '3'
+avro-python3==1.8.1; python_version > '2.7'
diff --git a/etc/requirements_ci.txt b/etc/requirements_ci.txt
index b6f878d8..96547d50 100644
--- a/etc/requirements_ci.txt
+++ b/etc/requirements_ci.txt
@@ -1,3 +1,5 @@
+avro==1.8.1; python_version < '3'
+avro-python3==1.8.1; python_version > '2.7'
 bcolz==0.12.1
 bokeh==0.12.3
 boto3==1.4.1
@@ -48,3 +50,4 @@ tblib==1.3.0
 toolz==0.8.0
 tornado==4.4.2
 zict==0.0.3
+fastavro==0.12.1
diff --git a/extra-requirements.txt b/extra-requirements.txt
index 6d69f7ac..f0050c60 100644
--- a/extra-requirements.txt
+++ b/extra-requirements.txt
@@ -6,4 +6,7 @@ sas7bdat
 paramiko
 pywebhdfs
 boto
+avro; python_version < '3'
+avro-python3; python_version > '2.7'
+fastavro
 sqlalchemy-redshift
diff --git a/odo/__init__.py b/odo/__init__.py
index e3abdcaf..1b2d8105 100644
--- a/odo/__init__.py
+++ b/odo/__init__.py
@@ -61,10 +61,13 @@ from .backends.sparksql import SparkDataFrame
 with ignoring(ImportError):
     from .backends.url import URL
+with ignoring(ImportError):
+    from .backends.avro import AVRO
 with ignoring(ImportError):
     from .backends.dask import dask
 
+
 restart_ordering()  # Restart multipledispatch ordering and do ordering
diff --git a/odo/backends/avro.py b/odo/backends/avro.py
new file mode 100644
index 00000000..7897458f
--- /dev/null
+++ b/odo/backends/avro.py
@@ -0,0 +1,412 @@
+from __future__ import absolute_import, division, print_function
+
+import errno
+import os
+import uuid
+import json
+import fastavro
+import six
+from avro import schema, datafile, io
+from avro.schema import AvroException
+from multipledispatch import dispatch
+import pandas as pd
+from datashape import dshape, discover, var, Record, Map, Var, \
+    Option, null, string, int8, int32, int64, float64, float32, boolean, bytes_
+import datashape.coretypes as ct
+from collections import Iterator
+from ..append import append
+from ..convert import convert
+from ..resource import resource
+from ..temp import Temp
+
+try:
+    from avro.schema import make_avsc_object as schema_from_dict  # Python 2.x
+except ImportError:
+    from avro.schema import SchemaFromJSONData as schema_from_dict  # Python 3.x
+
+PRIMITIVE_TYPES_MAP = {
+    'null': null,
+    'boolean': boolean,
+    'string': string,
+    'bytes': int8,
+    'int': int32,
+    'long': int64,
+    'double': float64,
+    'float': float32,
+}
+
+NAMED_TYPES_MAP = {
+    'fixed': bytes_,  # TODO
+    'enum': int8,  # TODO
+    'record': Record,
+    'error': Record,  # TODO
+}
+
+COMPOUND_TYPES_MAP = {
+    'array': Var,
+    'map': Map,
+    'union': null,  # TODO
+    'request': null,  # TODO
+    'error_union': null,  # TODO
+}
+
+
+AVRO_TYPE_MAP = {}
+AVRO_TYPE_MAP.update(PRIMITIVE_TYPES_MAP)
+AVRO_TYPE_MAP.update(NAMED_TYPES_MAP)
+AVRO_TYPE_MAP.update(COMPOUND_TYPES_MAP)
+
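+# Illustrative note (hypothetical schema, not used elsewhere in this module):
+# discover_schema() below combines the entries above recursively, so a
+# writers_schema such as
+#
+#     {"type": "record", "name": "point",
+#      "fields": [{"name": "x", "type": "long"},
+#                 {"name": "y", "type": "double"}]}
+#
+# is expected to discover as ``var * {x: int64, y: float64}``.
+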
+dshape_to_avro_primitive_types = {
+    ct.int8: 'bytes',
+    ct.int16: 'int',
+    ct.int32: 'int',
+    ct.int64: 'long',
+    ct.float32: 'float',
+    ct.float64: 'double',
+    ct.date_: 'long',
+    ct.datetime_: 'long',
+    ct.string: 'string',
+    ct.bool_: 'boolean'
+}
+
+
+class AVRO(object):
+    """Wrapper object for reading and writing an Avro container file
+
+    Parameters
+    ----------
+
+    path : str
+        path of avro data
+
+    schema : avro.schema.Schema
+        User-specified Avro schema object, used to decode the file or to
+        serialize new records to it. A schema is required to create a new
+        Avro file. When reading or appending to an existing Avro file, the
+        writers_schema embedded in that file is used instead.
+
+    codec : str
+        compression codec. Valid values: 'null', 'deflate', 'snappy'
+
+    """
+    def __init__(self, path, schema=None, codec='null', **kwargs):
+        self._path = path
+        self._schema = schema
+        self._codec = codec
+
+    def __iter__(self):
+        return self.reader.__iter__()
+
+    def __next__(self):
+        return next(self.reader)
+
+    next = __next__  # Python 2.x iterator protocol
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        # Flush any pending writes if there's no exception
+        if type is None:
+            self.flush()
+
+    def _get_writers_schema(self):
+        """
+        Extract the writers_schema embedded in an existing Avro file.
+        """
+        reader = self.reader
+        return schema_from_dict(reader.schema) if reader else None
+
+    path = property(lambda self: self._path)
+    codec = property(lambda self: self._codec)
+
+    @property
+    def schema(self):
+        if not self._schema:
+            sch = self._get_writers_schema()
+            if sch is None:
+                raise AvroException("Couldn't extract writers schema from "
+                                    "'{0}'. User must provide a valid "
+                                    "schema".format(self.path))
+            self._schema = sch
+        return self._schema
+
+    @property
+    def reader(self):
+        if hasattr(self, '_writer'):
+            # Pending writes would be invisible to a reader; flush them
+            # (which also drops any stale cached reader).
+            self.flush()
+        if hasattr(self, '_reader'):
+            return self._reader
+        try:
+            reader_schema = self._schema.to_json() if self._schema else None
+            df_reader = fastavro.reader(
+                open(self.path, 'rb'),
+                reader_schema=reader_schema
+            )
+            self._reader = df_reader  # cache so iteration state is kept
+            return df_reader
+        except IOError as exc:
+            # If the file doesn't exist, don't set _reader now. Allow for
+            # reevaluation later after the file has been created.
+            # Otherwise, reraise the exception.
+            if exc.errno != errno.ENOENT:
+                raise exc
+            return None
+
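+    # Illustrative usage (hypothetical path and schema object): AVRO is a
+    # context manager, so pending writes are flushed on a clean exit:
+    #
+    #     with AVRO('data.avro', schema=sch) as container:
+    #         container.writer.append({'x': 1})
+    #     # the record is now visible to a fresh reader
+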
+    @staticmethod
+    def _get_append_writer(uri, writers_schema=None, codec='null'):
+        """
+        Returns an instance of avro.datafile.DataFileWriter for appending
+        to an existing avro file at `uri`. The writers_schema embedded in
+        the existing file is always used for appending, because the avro
+        library does not allow a container file to mix records written
+        under multiple writers_schema.
+
+        Parameters
+        ----------
+
+        uri : str
+            uri of an existing, non-empty avro file
+
+        writers_schema : avro.schema.Schema object
+            If not None, checked for equality against the writers_schema
+            embedded in the existing file.
+
+        Returns
+        -------
+        avro.datafile.DataFileWriter
+        """
+        rec_writer = io.DatumWriter()
+        df_writer = datafile.DataFileWriter(
+            open(uri, 'a+b'),
+            rec_writer,
+            codec=codec
+        )
+        # Check for an embedded schema to ensure the existing file is an avro file.
+        try:  # Python 2.x API
+            schema_str = df_writer.get_meta('avro.schema')
+        except AttributeError:  # Python 3.x API
+            schema_str = df_writer.GetMeta('avro.schema').decode("utf-8")
+
+        embedded_schema = schema_from_dict(json.loads(schema_str))
+
+        # If writers_schema was supplied, check it for equality with the embedded schema.
+        if writers_schema and embedded_schema != writers_schema:
+            raise ValueError("writers_schema embedded in {uri} differs from the "
+                             "user supplied schema for appending.".format(uri=uri))
+
+        return df_writer
+
+    @staticmethod
+    def _get_new_writer(uri, sch, codec='null'):
+        """
+        Returns an instance of avro.datafile.DataFileWriter for writing
+        to a new avro file at `uri`.
+
+        Parameters
+        ----------
+
+        uri : str
+            uri for the new avro file
+
+        sch : avro.schema.Schema object
+
+        Returns
+        -------
+        avro.datafile.DataFileWriter
+        """
+        rec_writer = io.DatumWriter()
+        try:  # Python 2.x API
+            df_writer = datafile.DataFileWriter(
+                open(uri, 'wb'),
+                rec_writer,
+                writers_schema=sch,
+                codec=codec
+            )
+        except TypeError:  # Python 3.x API
+            df_writer = datafile.DataFileWriter(
+                open(uri, 'wb'),
+                rec_writer,
+                writer_schema=sch,
+                codec=codec
+            )
+
+        return df_writer
+
+    @property
+    def writer(self):
+        if hasattr(self, '_writer'):
+            return self._writer
+        else:
+            if os.path.exists(self.path) and os.path.getsize(self.path) > 0:
+                df_writer = self._get_append_writer(self.path, self.schema,
+                                                    codec=self.codec)
+            else:
+                df_writer = self._get_new_writer(self.path, self.schema,
+                                                 codec=self.codec)
+            self._writer = df_writer
+            return df_writer
+
+    def flush(self):
+        if hasattr(self, '_writer'):
+            self._writer.close()
+            del self._writer
+            if hasattr(self, '_reader'):
+                # Appended records invalidate a cached reader; drop it so
+                # the next access re-opens the file.
+                del self._reader
+
+@resource.register(r'.+\.(avro)')
+def resource_avro(uri, schema=None, codec='null', **kwargs):
+    return AVRO(uri, schema=schema, codec=codec, **kwargs)
+
+@dispatch(schema.RecordSchema)
+def discover_schema(sch):
+    return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields])
+
+@dispatch(schema.UnionSchema)
+def discover_schema(sch):
+    types = [s.type for s in sch.schemas]
+    if "null" in types and len(types) == 2:
+        types.remove("null")
+        return Option(AVRO_TYPE_MAP[types[0]])
+    raise TypeError("odo supports avro UnionSchema only for nullable fields. "
+                    "Received {0}".format(str(types)))
+
+@dispatch(schema.PrimitiveSchema)
+def discover_schema(sch):
+    return AVRO_TYPE_MAP[sch.type]
+
+@dispatch(schema.MapSchema)
+def discover_schema(sch):
+    # Avro map types always have string keys, see
+    # https://avro.apache.org/docs/1.7.7/spec.html#Maps
+    return Map(string, discover_schema(sch.values))
+
+@dispatch(schema.ArraySchema)
+def discover_schema(sch):
+    return var * discover_schema(sch.items)
+
+@dispatch(object)
+def discover_schema(sch):
+    raise TypeError('Unable to discover avro type %r' % type(sch).__name__)
+
+@discover.register(AVRO)
+def discover_avro(f, **kwargs):
+    return discover_schema(f.schema)
+
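+# Illustrative sketch (hypothetical file): resource() matches any '*.avro'
+# uri and discover() then reports the datashape of the embedded
+# writers_schema, e.g.
+#
+#     >>> discover(resource('users.avro'))  # doctest: +SKIP
+#     var * {name: string, age: ?int64}
+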
+def make_avsc_object(ds, name="name", namespace="default", depth=0):
+    """
+    Build an Avro schema dict (avsc) from a datashape definition
+
+    Parameters
+    ----------
+    ds : str, unicode, DataShape, or datashape.coretypes.*
+    name : string
+        applied to named schema elements (i.e. record, error, fixed, enum)
+    namespace : string
+        namespace applied to named schema elements
+    depth : int
+        tracking parameter for recursion depth. Should not be set by the user.
+
+    Examples
+    --------
+    >>> test_dshape = "var * {letter: string, value: ?int32}"
+    >>> x = make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo")
+    >>> x == {'fields': [{'name': 'letter', 'type': 'string'},
+    ...                  {'name': 'value', 'type': ['null', 'int']}],
+    ...       'name': 'my_record',
+    ...       'namespace': 'com.blaze.odo',
+    ...       'type': 'record'}
+    True
+    >>> test_dshape = '''
+    ... var * {
+    ...     field_1: int32,
+    ...     field_2: string,
+    ...     field_3: ?int64,
+    ...     features: map[string, float64],
+    ...     words: var * string,
+    ...     nested_record: var * {field_1: int64, field_2: float32}
+    ...     }
+    ... '''
+    >>> x = make_avsc_object(test_dshape, name="my_record", namespace="com.blaze.odo")
+    >>> x == {'fields': [{'name': 'field_1', 'type': 'int'},
+    ...                  {'name': 'field_2', 'type': 'string'},
+    ...                  {'name': 'field_3', 'type': ['null', 'long']},
+    ...                  {'name': 'features', 'type': {'type': 'map', 'values': 'double'}},
+    ...                  {'name': 'words', 'type': {'items': 'string', 'type': 'array'}},
+    ...                  {'name': 'nested_record',
+    ...                   'type': {'items': {'fields': [{'name': 'field_1',
+    ...                                                  'type': 'long'},
+    ...                                                 {'name': 'field_2',
+    ...                                                  'type': 'float'}],
+    ...                                      'name': 'my_recordd0d1',
+    ...                                      'namespace': 'com.blaze.odo',
+    ...                                      'type': 'record'},
+    ...                            'type': 'array'}}],
+    ...       'name': 'my_record',
+    ...       'namespace': 'com.blaze.odo',
+    ...       'type': 'record'}
+    True
+    """
+    if depth < 0:
+        raise ValueError("depth argument must be >= 0")
+
+    # Parse a string to a datashape object if necessary.
+    if isinstance(ds, six.string_types):
+        ds = dshape(ds)
+    if isinstance(ds, ct.DataShape):
+        if depth > 0:
+            if not isinstance(ds.parameters[0], ct.Var):
+                raise TypeError("Cannot support fixed length substructure "
+                                "in Avro schemas")
+            return {"type": "array",
+                    "items": make_avsc_object(ds.measure, name=name + "d%d" % depth,
+                                              namespace=namespace, depth=depth + 1)}
+        elif depth == 0:
+            ds = ds.measure
+
+    if isinstance(ds, ct.Record):
+        return {
+            "type": "record",
+            "namespace": namespace,
+            "name": name,
+            "fields": [{"type": make_avsc_object(typ, name=name + "d%d" % depth,
+                                                 namespace=namespace, depth=depth + 1),
+                        "name": n}
+                       for (typ, n) in zip(ds.measure.types, ds.measure.names)]
+        }
+    if isinstance(ds, ct.Map):
+        if ds.key != ct.string:
+            raise TypeError("Avro map types only support string keys. Cannot "
+                            "form map with key type %s" % ds.key)
+        return {
+            "type": "map",
+            "values": make_avsc_object(ds.value, name=name + "d%d" % depth,
+                                       namespace=namespace, depth=depth + 1)
+        }
+    if isinstance(ds, ct.Option):
+        return ["null", make_avsc_object(ds.ty, name=name + "d%d" % depth,
+                                         namespace=namespace, depth=depth + 1)]
+    if ds in dshape_to_avro_primitive_types:
+        return dshape_to_avro_primitive_types[ds]
+
+    raise NotImplementedError("No avro type known for %s" % ds)
+
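+# Illustrative round trip (hypothetical names): make_avsc_object() is the
+# inverse direction of discover_schema() above, and its output can be fed
+# straight back to the avro library:
+#
+#     avsc = make_avsc_object("var * {x: int32, y: ?string}",
+#                             name="point", namespace="odo.example")
+#     sch = schema_from_dict(avsc)  # an avro.schema.Schema instance
+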
+@convert.register(pd.DataFrame, AVRO, cost=4.0)
+def avro_to_DataFrame(avro, dshape=None, **kwargs):
+    # XXX:AEH:todo - correct for pandas automated type conversions. e.g. strings containing numbers get cast to numeric.
+    # XXX:AEH:todo - column with nulls just becomes an "object" column.
+    df = pd.DataFrame([r for r in avro])
+    names = [f.name for f in avro.schema.fields]
+    df = df[names]  # reorder columns to match the writers_schema field order
+    return df
+
+@convert.register(Temp(AVRO), Iterator, cost=1.0)
+def convert_iterator_to_temporary_avro(data, schema=None, **kwargs):
+    fn = '.%s.avro' % uuid.uuid1()
+    avro = Temp(AVRO)(fn, schema, **kwargs)
+    return append(avro, data, **kwargs)
+
+@convert.register(Iterator, AVRO, cost=1.0)
+def avro_to_iterator(s, **kwargs):
+    return iter(s)
+
+@append.register(AVRO, Iterator)
+def append_iterator_to_avro(tgt_avro, src_itr, **kwargs):
+    for datum in src_itr:
+        tgt_avro.writer.append(datum)
+    tgt_avro.flush()
+
+@append.register(AVRO, object)  # anything else
+def append_anything_to_avro(tgt, src, **kwargs):
+    source_as_iter = convert(Iterator, src, **kwargs)
+    return append(tgt, source_as_iter, **kwargs)
\ No newline at end of file
diff --git a/odo/backends/tests/test_avro.py b/odo/backends/tests/test_avro.py
new file mode 100644
index 00000000..7b59f72f
--- /dev/null
+++ b/odo/backends/tests/test_avro.py
@@ -0,0 +1,162 @@
+from __future__ import absolute_import, division, print_function
+
+import pytest
+
+import avro.schema as schema
+try:
+    from avro.schema import parse  # Python 2.x
+except ImportError:
+    from avro.schema import Parse as parse  # Python 3.x
+
+from collections import Iterator
+import pandas as pd
+from pandas.util.testing import assert_frame_equal
+from odo.backends.avro import discover, AVRO
+
+from odo.utils import tmpfile, into_path
+from odo import append, convert, resource, dshape
+
+test_schema_str = """
+{
+  "type" : "record",
+  "namespace" : "dataset",
+  "name" : "test_dataset",
+  "fields": [
+    {"type": "int",    "name": "field_1"},
+    {"type": "string", "name": "field_2"},
+    {"default": null, "name": "field_3", "type": ["null", "long"]},
+    {"name": "features", "type": {"type": "map", "values": "double"}},
+    {"name": "words", "type": {"type": "array", "items": "string"}}
+  ]
+}
+"""
+
+test_data = [{'features': {'bhettcdl': 0.8581552641969377,
+                           'ka': 0.9873135485253831,
+                           'sgmlbagyfb': 0.5796466618955293,
+                           'vdqvnqgqbrjtkug': 0.4938648291874551},
+              'field_1': 2072373602,
+              'field_2': 'mxllbfxk',
+              'field_3': -3887990995227229804,
+              'words': ['ciplc', 'htvixoujptehr', 'rbeiimkevsn']},
+             {'features': {'atqqsuttysdrursxlynwcrmfrwcrdxaegfnidvwjxamoj': 0.2697279678696263,
+                           'dpjw': 0.760489536392584,
+                           'inihhrtnawyopu': 0.08511455977126114,
+                           'kjb': 0.8279248178446112,
+                           'wqlecjb': 0.8241169129373344},
+              'field_1': 517434305,
+              'field_2': 'frgcnqrocddimu',
+              'field_3': None,
+              'words': ['ignsrafxpgu', 'ckg']},
+             {'features': {'': 0.4304848508533662,
+                           'crslipya': 0.1596449079423896,
+                           'imbfgwnaphh': 0.19323554138270294},
+              'field_1': 1925434607,
+              'field_2': 'aurlydvgfygmu',
+              'field_3': None,
+              'words': ['rqdpanbbcemg', 'auurshsxxkp', 'rdngxdthekt']},
+             {'features': {'dv': 0.9635053430456509,
+                           'lhljgywersxjp': 0.5289026834129389,
+                           'nmtns': 0.7645922724023969},
+              'field_1': 636669589,
+              'field_2': '',
+              'field_3': -1858103537322807465,
+              'words': ['vviuffehxh', 'jpquemsx', 'xnoj', '']},
+             {'features': {'ekqfnn': 0.6685382939302145,
+                           'fbryid': 0.7244784428105817,
+                           'fd': 0.8572519278668735,
+                           'iaen': 0.7412670573684966,
+                           'innfcqqbdrpcdn': 0.39528359165136695},
+              'field_1': -1311284713,
+              'field_2': 'infejerere',
+              'field_3': 5673921375069484569,
+              'words': ['ciqu', 'emfruneloqh']},
+             {'features': {'fgh': 0.9547116485401502,
+                           'gqpdtvncno': 0.027038814818686197,
+                           'm': 0.9576395352199625,
+                           'ourul': 0.1849234265503661,
+                           'vhvwhech': 0.41140968300430625},
+              'field_1': 1716247766,
+              'field_2': 'gmmfghijngo',
+              'field_3': None,
+              'words': ['ugwcfecipffmkwi',
+                        'kttgclwjlk',
+                        'siejdtrpjkqennx',
+                        'ixwrpmywtbgiygaoxpwnvuckdygttsssqfrplbyyv',
+                        'mfsrhne']},
+             {'features': {'ffps': 0.02989888991738765, 'fxkhyomw': 0.2963204572188527},
+              'field_1': 101453273,
+              'field_2': 'frjaqnrbfspsuw',
+              'field_3': None,
+              'words': ['jwi', 'rfxlxngyethg']},
+             {'features': {'eovoiix': 0.10890846076556715,
+                           'vsovnbsdhbkydf': 0.09777409545072746},
+              'field_1': -1792425886,
+              'field_2': 'pqkawoyw',
+              'field_3': None,
+              'words': ['xntmmvpbrq', 'uof']},
+             {'features': {'qewmpdviapfyjma': 0.8727493942139006},
+              'field_1': -1828393530,
+              'field_2': 'nkflrmkxiry',
+              'field_3': None,
+              'words': ['lgtrtjhpf']},
+             {'features': {'qbndce': 0.5459572647413652},
+              'field_1': 1048099453,
+              'field_2': 'jsle',
+              'field_3': None,
+              'words': ['d']}]
+
+ds = dshape("""var * {
+    field_1: int32,
+    field_2: string,
+    field_3: ?int64,
+    features: map[string, float64],
+    words: var * string
+    }""")
+
+test_path = into_path('backends', 'tests', 'test_file.avro')
+
+@pytest.fixture
+def avrofile():
+    return resource(test_path)
+
+@pytest.yield_fixture
+def temp_output_path():
+    with tmpfile('.avro') as fn:
+        yield fn
+
+def test_discover(avrofile):
+    assert discover(avrofile) == ds
+
+def test_resource_datafile():
+    assert isinstance(resource(test_path), AVRO)
+
+def test_convert_avro_to_dataframe(avrofile):
+    df = convert(pd.DataFrame, avrofile)
+
+    assert isinstance(df, pd.DataFrame)
+
+    names = ["field_1", "field_2", "field_3", "features", "words"]
+    expected_output = pd.DataFrame(test_data, columns=names)
+    assert_frame_equal(df, expected_output)
+
+def test_convert_avro_to_iterator(avrofile):
+    itr = convert(Iterator, avrofile)
+    assert isinstance(itr, Iterator)
+    assert list(itr) == test_data
+
+def test_require_schema_for_new_file():
+    # A new file has no embedded writers_schema, so accessing .schema
+    # without supplying one must raise.
+    with pytest.raises(schema.AvroException):
+        AVRO("doesntexist.avro").schema
+
+def test_append_and_convert_round_trip(temp_output_path):
+    x = AVRO(temp_output_path, schema=parse(test_schema_str))
+    append(x, test_data)
+    append(x, test_data)
+    assert convert(list, x) == test_data * 2
diff --git a/odo/backends/tests/test_file.avro b/odo/backends/tests/test_file.avro
new file mode 100755
index 00000000..b3ea5d12
Binary files /dev/null and b/odo/backends/tests/test_file.avro differ
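
Usage sketch (illustrative; the file name and records are hypothetical, not
part of the patch): with the backend registered, the ordinary odo entry
points work end to end.

    import pandas as pd
    from odo import odo, resource, discover

    avro = resource('events.avro')   # AVRO instance via resource_avro
    print(discover(avro))            # datashape from the writers_schema
    df = odo(avro, pd.DataFrame)     # avro_to_DataFrame path, cost=4.0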