Adding an Avro backend #386

Status: Open. Wants to merge 39 commits into base: master.

Changes shown from 10 commits (of 39 total).

39 commits:
932bdde
Preliminary commit for Avro backend.
Nov 19, 2015
b2091f4
Adding test for avro to iterator
Nov 19, 2015
4cbe99f
Enabling read and write, adding append methods.
Nov 25, 2015
e07cfa7
Adding dependencies to build files
Nov 30, 2015
6cba475
Adding a convert edge from iterator back to avro
Dec 3, 2015
9ded2e1
Ensuring avro backend is registered on import
Dec 3, 2015
d7496ba
Fixing version problem with toolz
Dec 3, 2015
2adb2df
Adding support for array types
Dec 18, 2015
31ed159
Merge branch 'master' into avro
Dec 18, 2015
491c4d1
Attempting to fix travis build
Dec 18, 2015
2eeba0a
Reformatting test data
Dec 18, 2015
471becc
Adding conda selector for avro, since it is not Python 3 compatible
Dec 18, 2015
e05d1e1
Small fixes from code review.
Dec 19, 2015
31821d2
Converting tests to pytest idiom.
Dec 19, 2015
3773348
Changing import order to ensure test skip if avro not installed
Dec 19, 2015
2c0246a
Final small changes requested from code review
Dec 19, 2015
0d35449
Merge branch 'master' into avro
Dec 19, 2015
01b67a4
Use multipledispatch for schema discovery
Dec 22, 2015
a583e55
Bug fix for boolean types
Dec 22, 2015
eb004c7
Cleanup
Dec 29, 2015
58d4e6e
Adding dshape to avro schema generation, with doctest
Dec 30, 2015
64753e2
Bumping datashape version to fix build
Jan 4, 2016
d575ead
Merge branch 'master' into avro
Jan 7, 2016
5c2d082
Attempt to fix build for python3
Jan 29, 2016
78a591f
Whitespace error.
Jan 29, 2016
ee5e6f3
Python 3 support
Jan 29, 2016
ff32e9e
One more python3 bug I missed.
Jan 29, 2016
cbec204
Changing doctest not to be format sensitive.
Jan 29, 2016
4ebbf9b
Merge branch 'master' into avro
Feb 2, 2016
060c2b4
Updating avro type mappings
Feb 2, 2016
32900b9
Changing AVRO.uri attribute to AVRO.path, and allowing codec keyword …
Feb 2, 2016
95724cf
Dropping requirement for schema to be defined for new AVRO resource o…
Feb 2, 2016
76f2a72
Merge branch 'master' into avro
Jun 27, 2016
264fca2
Adding fastavro to meta.yaml
Jun 27, 2016
d9ca7ab
Merge branch 'master' into avro
Oct 8, 2016
1e56c1e
Correcting test failures due to incompatibility with pytest 3.0.3 as …
Oct 8, 2016
fd48794
Accidentally removed avro installation from travis.yml. Fixing
Oct 8, 2016
aba333a
Merge branch 'master' into avro
Jan 23, 2017
b66e7e3
Updating to new requirements tracking structure.
Jan 23, 2017
2 changes: 1 addition & 1 deletion .travis.yml
@@ -59,7 -59,7 @@ install:

# Install various deps
- conda uninstall toolz
- pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3'
- pip install -U toolz sas7bdat psycopg2 dill 'pymongo<3' avro
- pip install --upgrade git+git://github.com/blaze/dask.git#egg=dask-dev[complete]
- if [ -n "$PANDAS_VERSION" ]; then pip install $PANDAS_VERSION; fi

1 change: 1 addition & 0 deletions conda.recipe/meta.yaml
@@ -31,6 +31,7 @@ requirements:

test:
requires:
- avro
Member:

Since this package isn't python 3 compatible, specify it with a conda selector like this:

- avro  # [py27]

- pytest
- h5py
- pytables >=3.0.0
3 changes: 3 additions & 0 deletions odo/__init__.py
@@ -63,10 +63,13 @@
from .backends.sparksql import SparkDataFrame
with ignoring(ImportError):
from .backends.url import URL
with ignoring(ImportError):
from .backends.avro import AVRO
with ignoring(ImportError):
from .backends.dask import dask



restart_ordering() # Restart multipledispatch ordering and do ordering


252 changes: 252 additions & 0 deletions odo/backends/avro.py
@@ -0,0 +1,252 @@
from __future__ import absolute_import, division, print_function

import errno
import os
import uuid

from avro import schema, datafile, io
from avro.schema import AvroException
import pandas as pd
from datashape import discover, var, Record, Map, Var, \
Option, null, string, int32, int64, float64, float32, boolean
from collections import Iterator
from ..append import append
from ..convert import convert
from ..resource import resource
from ..temp import Temp

AVRO_TYPE_MAP = {
'string': string,
'int': int32,
'long': int64,
'null': null,
'double': float64,
'float': float32,
'bool': boolean,
'map': Map,
'record': Record,
'array': Var,
}

class AVRO(object):
"""Wrapper object for reading and writing an Avro container file

Parameters
----------

uri : str
uri of avro data

schema : avro.schema.Schema
User specified Avro schema object. Used to decode file or serialize new records to file.
schema is required to create a new Avro file.
If reading or appending to an existing Avro file, the writers_schema embedded in that file
will be used.

codec : str
compression codec. Valid values: 'null', 'deflate', 'snappy'

"""
def __init__(self, uri, schema=None, codec='null', **kwargs):
self._uri = uri
self._schema = schema
self._codec = codec
self._kwargs = kwargs #CURRENTLY UNUSED
Member:

I would remove this if it isn't being used, we can always add it back later


if not schema:
sch = self._get_writers_schema()
if sch is None:
raise AvroException("Couldn't extract writers schema from '{0}'. User must provide a valid schema".format(uri))
self._schema = sch

def __iter__(self):
Member:

My preference would be to make a separate AvroIter class that's returned from iter(avro_obj) to allow multiple iterators at once and isolate the statefulness of iteration in separate objects.

I don't see a straightforward way to do that separation and I'm fine marking that as out-of-scope for this PR to be done in future work.

Author:

Yes, I see your point. Managing the state of the iterator was definitely one of the more awkward aspects of implementing this class.

return self.reader
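The reviewer's `AvroIter` suggestion can be sketched with plain stdlib objects. `AvroIter` and `Container` here are hypothetical stand-ins; a real implementation would open a fresh `DataFileReader` per iterator rather than wrapping a list.

```python
# Hypothetical sketch: iteration state lives in a separate AvroIter
# object, so each iter() call yields an independent iterator.
class AvroIter(object):
    def __init__(self, records):
        self._it = iter(records)  # stands in for a fresh DataFileReader

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._it)

    next = __next__  # Python 2 spelling, matching the era of this PR


class Container(object):
    """Stand-in for AVRO: holds no iteration state itself."""
    def __init__(self, records):
        self._records = records

    def __iter__(self):
        return AvroIter(self._records)


c = Container([1, 2, 3])
a, b = iter(c), iter(c)
assert next(a) == 1 and next(b) == 1  # two independent iterators
```

With this split, exhausting one iterator has no effect on another, which is the statefulness problem the reviewer flags.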

def next(self):
return self.reader.next()

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
# Perform a close if there's no exception
Member:

What about this case:

with AVRO('path/to/file.avro', sch) as afh:
    for x in afh:
        print(x) 
    raise RuntimeError()

Won't the if type is None check leave our avro file unclosed, and wouldn't the user expect it to be closed even with an exception?

Author:

Well, I have no idea why I did it this way, so I'll remove the exception check.

if type is None:
self.reader.close()
self.writer.close()
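The agreed fix can be sketched minimally: `__exit__` closes unconditionally, and returning a falsy value lets any exception propagate. `Managed` is a hypothetical stand-in for the AVRO wrapper.

```python
# Minimal sketch of the fix discussed above: close resources in
# __exit__ regardless of whether an exception occurred.
class Managed(object):
    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.closed = True  # close readers/writers even on error
        # returning None (falsy) re-raises any in-flight exception


m = Managed()
try:
    with m:
        raise RuntimeError("boom")
except RuntimeError:
    pass
assert m.closed  # closed despite the exception, as the user expects
```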

def _get_writers_schema(self):
"""
Extract writers schema embedded in an existing Avro file.
"""
reader = self.reader
return schema.parse(self.reader.meta['avro.schema']) if reader else None

uri = property(lambda self: self._uri)
Member:

any reason not to just make self._uri be self.uri and be a public attribute?

Member:

same with self._codec and self._schema

Author:

uri, schema, and codec feel like properties that should be immutable. If a user changed them, you would need to refresh any open readers and writers. It feels more natural to expect the user to create a new AVRO object rather than reusing an existing one.

Member:

ok, no biggie

codec = property(lambda self: self._codec)
Member:

To make these proper data attributes, could you please define a set callable that simply raises AttributeError? See here:

https://docs.python.org/3/howto/descriptor.html#descriptor-protocol

Author (@ahasha, Oct 8, 2016):

I'm pretty sure the property function here achieves what you're describing. Would you like me to add a test verifying it, or refactor it into a more standard method format with the @property decorator? I'll just add the test for now.

schema = property(lambda self: self._schema)
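The author's point can be verified directly: `property()` without a setter already raises `AttributeError` on assignment, so no explicit set callable is needed. `A` is a hypothetical stand-in for the AVRO class.

```python
# Check that a property defined without a setter is read-only.
class A(object):
    def __init__(self, uri):
        self._uri = uri

    uri = property(lambda self: self._uri)


a = A('data.avro')
assert a.uri == 'data.avro'
try:
    a.uri = 'other.avro'
    raised = False
except AttributeError:
    raised = True
assert raised  # assignment rejected, as the author expects
```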

@property
def reader(self):
if hasattr(self, '_reader'):
Member:

Will this branch ever be taken? I can't find a place where self._reader is set.

Author:

Oops, forgot to set self._reader

if hasattr(self, '_writer'):
self.flush()
return self._reader
else:
try:
rec_reader = io.DatumReader(readers_schema=self.schema)

df_reader = datafile.DataFileReader(
open(self.uri, 'rb'),
Member:

this could potentially open a bunch of file handles if self.reader is called a bunch of times ... is that a valid concern?

rec_reader
)

return df_reader
except IOError as exc:
#If file doesn't exist, don't set _reader now.
#Allow for reevaluation later after file has been created.
#Otherwise, reraise exception
if exc.errno != errno.ENOENT:
raise exc
return None
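One way to address the file-handle concern, sketched with hypothetical stand-ins: cache the reader on first access so repeated reads of the property reuse a single handle. The real code would cache a `DataFileReader` opened from `self.uri`.

```python
# Sketch: cache the expensive resource on first property access.
class Cached(object):
    def __init__(self, uri):
        self.uri = uri
        self.opens = 0

    @property
    def reader(self):
        if not hasattr(self, '_reader'):
            self.opens += 1           # stands in for open(self.uri, 'rb')
            self._reader = object()   # stands in for a DataFileReader
        return self._reader


c = Cached('data.avro')
assert c.reader is c.reader  # second access reuses the cached handle
assert c.opens == 1
```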

@staticmethod
Member:

Is there a specific reason for the staticmethod here and below, rather than private top-level helper functions? IMO we should convert these to module-scoped functions instead.

Author:

No, seems like a good idea!

def _get_append_writer(uri, writers_schema=None):
"""
Returns an isntance of avro.datafile.DataFileWriter for appending
Member:

typo

to an existing avro file at `uri`. Does not take a writers schema,
because avro library requires that writers_schema embedded in existing
file be used for appending.

Parameters
----------

uri : str
uri of an existing, non-empty avro file

writers_schema : avro.schema.Schema object
If not None, checks that writers_schema in existing file is the same as supplied schema.
Member:

Here and elsewhere -- could you linewrap the docstring lines where they're over 80ish columns?

Avro does not allow writing records to a container file with multiple writers_schema.

Member:

Docstring for codec

Returns
-------
avro.datafile.DataFileWriter
"""
rec_writer = io.DatumWriter()
df_writer = datafile.DataFileWriter(
open(uri, 'ab+'),
Member:

Is ab+ the same as a+b?

Author:

I think so -- I lifted this mode from the avro documentation. I'll change to 'a+b' to be more Pythonic since it doesn't break any tests.

rec_writer
)
#Check for embedded schema to ensure existing file is an avro file.
embedded_schema = schema.parse(df_writer.get_meta('avro.schema'))

#If writers_schema supplied, check for equality with embedded schema.
if writers_schema:
assert embedded_schema == writers_schema, \
Member:

is this something that could bubble up to a user? if so, can we turn this into something other than an AssertionError, maybe a ValueError?

"writers_schema embedded in {uri} differs from user supplied schema for appending."

Member:

Linewrap.

return df_writer
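The reviewer's `ValueError` suggestion can be sketched as below. Raising explicitly is safer than `assert`, which is stripped when Python runs with `-O`; the plain dicts here stand in for `avro.schema.Schema` objects.

```python
# Sketch: explicit ValueError instead of an assert for schema mismatch.
def check_writers_schema(embedded_schema, writers_schema, uri):
    if writers_schema is not None and embedded_schema != writers_schema:
        raise ValueError(
            "writers_schema embedded in {0} differs from user-supplied "
            "schema for appending.".format(uri))


check_writers_schema({'type': 'record'}, {'type': 'record'}, 'f.avro')  # ok
raised = False
try:
    check_writers_schema({'type': 'record'}, {'type': 'enum'}, 'f.avro')
except ValueError:
    raised = True
assert raised  # mismatch surfaces as ValueError, not AssertionError
```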

@staticmethod
def _get_new_writer(uri, sch):
"""
Returns an instance of avro.datafile.DataFileWriter for writing
to a new avro file at `uri`.

Parameters
----------

uri : str
uri of the new avro file to be created

sch : avro.schema.Schema object

Returns
-------
avro.datafile.DataFileWriter
"""
rec_writer = io.DatumWriter()
df_writer = datafile.DataFileWriter(
open(uri, 'wb'),
rec_writer,
writers_schema = sch
)
return df_writer

@property
def writer(self):
if hasattr(self, '_writer'):
return self._writer
else:
if os.path.exists(self.uri) and os.path.getsize(self.uri) > 0:
df_writer = self._get_append_writer(self.uri, self.schema)
else:
df_writer = self._get_new_writer(self.uri, self.schema)
self._writer = df_writer
return df_writer

def flush(self):
if hasattr(self, '_writer'):
self._writer.close()
del(self._writer)


@resource.register('.+\.(avro)')
def resource_avro(uri, schema=None, **kwargs):
return AVRO(uri, schema=schema, **kwargs)

def discover_schema(sch):
if isinstance(sch, schema.RecordSchema):
Member:

You could potentially use multiple dispatch here to avoid the large number of isinstance checks.

return var * Record([(f.name, discover_schema(f.type)) for f in sch.fields])
elif isinstance(sch, schema.UnionSchema):
try:
types = [s.type for s in sch.schemas]
assert "null" in types
types.remove("null")
assert len(types) == 1
return Option(AVRO_TYPE_MAP[types[0]])
except AssertionError:
raise TypeError("odo supports avro UnionSchema only for nullable fields. "
"Received {0}".format(str([s.type for s in sch.schemas])))
elif isinstance(sch, schema.PrimitiveSchema):
return AVRO_TYPE_MAP[sch.type]
elif isinstance(sch, schema.MapSchema):
return Map(string, discover_schema(sch.values))
Member:

Is the type of all keys in Map types in avro string?

Author:

Member:

Great thanks for clarifying.

Author:

I'll add a comment with the link.

Member:

It looks like there's already a link there.

elif isinstance(sch, schema.ArraySchema):
return var * discover_schema(sch.items)
else:
raise Exception(str(type(sch)))
Member:

This should be a TypeError

Member:

And should probably say something like 'Unable to discover avro type %r' % type(sch).__name__
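The multiple-dispatch suggestion raised earlier can be sketched with `functools.singledispatch` as a stdlib stand-in for `multipledispatch`. The schema classes and `TYPE_MAP` below are simplified stand-ins for `avro.schema.*` and the backend's `AVRO_TYPE_MAP`; the base case doubles as the reviewers' suggested `TypeError`.

```python
# Sketch: dispatch on schema type instead of chained isinstance checks.
from functools import singledispatch


class PrimitiveSchema(object):
    def __init__(self, type_):
        self.type = type_


class ArraySchema(object):
    def __init__(self, items):
        self.items = items


TYPE_MAP = {'string': 'string', 'int': 'int32', 'long': 'int64'}


@singledispatch
def discover_schema(sch):
    # Fallback for unknown schema classes, per the review suggestion.
    raise TypeError('Unable to discover avro type %r' % type(sch).__name__)


@discover_schema.register(PrimitiveSchema)
def _(sch):
    return TYPE_MAP[sch.type]


@discover_schema.register(ArraySchema)
def _(sch):
    return 'var * ' + discover_schema(sch.items)


assert discover_schema(PrimitiveSchema('int')) == 'int32'
assert discover_schema(ArraySchema(PrimitiveSchema('string'))) == 'var * string'
```

odo itself uses `multipledispatch` throughout, so the real backend would register on that instead; the dispatch structure is the same.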


@discover.register(AVRO)
def discover_avro(f, **kwargs):
return discover_schema(f.schema)

@convert.register(pd.DataFrame, AVRO, cost=4.0)
def avro_to_DataFrame(avro, dshape=None, **kwargs):
#XXX:AEH:todo - correct for pandas automated type conversions. e.g. strings containing numbers get cast to numeric.
#XXX:AEH:todo - column with nulls just becomes an "object" column.
Member:

Regarding these comments: there is odo.numpy_dtype.dshape_to_numpy and odo.numpy_dtype.dshape_to_pandas, which could be used to help deal with the type conversions here.

df = pd.DataFrame([r for r in avro])
names = [f.name.decode('utf-8') for f in avro.schema.fields]
Member:

Is the decoding necessary here?

df = df[names]
return df

@convert.register(Temp(AVRO), Iterator, cost=1.0)
def convert_iterator_to_temporary_avro(data, schema=None, **kwargs):
fn = '.%s.avro' % uuid.uuid1()
avro = Temp(AVRO)(fn, schema, **kwargs)
return append(avro, data, **kwargs)


@convert.register(Iterator, AVRO, cost=1.0)
def avro_to_iterator(s, **kwargs):
return s
Member:

Should this be iter(s)? Right now an AVRO instance is an iterable, but not an iterator.


@append.register(AVRO, Iterator)
def append_iterator_to_avro(tgt_avro, src_itr, **kwargs):
for datum in src_itr:
tgt_avro.writer.append(datum)
tgt_avro.flush()

@append.register(AVRO, object) # anything else
def append_anything_to_iterator(tgt, src, **kwargs):
source_as_iter = convert(Iterator, src, **kwargs)
return append(tgt, source_as_iter, **kwargs)
Member:

Can you add a trailing newline? No newline at the end makes git unhappy...

Author:

Done

92 changes: 92 additions & 0 deletions odo/backends/tests/test_avro.py
@@ -0,0 +1,92 @@
from __future__ import absolute_import, division, print_function

from avro import datafile, io, schema
from collections import Iterator
import pandas as pd
from pandas.util.testing import assert_frame_equal
from odo.backends.avro import discover, avro_to_DataFrame, avro_to_iterator, resource, AVRO

import unittest
import tempfile

from odo.utils import tmpfile, into_path
from odo import append, convert, resource, dshape

test_schema_str = """
{
"type" : "record",
"namespace" : "dataset",
"name" : "test_dataset",
"fields": [
{"type": "int" , "name": "field_1"},
{"type": "string", "name": "field_2"},
{"default": null, "name": "field_3", "type": ["null", "long"]},
Member:

any reason why the first null is unquoted and the second is quoted?

Author:

I need to double check this, but I believe null (unquoted) is a value, whereas "null" (quoted) is a type value. If I did "default": "null", I would get a default value of the string "null".

{ "name": "features", "type": { "type": "map", "values": "double"}},
{ "name": "words", "type": {"type": "array", "items": "string"}}
]
}
"""

test_data = [
Member:

can you clean up the formatting here? thanks

Member:

you could probably just copypaste the result of this in:

from pprint import pprint
pprint(test_data)

{"field_1":2072373602,"field_2":"mxllbfxk","field_3":-3887990995227229804,"features":{"bhettcdl":0.8581552641969377,"vdqvnqgqbrjtkug":0.4938648291874551,"sgmlbagyfb":0.5796466618955293,"ka":0.9873135485253831},"words":["ciplc","htvixoujptehr","rbeiimkevsn"]},
{"field_1":517434305,"field_2":"frgcnqrocddimu","field_3":None,"features":{"atqqsuttysdrursxlynwcrmfrwcrdxaegfnidvwjxamoj":0.2697279678696263,"kjb":0.8279248178446112,"wqlecjb":0.8241169129373344,"inihhrtnawyopu":0.08511455977126114,"dpjw":0.760489536392584},"words":["ignsrafxpgu","ckg"]},
{"field_1":1925434607,"field_2":"aurlydvgfygmu","field_3":None,"features":{"crslipya":0.1596449079423896,"":0.4304848508533662,"imbfgwnaphh":0.19323554138270294},"words":["rqdpanbbcemg","auurshsxxkp","rdngxdthekt"]},
{"field_1":636669589,"field_2":"","field_3":-1858103537322807465,"features":{"dv":0.9635053430456509,"lhljgywersxjp":0.5289026834129389,"nmtns":0.7645922724023969},"words":["vviuffehxh","jpquemsx","xnoj",""]},
{"field_1":-1311284713,"field_2":"infejerere","field_3":5673921375069484569,"features":{"iaen":0.7412670573684966,"ekqfnn":0.6685382939302145,"innfcqqbdrpcdn":0.39528359165136695,"fd":0.8572519278668735,"fbryid":0.7244784428105817},"words":["ciqu","emfruneloqh"]},
{"field_1":1716247766,"field_2":"gmmfghijngo","field_3":None,"features":{"ourul":0.1849234265503661,"vhvwhech":0.41140968300430625,"m":0.9576395352199625,"fgh":0.9547116485401502,"gqpdtvncno":0.027038814818686197},"words":["ugwcfecipffmkwi","kttgclwjlk","siejdtrpjkqennx","ixwrpmywtbgiygaoxpwnvuckdygttsssqfrplbyyv","mfsrhne"]},
{"field_1":101453273,"field_2":"frjaqnrbfspsuw","field_3":None,"features":{"ffps":0.02989888991738765,"fxkhyomw":0.2963204572188527},"words":["jwi","rfxlxngyethg"]},
{"field_1":-1792425886,"field_2":"pqkawoyw","field_3":None,"features":{"vsovnbsdhbkydf":0.09777409545072746,"eovoiix":0.10890846076556715},"words":["xntmmvpbrq","uof"]},
{"field_1":-1828393530,"field_2":"nkflrmkxiry","field_3":None,"features":{"qewmpdviapfyjma":0.8727493942139006},"words":["lgtrtjhpf"]},
{"field_1":1048099453,"field_2":"jsle","field_3":None,"features":{"qbndce":0.5459572647413652},"words":["d"]},
]

ds = dshape("""var * {
field_1: int32,
field_2: string,
field_3: ?int64,
features: map[string, float64],
words: var * string
}""")

test_path = into_path('backends', 'tests', 'test_file.avro')

class TestAvro(unittest.TestCase):
Member:

would it be a lot of trouble to ask you to write this in the style of pytest? ie, tests are functions and instead of state on classes use fixtures ... here's an example:

import pytest

@pytest.fixture
def avrofile():
    return avrofile


def test_discover(avrofile):
    assert discover(avrofile) == ds


def test_convert_avro_to_iterator(avrofile):
    itr = convert(Iterator, avrofile)
    assert isinstance(itr, Iterator)
    assert list(itr) == test_data

Member:

We also already have a temporary file context manager that attempts to handle deletions on Windows. Additionally, pytest has a type of fixture called "yield fixture" that is their version of a fixture that requires a tearDown step.

So, your temp_output instance variable would be written like this in pytest:

import pytest
from odo.utils import tmpfile


@pytest.yield_fixture
def temp_output():
    with tmpfile('.avro') as fn:
        with open(fn, 'w+b') as f:
            yield f

Member:

Feel free to leave this as is and at some point in the future it'll get converted to pytest style

Author:

Sure, I'll give it a shot.


def setUp(self):
self.avrofile = resource(test_path)
self.temp_output = tempfile.NamedTemporaryFile(delete=False, suffix=".avro")

def tearDown(self):
self.temp_output.unlink(self.temp_output.name)

def test_resource_datafile(self):
self.assertIsInstance(resource(test_path), AVRO)

def test_discover(self):
self.assertEquals(discover(self.avrofile), ds)

def test_convert_avro_to_dataframe(self):
df = convert(pd.DataFrame, self.avrofile)
self.assertIsInstance(df, pd.DataFrame)

names = ["field_1", "field_2", "field_3", "features", "words"]
expected_output = pd.DataFrame(test_data, columns=names)
assert_frame_equal(df, expected_output)

def test_convert_avro_to_iterator(self):
itr = convert(Iterator, self.avrofile)
self.assertIsInstance(itr, Iterator)
self.assertEqual(list(itr), test_data)

def test_require_schema_for_new_file(self):
self.assertRaises(schema.AvroException, AVRO, "doesntexist.avro")

def test_append_and_convert_round_trip(self):
x = AVRO(self.temp_output.name, schema=schema.parse(test_schema_str))
append(x, test_data)
append(x, test_data)
assert convert(list, x) == test_data * 2


if __name__=="__main__":
unittest.main()
Binary file added odo/backends/tests/test_file.avro
Binary file not shown.
1 change: 1 addition & 0 deletions recommended-requirements.txt
@@ -12,3 +12,4 @@ sas7bdat
paramiko
pywebhdfs
boto
avro
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,6 +1,6 @@
datashape >= 0.4.6
numpy >= 1.7
pandas >= 0.15.0
toolz >= 0.7.2
toolz == 0.7.4
Member:

Any reason for the stricter version here?

Author:

I got an error with toolz==0.7.2 that was fixed by upgrading to 0.7.4. Unfortunately, I didn't document the error message.

Member:

could we do >= 0.7.4 then?

Author:

Is toolz >= 0.7.4 better?

multipledispatch >= 0.4.7
networkx