Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check directory existence and properly deal with patterned directories #153

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions odo/backends/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,26 @@

import sys
import re
import os
import gzip
import bz2
import uuid
import csv

from contextlib import contextmanager

from glob import glob

import datashape

from datashape import discover, Record, Option
from datashape.predicates import isrecord
from datashape.dispatch import dispatch
from toolz import concat, keyfilter, keymap, merge, valfilter

from toolz import concat, keyfilter, keymap, valfilter, merge

import pandas
import pandas as pd
import os
import gzip
import bz2
import uuid
import csv
from glob import glob

from ..compatibility import unicode, PY2
from ..utils import keywords, ext
Expand All @@ -24,13 +30,16 @@
from ..resource import resource
from ..chunks import chunks
from ..temp import Temp
from ..directory import Directory
from ..numpy_dtype import dshape_to_pandas
from .pandas import coerce_datetimes

dialect_terms = '''delimiter doublequote escapechar lineterminator quotechar
quoting skipinitialspace strict'''.split()

aliases = {'sep': 'delimiter'}
aliases = {
'sep': 'delimiter'
}


def alias(key):
Expand Down Expand Up @@ -273,19 +282,20 @@ def discover_csv(c, nrows=1000, **kwargs):
return datashape.var * measure


@resource.register('.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?')
@resource.register(r'.+\.(csv|tsv|ssv|data|dat)(\.gz|\.bz2?)?', priority=19)
def resource_csv(uri, **kwargs):
return CSV(uri, **kwargs)


@resource.register('.*\*.+', priority=12)
@resource.register(r'.*\*.+', priority=12)
def resource_glob(uri, **kwargs):
filenames = sorted(glob(uri))
r = resource(filenames[0], **kwargs)
return chunks(type(r))([resource(u, **kwargs) for u in sorted(glob(uri))])


@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV))),
@convert.register(chunks(pd.DataFrame), (chunks(CSV), chunks(Temp(CSV)),
Directory(CSV), Directory(Temp(CSV))),
cost=10.0)
def convert_glob_of_csvs_to_chunks_of_dataframes(csvs, **kwargs):
def _():
Expand All @@ -303,7 +313,7 @@ def convert_dataframes_to_temporary_csv(data, **kwargs):

@dispatch(CSV)
def drop(c):
os.unlink(c.path)
os.remove(c.path)


ooc_types.add(CSV)
2 changes: 1 addition & 1 deletion odo/backends/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def SSH(cls):

ssh_pattern = '((?P<username>[a-zA-Z]\w*)@)?(?P<hostname>[\w.-]*)(:(?P<port>\d+))?:(?P<path>[/\w.*-]+)'


@resource.register('ssh://.+', priority=16)
def resource_ssh(uri, **kwargs):
if 'ssh://' in uri:
Expand All @@ -105,7 +106,6 @@ def resource_ssh(uri, **kwargs):
subtype = types_by_extension[path.split('.')[-1]]
if '*' in path:
subtype = Directory(subtype)
path = path.rsplit('/', 1)[0] + '/'
except KeyError:
subtype = type(resource(path))

Expand Down
20 changes: 14 additions & 6 deletions odo/backends/tests/test_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import pytest
paramiko = pytest.importorskip('paramiko')

import pandas as pd
import numpy as np
import getpass
import re
import os
import sys
import socket

import pandas as pd
import numpy as np

from odo.utils import tmpfile, filetext
from odo.directory import _Directory, Directory
Expand All @@ -16,7 +19,6 @@
from odo import into, discover, CSV, JSONLines, JSON, convert
from odo.temp import _Temp, Temp
from odo.compatibility import ON_TRAVIS_CI
import socket

skipif = pytest.mark.skipif

Expand Down Expand Up @@ -52,12 +54,18 @@ def test_connect():


def test_resource_directory():
r = resource('ssh://joe@localhost:/path/to/')
user = getpass.getuser()
r = resource('ssh://%s@localhost:%s/*.json.bz' % (user, os.path.dirname(__file__)))
assert issubclass(r.subtype, _Directory)

r = resource('ssh://joe@localhost:/path/to/*.csv')
with pytest.raises(AssertionError):
resource('ssh://%s@localhost:/path/to/' % user)

r = resource('ssh://%s@localhost:%s/*.csv' %
(user, os.path.dirname(__file__)))
assert r.subtype == Directory(CSV)
assert r.path == '/path/to/'
assert r.path == os.path.dirname(__file__)
assert r.pattern == '*.csv'


def test_discover():
Expand Down
56 changes: 42 additions & 14 deletions odo/directory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import absolute_import, division, print_function

import os
import re

from glob import glob
from .chunks import Chunks
from .resource import resource
from pprint import pformat

from toolz import memoize, first
from datashape import discover, var
import os

from .chunks import Chunks
from .resource import resource


class _Directory(Chunks):
Expand All @@ -14,28 +19,52 @@ class _Directory(Chunks):
For typed containers see the ``Directory`` function which generates
parametrized Directory classes.

