diff --git a/setup.py b/setup.py index 175244b..28e22a6 100644 --- a/setup.py +++ b/setup.py @@ -42,17 +42,16 @@ ), ], install_requires=[ - 'datashape', 'numpy', 'pandas', 'sqlalchemy', 'psycopg2', - 'odo', 'toolz', - 'networkx<=1.11', ], extras_require={ 'dev': [ + 'odo', + 'networkx<=1.11', 'flake8==3.3.0', 'pycodestyle==2.3.1', 'pyflakes==1.5.0', diff --git a/warp_prism/__init__.py b/warp_prism/__init__.py index 7a560a5..52e8fcf 100644 --- a/warp_prism/__init__.py +++ b/warp_prism/__init__.py @@ -1,13 +1,11 @@ from io import BytesIO +import numbers -from datashape import discover -from datashape.predicates import istabular import numpy as np -from odo import convert import pandas as pd import sqlalchemy as sa +from sqlalchemy.dialects import postgresql as _postgresql from sqlalchemy.ext.compiler import compiles -from toolz import keymap from ._warp_prism import ( raw_to_arrays as _raw_to_arrays, @@ -18,7 +16,7 @@ __version__ = '0.1.1' -_typeid_map = keymap(np.dtype, _raw_typeid_map) +_typeid_map = {np.dtype(k): v for k, v in _raw_typeid_map.items()} _object_type_id = _raw_typeid_map['object'] @@ -66,14 +64,107 @@ def _compile_copy_to_binary_postgres(element, compiler, **kwargs): ) +types = {np.dtype(k): v for k, v in { + 'i8': sa.BigInteger, + 'i4': sa.Integer, + 'i2': sa.SmallInteger, + 'f4': sa.REAL, + 'f8': sa.FLOAT, + 'O': sa.Text, + 'M8[D]': sa.Date, + 'M8[us]': sa.DateTime, + '?': sa.Boolean, + "m8[D]": sa.Interval(second_precision=0, day_precision=9), + "m8[h]": sa.Interval(second_precision=0, day_precision=0), + "m8[m]": sa.Interval(second_precision=0, day_precision=0), + "m8[s]": sa.Interval(second_precision=0, day_precision=0), + "m8[ms]": sa.Interval(second_precision=3, day_precision=0), + "m8[us]": sa.Interval(second_precision=6, day_precision=0), + "m8[ns]": sa.Interval(second_precision=9, day_precision=0), +}.items()} + +_revtypes = dict(map(reversed, types.items())) +_revtypes.update({ + sa.DATETIME: np.dtype('M8[us]'), + sa.TIMESTAMP: 
np.dtype('M8[us]'), + sa.FLOAT: np.dtype('f8'), + sa.DATE: np.dtype('M8[D]'), + sa.BIGINT: np.dtype('i8'), + sa.INTEGER: np.dtype('i4'), + sa.BIGINT: np.dtype('i8'), + sa.types.NullType: np.dtype('O'), + sa.REAL: np.dtype('f4'), + sa.Float: np.dtype('f8'), +}) + +_precision_types = { + sa.Float, + _postgresql.base.DOUBLE_PRECISION, +} + + +def _precision_to_dtype(precision): + if isinstance(precision, numbers.Integral): + if 1 <= precision <= 24: + return np.dtype('f4') + elif 25 <= precision <= 53: + return np.dtype('f8') + raise ValueError('%s is not a supported precision' % precision) + + +_units_of_power = { + 0: 's', + 3: 'ms', + 6: 'us', + 9: 'ns' +} + + +def _discover_type(type_): + if isinstance(type_, sa.Interval): + if type_.second_precision is None and type_.day_precision is None: + return np.dtype('m8[us]') + elif type_.second_precision == 0 and type_.day_precision == 0: + return np.dtype('m8[s]') + + if (type_.second_precision in _units_of_power and + not type_.day_precision): + unit = _units_of_power[type_.second_precision] + elif type_.day_precision > 0: + unit = 'D' + else: + raise ValueError( + 'Cannot infer INTERVAL type_e with parameters' + 'second_precision=%d, day_precision=%d' % + (type_.second_precision, type_.day_precision), + ) + return np.dtype('m8[%s]' % unit) + if type(type_) in _precision_types and type_.precision is not None: + return _precision_to_dtype(type_.precision) + if type_ in _revtypes: + return _revtypes[type_] + if type(type_) in _revtypes: + return _revtypes[type(type_)] + if isinstance(type_, sa.Numeric): + raise ValueError('Cannot adapt numeric type to numpy dtype') + if isinstance(type_, (sa.String, sa.Unicode)): + return np.dtype('O') + else: + for k, v in _revtypes.items(): + if isinstance(k, type) and (isinstance(type_, k) or + hasattr(type_, 'impl') and + isinstance(type_.impl, k)): + return v + if k == type_: + return v + raise NotImplementedError('No SQL-numpy match for type %s' % type_) + + def 
_warp_prism_types(query): - for name, dtype in discover(query).measure.fields: + for col in query.columns: + dtype = _discover_type(col.type) try: - np_dtype = getattr(dtype, 'ty', dtype).to_numpy_dtype() - if np_dtype.kind == 'U': - yield _object_type_id - else: - yield _typeid_map[np_dtype] + yield _typeid_map[dtype] except KeyError: raise TypeError( 'warp_prism cannot query columns of type %s' % dtype, @@ -136,7 +227,7 @@ def to_arrays(query, *, bind=None): return {column_names[n]: v for n, v in enumerate(out)} -null_values = keymap(np.dtype, { +null_values = {np.dtype(k): v for k, v in { 'float32': np.nan, 'float64': np.nan, 'int16': np.nan, @@ -145,7 +236,7 @@ def to_arrays(query, *, bind=None): 'bool': np.nan, 'datetime64[ns]': np.datetime64('nat', 'ns'), 'object': None, -}) +}.items()} # alias because ``to_dataframe`` shadows this name _default_null_values_for_type = null_values @@ -216,6 +307,9 @@ def register_odo_dataframe_edge(): If the selectable is not in a postgres database, it will fallback to the default odo edge. """ + from odo import convert + from datashape.predicates import istabular + # estimating 8 times faster df_cost = convert.graph.edge[sa.sql.Select][pd.DataFrame]['cost'] / 8 diff --git a/warp_prism/tests/bench.py b/warp_prism/tests/bench.py new file mode 100644 index 0000000..db6d10d --- /dev/null +++ b/warp_prism/tests/bench.py @@ -0,0 +1,93 @@ +import numpy as np +from odo import odo +import pandas as pd +from pandas import read_sql +import perf + +from warp_prism import to_arrays, to_dataframe +from warp_prism.tests import tmp_db_uri + + +def setup(setup): + """Mark that a benchmark needs a setup function to prepare its + inputs. 
+ """ + def dec(f): + f._setup = setup + return f + + return dec + + +def setup_largefloat(table_uri): + return odo( + pd.DataFrame({ + 'a': np.random.rand(1000000), + 'b': np.random.rand(1000000)}, + ), + table_uri, + ) + + +@setup(setup_largefloat) +def bench_largefloat_warp_prism_to_arrays(table): + counter = perf.perf_counter + start = counter() + _ = to_arrays(table) # noqa + return counter() - start + + +@setup(setup_largefloat) +def bench_largefloat_warp_prism_to_dataframe(table): + counter = perf.perf_counter + start = counter() + _ = to_dataframe(table) # noqa + return counter() - start + + +@setup(setup_largefloat) +def bench_largefloat_odo_to_dataframe(table): + counter = perf.perf_counter + start = counter() + _ = odo(table, pd.DataFrame) # noqa + return counter() - start + + +@setup(setup_largefloat) +def bench_largefloat_pandas_to_dataframe(table): + counter = perf.perf_counter + start = counter() + _ = read_sql(table) # noqa + return counter() - start + + +def main(): + from traceback import print_exc + + def wrapper(wrapped): + def bench(loops, *args): + return wrapped(*args) + + return bench + + with tmp_db_uri() as db_uri: + for k, v in globals().items(): + if k.startswith('bench_'): + runner = perf.Runner() + runner.parse_args() + table_uri = '::'.join((db_uri, k)) + setup = getattr(v, '_setup', lambda table_uri: table_uri) + try: + print('%s: ' % k, end='') + print(runner.bench_sample_func( + k, + wrapper(v), + setup(table_uri), + ).format()) + except Exception: + print('%s: error' % k) + print_exc() + + +if __name__ == '__main__': + main()