diff --git a/packages/vaex-core/vaex/__init__.py b/packages/vaex-core/vaex/__init__.py
index fdef9502ef..8d7aacfd09 100644
--- a/packages/vaex-core/vaex/__init__.py
+++ b/packages/vaex-core/vaex/__init__.py
@@ -507,6 +507,22 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop
                       copy_index=copy_index)
 
 
+def from_json_arrow(file, read_options=None, parse_options=None):
+    """Create a DataFrame from a JSON file using Apache Arrow.
+
+    This is a much faster alternative to `pandas.read_json(file, lines=True)`.
+    The JSON file is read eagerly, and the resulting DataFrame is stored in memory.
+
+    :param str file: Path to the JSON file.
+    :param read_options: PyArrow JSON read options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
+    :param parse_options: PyArrow JSON parse options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html
+    :return: DataFrame
+    """
+    import vaex.json
+    ds = vaex.json.DatasetJSON(file, read_options=read_options, parse_options=parse_options)
+    return vaex.from_dataset(ds)
+
+
 @docsubst
 def from_records(records : List[Dict], array_type="arrow", defaults={}) -> vaex.dataframe.DataFrame:
     '''Create a dataframe from a list of dict.
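Not part of the patch, a quick usage sketch for review: how the new vaex.from_json_arrow entry point is called, and how PyArrow's options are passed through. The file name is hypothetical, and the block_size tweak is only an illustration of the pass-through options:

    import pyarrow.json
    import vaex

    # Eagerly read a newline-delimited JSON file through Arrow.
    df = vaex.from_json_arrow("events.jsonl")

    # The Arrow reader can be tuned via the pass-through options,
    # e.g. a bigger block size when individual records are large.
    read_options = pyarrow.json.ReadOptions(block_size=8 * 1024 * 1024)
    df = vaex.from_json_arrow("events.jsonl", read_options=read_options)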
diff --git a/packages/vaex-core/vaex/column.py b/packages/vaex-core/vaex/column.py
index f2ddabe3da..5388647d3e 100644
--- a/packages/vaex-core/vaex/column.py
+++ b/packages/vaex-core/vaex/column.py
@@ -8,6 +8,7 @@
 import pyarrow as pa
 
 import vaex
+import vaex.encoding
 import vaex.utils
 import vaex.cache
 from .array_types import supported_array_types, supported_arrow_array_types, string_types, is_string_type
diff --git a/packages/vaex-core/vaex/csv.py b/packages/vaex-core/vaex/csv.py
index 0aeea16d90..f92db080de 100644
--- a/packages/vaex-core/vaex/csv.py
+++ b/packages/vaex-core/vaex/csv.py
@@ -9,7 +9,7 @@
 
 import vaex.dataset
 import vaex.file
-from vaex.dataset import Dataset, DatasetFile
+from vaex.dataset import DatasetFile
 from .itertools import pmap, pwait, buffer, consume, filter_none
 from .multithreading import thread_count_default_io, get_main_io_pool
@@ -25,7 +25,7 @@ def file_chunks(file, chunk_size, newline_readahead):
     file_size = file.tell()
     file.seek(0)
     begin_offset = 0
-    
+
     done = False
     while not done:
         # find the next newline boundary
@@ -50,7 +50,7 @@ def reader(file_offset=begin_offset, length=end_offset - begin_offset):
 
 def file_chunks_mmap(file, chunk_size, newline_readahead):
     """Bytes chunks, split by chunk_size bytes, on newline boundaries
-    
+
     Using memory mapping (which avoids a memcpy)
     """
     offset = 0
@@ -67,7 +67,7 @@ def file_chunks_mmap(file, chunk_size, newline_readahead):
 
     file_map = mmap.mmap(file.fileno(), file_size, **kwargs)
     data = memoryview(file_map)
-    
+
     done = False
     while not done:
         # find the next newline boundary
@@ -218,7 +218,7 @@ def close(self):
 
     def _chunk_producer(self, columns, chunk_size=None, reverse=False, start=0, end=None):
         pool = get_main_io_pool()
-        
+
         first = True
         previous = None
         for i, reader in enumerate(file_chunks_mmap(self.path, self.chunk_size, self.newline_readahead)):
@@ -287,7 +287,7 @@ def chunk_reader(reader=reader, first=first, previous=previous, fragment_info=fr
                     # we only need to cut off a piece of the end
                     length = end - row_start
                     table = table.slice(0, length)
-                
+
                 # table = table.combine_chunks()
                 assert len(table)
                 chunks = dict(zip(table.column_names, table.columns))
diff --git a/packages/vaex-core/vaex/dataframe.py b/packages/vaex-core/vaex/dataframe.py
index 9ee8a70211..65a3fcc533 100644
--- a/packages/vaex-core/vaex/dataframe.py
+++ b/packages/vaex-core/vaex/dataframe.py
@@ -7027,7 +7027,7 @@ def export_csv_pandas(self, path, progress=None, chunk_size=default_chunk_size,
         return
 
     @docsubst
-    def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None):
+    def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None, backend='pandas', lines=False):
         """ Exports the DataFrame to a CSV file.
 
         :param to: filename or file object
@@ -7036,36 +7036,50 @@ def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel
         :param parallel: {evaluate_parallel}
         :param fs_options: {fs_options}
         :param fs: {fs}
+        :param backend: Which backend to use for writing the JSON file. Can be "pandas" or "json".
+        :param lines: If True, each row is written as a separate JSON record. If False, the DataFrame is written as a single list of JSON records.
         :return:
         """
+
         json = None  # we may want to pass the module as parameter to use a faster library
         import json as json_std
         json = json or json_std
 
-        # not sure if we want to use pandas, it will treat datetime for us, but will convert null to nan
-        use_pandas = True
-
         # we take on the '[' and ']' from each chunk, and insert it back ourselves
         # and we also need to but ',' between each chunk
         with vaex.progress.tree(progress, title="export(json)"), vaex.file.open(path=to, mode='wb', fs_options=fs_options, fs=fs) as f:
-            f.write(b"[")
+            if not lines:
+                f.write(b"[")
             first = True
-            if use_pandas:
+            if backend == 'pandas':
                 for _i1, _i2, df in self.to_pandas_df(chunk_size=chunk_size, parallel=parallel):
                     if not first:
-                        f.write(b", ")
+                        if not lines:
+                            f.write(b", ")
                     first = False
                     f_temp = io.StringIO()
-                    df.to_json(f_temp, orient='records')
-                    f.write(f_temp.getvalue()[1:-1].encode('utf8'))
-            else:
+                    df.to_json(f_temp, orient='records', lines=lines)
+                    if lines:
+                        f.write(f_temp.getvalue().encode('utf8'))
+                    else:
+                        f.write(f_temp.getvalue()[1:-1].encode('utf8'))
+            elif backend == 'json':
                 for _i1, _i2, records in self.to_records(chunk_size=chunk_size, parallel=parallel):
                     if not first:
-                        f.write(b", ")
+                        if not lines:
+                            f.write(b", ")
                     first = False
                     raw = json.dumps(records)[1:-1]
+                    if lines:
+                        if not first:
+                            raw = raw.replace('},', '}\n')
                     f.write(raw.encode("utf8"))
-            f.write(b"]")
+                    f.write('\n'.encode('utf8'))
+            else:
+                raise ValueError(f"Unknown backend {backend}, should be 'pandas' or 'json'.")
+            if not lines:
+                f.write(b"]")
+
 
     def _needs_copy(self, column_name):
         import vaex.file.other
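Again as review context rather than part of the patch, a sketch of how the two new export_json keyword arguments combine; the file names are arbitrary:

    import vaex

    df = vaex.from_arrays(x=[1, 2, 3], s=["a", "b", "c"])

    # Default behaviour, unchanged: one JSON list of records via pandas.
    df.export_json("records.json")

    # New: newline-delimited records, serialized by the stdlib json module.
    df.export_json("records.jsonl", backend="json", lines=True)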
diff --git a/packages/vaex-core/vaex/json.py b/packages/vaex-core/vaex/json.py
index a6400b4010..5c7a116ae4 100644
--- a/packages/vaex-core/vaex/json.py
+++ b/packages/vaex-core/vaex/json.py
@@ -4,8 +4,11 @@
 import numpy as np
 import pyarrow as pa
 
+from frozendict import frozendict
+
 import vaex
-from vaex.encoding import Encoding
+import vaex.dataset
+import vaex.encoding
 
 
 serializers = []
@@ -208,9 +211,58 @@ def default(self, obj):
 class VaexJsonDecoder(json.JSONDecoder):
     def __init__(self, *args, **kwargs):
         json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)
-    
+
     def object_hook(self, dct):
         for serializer in serializers:
             if serializer.can_decode(dct):
                 return serializer.decode(dct)
         return dct
+
+
+@vaex.dataset.register
+class DatasetJSON(vaex.dataset.DatasetFile):
+    snake_name = "arrow-json"
+
+    def __init__(self, path, read_options=None, parse_options=None, fs=None, fs_options={}):
+        super(DatasetJSON, self).__init__(path, fs=fs, fs_options=fs_options)
+        self.read_options = read_options
+        self.parse_options = parse_options
+        self._read_file()
+
+    @property
+    def _fingerprint(self):
+        fp = vaex.file.fingerprint(self.path, fs_options=self.fs_options, fs=self.fs)
+        return f"dataset-{self.snake_name}-{fp}"
+
+    def _read_file(self):
+        import pyarrow.json
+
+        with vaex.file.open(self.path, fs=self.fs, fs_options=self.fs_options, for_arrow=True) as f:
+            try:
+                codec = pa.Codec.detect(self.path)
+            except Exception:
+                codec = None
+            if codec:
+                f = pa.CompressedInputStream(f, codec.name)
+            self._arrow_table = pyarrow.json.read_json(f, read_options=self.read_options, parse_options=self.parse_options)
+        self._columns = dict(zip(self._arrow_table.schema.names, self._arrow_table.columns))
+        self._set_row_count()
+        self._ids = frozendict({name: vaex.cache.fingerprint(self._fingerprint, name) for name in self._columns})
+
+    def _encode(self, encoding):
+        spec = super()._encode(encoding)
+        del spec["write"]
+        return spec
+
+    def __getstate__(self):
+        state = super().__getstate__()
+        state["read_options"] = self.read_options
+        state["parse_options"] = self.parse_options
+        return state
+
+    def __setstate__(self, state):
+        super().__setstate__(state)
+        self._read_file()
+
+    def close(self):
+        pass
diff --git a/packages/vaex-core/vaex/utils.py b/packages/vaex-core/vaex/utils.py
index 165394e820..d45ccf9a6d 100644
--- a/packages/vaex-core/vaex/utils.py
+++ b/packages/vaex-core/vaex/utils.py
@@ -1,12 +1,10 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 import ast
-import collections
 import concurrent.futures
 import contextlib
 import functools
 import json
-import math
 import os
 import platform
 import re
@@ -27,7 +25,6 @@
 import six
 import yaml
 
-from .json import VaexJsonEncoder, VaexJsonDecoder
 import vaex.file
 
 
@@ -296,6 +293,7 @@ def yaml_load(f):
 
 
 def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):
+    import vaex.json
     file, path = vaex.file.file_and_path(file, mode='w', fs_options=fs_options, fs=fs)
     try:
         if path:
@@ -303,7 +301,7 @@ def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):
         else:
             ext = '.json'  # default
         if ext == ".json":
-            json.dump(data, file, indent=2, cls=VaexJsonEncoder if old_style else None)
+            json.dump(data, file, indent=2, cls=vaex.json.VaexJsonEncoder if old_style else None)
         elif ext == ".yaml":
             yaml_dump(file, data)
         else:
@@ -313,6 +311,7 @@ def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):
 
 
 def read_json_or_yaml(file, fs_options={}, fs=None, old_style=True):
+    import vaex.json
     file, path = vaex.file.file_and_path(file, fs_options=fs_options, fs=fs)
     try:
         if path:
@@ -320,7 +319,7 @@ def read_json_or_yaml(file, fs_options={}, fs=None, old_style=True):
         else:
             ext = '.json'  # default
         if ext == ".json":
-            return json.load(file, cls=VaexJsonDecoder if old_style else None) or {}
+            return json.load(file, cls=vaex.json.VaexJsonDecoder if old_style else None) or {}
         elif ext == ".yaml":
             return yaml_load(file) or {}
         else:
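A sketch of what the new dataset class enables, for review only. The file name is hypothetical, and the ParseOptions tweak merely illustrates the pass-through; the point is that __getstate__/__setstate__ preserve the reader options, so the dataset re-reads its file after a pickle round trip:

    import pickle

    import pyarrow.json
    import vaex
    import vaex.json

    # Construct the dataset directly, passing Arrow parse options through.
    parse_options = pyarrow.json.ParseOptions(newlines_in_values=True)
    ds = vaex.json.DatasetJSON("events.jsonl", parse_options=parse_options)
    df = vaex.from_dataset(ds)

    # Unpickling calls __setstate__, which re-runs _read_file() with the
    # preserved options, so the restored DataFrame sees the same data.
    df_restored = vaex.from_dataset(pickle.loads(pickle.dumps(ds)))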
diff --git a/tests/from_json_test.py b/tests/from_json_test.py
index acba051c97..309e8e6a75 100644
--- a/tests/from_json_test.py
+++ b/tests/from_json_test.py
@@ -1,6 +1,8 @@
 from common import *
 import tempfile
 
+import vaex
+
 
 def test_from_json(ds_local):
     df = ds_local
@@ -18,3 +20,30 @@ def test_from_json(ds_local):
 
     assert tmp_df.x.tolist() == df.x.tolist()
     assert tmp_df.bool.tolist() == df.bool.tolist()
+
+@pytest.mark.parametrize("backend", ["pandas", "json"])
+@pytest.mark.parametrize("lines", [False, True])
+def test_from_and_export_json(tmpdir, ds_local, backend, lines):
+    df = ds_local
+    df = df.drop(columns=['datetime'])
+    if 'timedelta' in df:
+        df = df.drop(columns=['timedelta'])
+    if 'obj' in df:
+        df = df.drop(columns=['obj'])
+
+    # Create a temporary JSON file
+    tmp = str(tmpdir.join('test.json'))
+    df.export_json(tmp, backend=backend, lines=lines)
+
+    # Check that the file can be read with the default (pandas) backend
+    df_read = vaex.from_json(tmp, lines=lines)
+    assert df.shape == df_read.shape
+    assert df.x.tolist() == df_read.x.tolist()
+    assert df.get_column_names() == df_read.get_column_names()
+
+    # If lines is True, check that the file can also be read with from_json_arrow
+    if lines:
+        df_read_arrow = vaex.from_json_arrow(tmp)
+        assert df.shape == df_read_arrow.shape
+        assert df.x.tolist() == df_read_arrow.x.tolist()
+        assert df.get_column_names() == df_read_arrow.get_column_names()
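The round trip the test exercises, condensed into a standalone sketch. The file name is hypothetical, and it assumes the values survive JSON serialization unchanged, as the test above asserts:

    import vaex

    df = vaex.from_arrays(x=[1, 2, 3])
    df.export_json("roundtrip.jsonl", backend="json", lines=True)

    # Both the pandas-based reader and the new Arrow-based reader
    # should reconstruct the same column from the JSON-lines file.
    assert vaex.from_json("roundtrip.jsonl", lines=True).x.tolist() == [1, 2, 3]
    assert vaex.from_json_arrow("roundtrip.jsonl").x.tolist() == [1, 2, 3]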