diff --git a/odo/backends/csv.py b/odo/backends/csv.py index c1e49267..06c39541 100644 --- a/odo/backends/csv.py +++ b/odo/backends/csv.py @@ -2,20 +2,26 @@ import sys import re +import os +import gzip +import bz2 +import uuid +import csv + from contextlib import contextmanager + +from glob import glob + import datashape + from datashape import discover, Record, Option from datashape.predicates import isrecord from datashape.dispatch import dispatch -from toolz import concat, keyfilter, keymap, merge, valfilter + +from toolz import concat, keyfilter, keymap, valfilter, merge + import pandas import pandas as pd -import os -import gzip -import bz2 -import uuid -import csv -from glob import glob from ..compatibility import unicode, PY2 from ..utils import keywords, ext @@ -24,13 +30,16 @@ from ..resource import resource from ..chunks import chunks from ..temp import Temp +from ..directory import Directory from ..numpy_dtype import dshape_to_pandas from .pandas import coerce_datetimes dialect_terms = '''delimiter doublequote escapechar lineterminator quotechar quoting skipinitialspace strict'''.split() -aliases = {'sep': 'delimiter'} +aliases = { + 'sep': 'delimiter' +} def alias(key): @@ -273,19 +282,20 @@ def discover_csv(c, nrows=1000, **kwargs): return datashape.var * measure -@resource.register('.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?') +@resource.register(r'.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?', priority=19) def resource_csv(uri, **kwargs): return CSV(uri, **kwargs) -@resource.register('.*\*.+', priority=12) +@resource.register(r'.*\*.+', priority=12) def resource_glob(uri, **kwargs): filenames = sorted(glob(uri)) r = resource(filenames[0], **kwargs) return chunks(type(r))([resource(u, **kwargs) for u in sorted(glob(uri))]) -@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))), +@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV)), + Directory(CSV), Directory(Temp(CSV))), cost=10.0) def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs): def _(): @@ -303,7 +313,7 @@ def convert_dataframes_to_temporary_csv(data, **kwargs): @dispatch(CSV) def drop(c): - os.unlink(c.path) + os.remove(c.path) ooc_types.add(CSV) diff --git a/odo/backends/ssh.py b/odo/backends/ssh.py index 78cc8e0d..f08b2c26 100644 --- a/odo/backends/ssh.py +++ b/odo/backends/ssh.py @@ -90,6 +90,7 @@ def SSH(cls): ssh_pattern = '((?P[a-zA-Z]\w*)@)?(?P[\w.-]*)(:(?P\d+))?:(?P[/\w.*-]+)' + @resource.register('ssh://.+', priority=16) def resource_ssh(uri, **kwargs): if 'ssh://' in uri: @@ -105,7 +106,6 @@ def resource_ssh(uri, **kwargs): subtype = types_by_extension[path.split('.')[-1]] if '*' in path: subtype = Directory(subtype) - path = path.rsplit('/', 1)[0] + '/' except KeyError: subtype = type(resource(path)) diff --git a/odo/backends/tests/test_ssh.py b/odo/backends/tests/test_ssh.py index 7b4c7109..29a43168 100644 --- a/odo/backends/tests/test_ssh.py +++ b/odo/backends/tests/test_ssh.py @@ -3,11 +3,14 @@ import pytest paramiko = pytest.importorskip('paramiko') -import pandas as pd -import numpy as np +import getpass import re import os import sys +import socket + +import pandas as pd +import numpy as np from odo.utils import tmpfile, filetext from odo.directory import _Directory, Directory @@ -16,7 +19,6 @@ from odo import into, discover, CSV, JSONLines, JSON, convert from odo.temp import _Temp, Temp from odo.compatibility import ON_TRAVIS_CI -import socket skipif = pytest.mark.skipif @@ -52,12 +54,18 @@ def test_connect(): def test_resource_directory(): - r = resource('ssh://joe@localhost:/path/to/') + user = getpass.getuser() + r = resource('ssh://%s@localhost:%s/*.json.bz' % (user, os.path.dirname(__file__))) assert issubclass(r.subtype, _Directory) - r = resource('ssh://joe@localhost:/path/to/*.csv') + with pytest.raises(AssertionError): + resource('ssh://%s@localhost:/path/to/' % user) + + r = resource('ssh://%s@localhost:%s/*.csv' % + (user, os.path.dirname(__file__))) assert r.subtype == Directory(CSV) - assert r.path == '/path/to/' + assert r.path == os.path.dirname(__file__) + assert r.pattern == '*.csv' def test_discover(): diff --git a/odo/directory.py b/odo/directory.py index 1479ecee..6462540e 100644 --- a/odo/directory.py +++ b/odo/directory.py @@ -1,11 +1,16 @@ from __future__ import absolute_import, division, print_function +import os +import re + from glob import glob -from .chunks import Chunks -from .resource import resource +from pprint import pformat + from toolz import memoize, first from datashape import discover, var -import os + +from .chunks import Chunks +from .resource import resource class _Directory(Chunks): @@ -14,28 +19,52 @@ class _Directory(Chunks): For typed containers see the ``Directory`` function which generates parametrized Directory classes. + Parameters + ---------- + path : str + An existing directory or glob pattern + kwargs : dict + Additional keyword arguments that will be passed to the call to + resource when iterating over a directory + + Examples + -------- >>> from odo import CSV - >>> c = Directory(CSV)('path/to/data/') # doctest: +SKIP + >>> c = Directory(CSV)('path/to/data/') # doctest: +SKIP + >>> c # doctest: +SKIP + Directory(CSV)(path=..., pattern='*') Normal use through resource strings >>> r = resource('path/to/data/*.csv') # doctest: +SKIP + Directory(CSV)(path=..., pattern='*.csv') >>> r = resource('path/to/data/') # doctest: +SKIP - - + Directory()(path=..., pattern='*') """ def __init__(self, path, **kwargs): - self.path = path + path = os.path.abspath(path) + if os.path.isdir(path): + self.pattern = '*' + else: + path, self.pattern = os.path.split(path) + self.path = os.path.abspath(path) self.kwargs = kwargs + @property + def glob(self): + return os.path.join(self.path, self.pattern) + def __iter__(self): - return (resource(os.path.join(self.path, fn), **self.kwargs) - for fn in sorted(os.listdir(self.path))) + return (resource(fn, **self.kwargs) for fn in sorted(glob(self.glob))) + + def __repr__(self): + return '%s(glob=%s)' % (type(self).__name__, self.glob) def Directory(cls): """ Parametrized DirectoryClass """ - return type('Directory(%s)' % cls.__name__, (_Directory,), {'container': cls}) + return type('Directory(%s)' % cls.__name__, (_Directory,), + {'container': cls}) Directory.__doc__ = Directory.__doc__ @@ -51,15 +80,14 @@ def discover_Directory(c, **kwargs): return var * discover(first(c)).subshape[0] -@resource.register('.+' + re_path_sep + '\*\..+', priority=15) +@resource.register(r'.+%s.*(?:(?:\[!?.+\])|\?|\*)+.+' % re_path_sep, priority=18) def resource_directory(uri, **kwargs): - path = uri.rsplit(os.path.sep, 1)[0] try: one_uri = first(glob(uri)) except (OSError, StopIteration): - return _Directory(path, **kwargs) + return _Directory(uri, **kwargs) subtype = type(resource(one_uri, **kwargs)) - return Directory(subtype)(path, **kwargs) + return Directory(subtype)(uri, **kwargs) @resource.register('.+' + re_path_sep, priority=9) diff --git a/odo/tests/test_directory.py b/odo/tests/test_directory.py index 8b80f228..a81e0ef9 100644 --- a/odo/tests/test_directory.py +++ b/odo/tests/test_directory.py @@ -1,23 +1,29 @@ from __future__ import absolute_import, division, print_function +import pytest import tempfile from contextlib import contextmanager import shutil from odo.backends.csv import CSV from odo.directory import Directory, discover, resource, _Directory -from odo import into +from odo import odo from datashape import dshape +from toolz import concat import os +import pandas as pd + @contextmanager def csvs(n=3): + assert 0 < n <= 10, \ + 'number of files must be greater than 0 and less than or equal to 10' path = tempfile.mktemp() os.mkdir(path) fns = [os.path.join(path, 'file_%d.csv' % i) for i in range(n)] for i, fn in enumerate(fns): - into(fn, [{'a': i, 'b': j} for j in range(5)]) + odo([{'a': i, 'b': j} for j in range(5)], fn) try: yield path + os.path.sep @@ -42,7 +48,43 @@ def test_resource_directory(): assert r2.path.rstrip(os.path.sep) == path.rstrip(os.path.sep) -def test_resource_directory(): - assert isinstance(resource(os.path.join('a', 'nonexistent', 'directory') + - os.path.sep), - _Directory) +def test_resource_nonexistent_directory(): + with pytest.raises(AssertionError): + resource(os.path.join('a', 'nonexistent', 'directory') + os.path.sep) + + +@contextmanager +def chdir(newdir): + curdir = os.getcwd() + os.chdir(newdir) + + try: + yield os.getcwd() + finally: + os.chdir(curdir) + + +def test_directory_of_csvs_to_frame(): + with csvs() as path: + result = odo(odo(path, pd.DataFrame), set) + expected = odo(concat(odo(fn, list) for fn in resource(path)), set) + assert result == expected + + +def test_directory_of_csvs_with_cd(): + with csvs() as path: + with chdir(path): + result = odo(odo('./*.csv', pd.DataFrame), set) + expected = odo(concat(odo(fn, list) for fn in resource(path)), set) + assert result == expected + + +def test_directory_of_csvs_with_non_star_glob(): + with csvs() as path: + with chdir(path): + glob_pat = '%sfile_?.csv' % path + dir_of_csv = resource(glob_pat) + assert isinstance(dir_of_csv, _Directory) + result = odo(odo(dir_of_csv, pd.DataFrame), set) + expected = odo(concat(odo(fn, list) for fn in resource(path)), set) + assert result == expected