Parameters
----------
path : str
An existing directory or glob pattern
kwargs : dict
Additional keyword arguments that will be passed to the call to
resource when iterating over a directory

Examples
--------
>>> from odo import CSV
>>> c = Directory(CSV)('path/to/data/') # doctest: +SKIP
>>> c = Directory(CSV)('path/to/data/') # doctest: +SKIP
>>> c # doctest: +SKIP
Directory(CSV)(path=..., pattern='*')

Normal use through resource strings

>>> r = resource('path/to/data/*.csv') # doctest: +SKIP
Directory(CSV)(path=..., pattern='*.csv')
>>> r = resource('path/to/data/') # doctest: +SKIP


Directory()(path=..., pattern='*')
"""
def __init__(self, path, **kwargs):
self.path = path
path = os.path.abspath(path)
if os.path.isdir(path):
self.pattern = '*'
else:
path, self.pattern = os.path.split(path)
self.path = os.path.abspath(path)
self.kwargs = kwargs

@property
def glob(self):
return os.path.join(self.path, self.pattern)

def __iter__(self):
return (resource(os.path.join(self.path, fn), **self.kwargs)
for fn in sorted(os.listdir(self.path)))
return (resource(fn, **self.kwargs) for fn in sorted(glob(self.glob)))

def __repr__(self):
return '%s(glob=%s)' % (type(self).__name__, self.glob)


def Directory(cls):
""" Parametrized DirectoryClass """
return type('Directory(%s)' % cls.__name__, (_Directory,), {'container': cls})
return type('Directory(%s)' % cls.__name__, (_Directory,),
{'container': cls})

Directory.__doc__ = Directory.__doc__

Expand All @@ -51,15 +80,14 @@ def discover_Directory(c, **kwargs):
return var * discover(first(c)).subshape[0]


@resource.register('.+' + re_path_sep + '\*\..+', priority=15)
@resource.register(r'.+%s.*(?:(?:\[!?.+\])|\?|\*)+.+' % re_path_sep, priority=18)
def resource_directory(uri, **kwargs):
path = uri.rsplit(os.path.sep, 1)[0]
try:
one_uri = first(glob(uri))
except (OSError, StopIteration):
return _Directory(path, **kwargs)
return _Directory(uri, **kwargs)
subtype = type(resource(one_uri, **kwargs))
return Directory(subtype)(path, **kwargs)
return Directory(subtype)(uri, **kwargs)


@resource.register('.+' + re_path_sep, priority=9)
Expand Down
54 changes: 48 additions & 6 deletions odo/tests/test_directory.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
from __future__ import absolute_import, division, print_function

import pytest
import tempfile
from contextlib import contextmanager
import shutil
from odo.backends.csv import CSV
from odo.directory import Directory, discover, resource, _Directory
from odo import into
from odo import odo
from datashape import dshape
from toolz import concat
import os
import pandas as pd


@contextmanager
def csvs(n=3):
assert 0 < n <= 10, \
'number of files must be greater than 0 and less than or equal to 10'
path = tempfile.mktemp()
os.mkdir(path)

fns = [os.path.join(path, 'file_%d.csv' % i) for i in range(n)]

for i, fn in enumerate(fns):
into(fn, [{'a': i, 'b': j} for j in range(5)])
odo([{'a': i, 'b': j} for j in range(5)], fn)

try:
yield path + os.path.sep
Expand All @@ -42,7 +48,43 @@ def test_resource_directory():
assert r2.path.rstrip(os.path.sep) == path.rstrip(os.path.sep)


def test_resource_directory():
assert isinstance(resource(os.path.join('a', 'nonexistent', 'directory') +
os.path.sep),
_Directory)
def test_resource_nonexistent_directory():
with pytest.raises(AssertionError):
resource(os.path.join('a', 'nonexistent', 'directory') + os.path.sep)


@contextmanager
def chdir(newdir):
curdir = os.getcwd()
os.chdir(newdir)

try:
yield os.getcwd()
finally:
os.chdir(curdir)


def test_directory_of_csvs_to_frame():
with csvs() as path:
result = odo(odo(path, pd.DataFrame), set)
expected = odo(concat(odo(fn, list) for fn in resource(path)), set)
assert result == expected


def test_directory_of_csvs_with_cd():
with csvs() as path:
with chdir(path):
result = odo(odo('./*.csv', pd.DataFrame), set)
expected = odo(concat(odo(fn, list) for fn in resource(path)), set)
assert result == expected


def test_directory_of_csvs_with_non_star_glob():
with csvs() as path:
with chdir(path):
glob_pat = '%sfile_?.csv' % path
dir_of_csv = resource(glob_pat)
assert isinstance(dir_of_csv, _Directory)
result = odo(odo(dir_of_csv, pd.DataFrame), set)
expected = odo(concat(odo(fn, list) for fn in resource(path)), set)
assert result == expected