Skip to content

Commit

Permalink
feat(core): Add read_json_arrow for fast json records reading. Suppor…
Browse files Browse the repository at this point in the history
…t additional json export backends.
  • Loading branch information
JovanVeljanoski committed Sep 30, 2022
1 parent af7f39f commit d98e72d
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 25 deletions.
16 changes: 16 additions & 0 deletions packages/vaex-core/vaex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,22 @@ def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, cop
copy_index=copy_index)


def from_json_arrow(file, read_options=None, parse_options=None):
"""Create a DataFrame from a JSON file using Apache Arrow.
This is a much faster alternative to `pandas.read_json(file, lines=True)`.
The JSON file is read eagerly, and the resulting DataFrame is stored in memory.
:param str file: Path to the JSON file.
:param read_options: PyArrow JSON read options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
:param parse_options: PyArrow JSON parse options, see https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html
:return: DataFrame
"""
import vaex.json
ds = vaex.json.DatasetJSON(file, read_options=read_options, parse_options=parse_options)
return vaex.from_dataset(ds)


@docsubst
def from_records(records : List[Dict], array_type="arrow", defaults={}) -> vaex.dataframe.DataFrame:
'''Create a dataframe from a list of dict.
Expand Down
1 change: 1 addition & 0 deletions packages/vaex-core/vaex/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pyarrow as pa

import vaex
import vaex.encoding
import vaex.utils
import vaex.cache
from .array_types import supported_array_types, supported_arrow_array_types, string_types, is_string_type
Expand Down
12 changes: 6 additions & 6 deletions packages/vaex-core/vaex/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import vaex.dataset
import vaex.file
from vaex.dataset import Dataset, DatasetFile
from vaex.dataset import DatasetFile
from .itertools import pmap, pwait, buffer, consume, filter_none
from .multithreading import thread_count_default_io, get_main_io_pool

Expand All @@ -25,7 +25,7 @@ def file_chunks(file, chunk_size, newline_readahead):
file_size = file.tell()
file.seek(0)
begin_offset = 0

done = False
while not done:
# find the next newline boundary
Expand All @@ -50,7 +50,7 @@ def reader(file_offset=begin_offset, length=end_offset - begin_offset):

def file_chunks_mmap(file, chunk_size, newline_readahead):
"""Bytes chunks, split by chunk_size bytes, on newline boundaries
Using memory mapping (which avoids a memcpy)
"""
offset = 0
Expand All @@ -67,7 +67,7 @@ def file_chunks_mmap(file, chunk_size, newline_readahead):

file_map = mmap.mmap(file.fileno(), file_size, **kwargs)
data = memoryview(file_map)

done = False
while not done:
# find the next newline boundary
Expand Down Expand Up @@ -218,7 +218,7 @@ def close(self):

def _chunk_producer(self, columns, chunk_size=None, reverse=False, start=0, end=None):
pool = get_main_io_pool()

first = True
previous = None
for i, reader in enumerate(file_chunks_mmap(self.path, self.chunk_size, self.newline_readahead)):
Expand Down Expand Up @@ -287,7 +287,7 @@ def chunk_reader(reader=reader, first=first, previous=previous, fragment_info=fr
# we only need to cut off a piece of the end
length = end - row_start
table = table.slice(0, length)

# table = table.combine_chunks()
assert len(table)
chunks = dict(zip(table.column_names, table.columns))
Expand Down
38 changes: 26 additions & 12 deletions packages/vaex-core/vaex/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7027,7 +7027,7 @@ def export_csv_pandas(self, path, progress=None, chunk_size=default_chunk_size,
return

@docsubst
def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None):
def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel=True, fs_options=None, fs=None, backend='pandas', lines=False):
""" Exports the DataFrame to a CSV file.
:param to: filename or file object
Expand All @@ -7036,36 +7036,50 @@ def export_json(self, to, progress=None, chunk_size=default_chunk_size, parallel
:param parallel: {evaluate_parallel}
:param fs_options: {fs_options}
:param fs: {fs}
:param backend: Which backend to use for writting the JSON file. Can be "pandas" or "json".
:param lines: If True, each row is written as a separate JSON record. If False, dataframe is written as a list of JSON records.
:return:
"""

json = None # we may want to pass the module as parameter to use a faster library
import json as json_std
json = json or json_std

# not sure if we want to use pandas, it will treat datetime for us, but will convert null to nan
use_pandas = True

# we take on the '[' and ']' from each chunk, and insert it back ourselves
# and we also need to but ',' between each chunk
with vaex.progress.tree(progress, title="export(json)"), vaex.file.open(path=to, mode='wb', fs_options=fs_options, fs=fs) as f:
f.write(b"[")
if not lines:
f.write(b"[")
first = True
if use_pandas:
if backend == 'pandas':
for _i1, _i2, df in self.to_pandas_df(chunk_size=chunk_size, parallel=parallel):
if not first:
f.write(b", ")
if not lines:
f.write(b", ")
first = False
f_temp = io.StringIO()
df.to_json(f_temp, orient='records')
f.write(f_temp.getvalue()[1:-1].encode('utf8'))
else:
df.to_json(f_temp, orient='records', lines=lines)
if lines:
f.write(f_temp.getvalue().encode('utf8'))
else:
f.write(f_temp.getvalue()[1:-1].encode('utf8'))
elif backend == 'json':
for _i1, _i2, records in self.to_records(chunk_size=chunk_size, parallel=parallel):
if not first:
f.write(b", ")
if not lines:
f.write(b", ")
first = False
raw = json.dumps(records)[1:-1]
if lines:
if not first:
raw = raw.replace('},', '}\n')
f.write(raw.encode("utf8"))
f.write(b"]")
f.write('\n'.encode('utf8'))
else:
raise ValueError(f"Unknown backend {backend}, should be 'pandas' or 'json'.")
if not lines:
f.write(b"]")


def _needs_copy(self, column_name):
import vaex.file.other
Expand Down
56 changes: 54 additions & 2 deletions packages/vaex-core/vaex/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import numpy as np
import pyarrow as pa

from frozendict import frozendict

import vaex
from vaex.encoding import Encoding
import vaex.dataset
import vaex.encoding


serializers = []
Expand Down Expand Up @@ -208,9 +211,58 @@ def default(self, obj):
class VaexJsonDecoder(json.JSONDecoder):
def __init__(self, *args, **kwargs):
json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

def object_hook(self, dct):
for serializer in serializers:
if serializer.can_decode(dct):
return serializer.decode(dct)
return dct


@vaex.dataset.register
class DatasetJSON(vaex.dataset.DatasetFile):
snake_name = "arrow-json"

def __init__(self, path, read_options=None, parse_options=None, fs=None, fs_options={}):
super(DatasetJSON, self).__init__(path, fs=fs, fs_options=fs_options)
self.read_options = read_options
self.parse_options = parse_options
self._read_file()

@property
def _fingerprint(self):
fp = vaex.file.fingerprint(self.path, fs_options=self.fs_options, fs=self.fs)
return f"dataset-{self.snake_name}-{fp}"

def _read_file(self):
import pyarrow.json

with vaex.file.open(self.path, fs=self.fs, fs_options=self.fs_options, for_arrow=True) as f:
try:
codec = pa.Codec.detect(self.path)
except Exception:
codec = None
if codec:
f = pa.CompressedInputStream(f, codec.name)
self._arrow_table = pyarrow.json.read_json(f, read_options=self.read_options, parse_options=self.parse_options)
self._columns = dict(zip(self._arrow_table.schema.names, self._arrow_table.columns))
self._set_row_count()
self._ids = frozendict({name: vaex.cache.fingerprint(self._fingerprint, name) for name in self._columns})

def _encode(self, encoding):
spec = super()._encode(encoding)
del spec["write"]
return spec

def __getstate__(self):
state = super().__getstate__()
state["read_options"] = self.read_options
state["parse_options"] = self.parse_options
return state

def __setstate__(self, state):
super().__setstate__(state)
self._read_file()

def close(self):
pass
9 changes: 4 additions & 5 deletions packages/vaex-core/vaex/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import ast
import collections
import concurrent.futures
import contextlib
import functools
import json
import math
import os
import platform
import re
Expand All @@ -27,7 +25,6 @@
import six
import yaml

from .json import VaexJsonEncoder, VaexJsonDecoder
import vaex.file


Expand Down Expand Up @@ -296,14 +293,15 @@ def yaml_load(f):


def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):
import vaex.json
file, path = vaex.file.file_and_path(file, mode='w', fs_options=fs_options, fs=fs)
try:
if path:
base, ext = os.path.splitext(path)
else:
ext = '.json' # default
if ext == ".json":
json.dump(data, file, indent=2, cls=VaexJsonEncoder if old_style else None)
json.dump(data, file, indent=2, cls=vaex.json.VaexJsonEncoder if old_style else None)
elif ext == ".yaml":
yaml_dump(file, data)
else:
Expand All @@ -313,14 +311,15 @@ def write_json_or_yaml(file, data, fs_options={}, fs=None, old_style=True):


def read_json_or_yaml(file, fs_options={}, fs=None, old_style=True):
import vaex.json
file, path = vaex.file.file_and_path(file, fs_options=fs_options, fs=fs)
try:
if path:
base, ext = os.path.splitext(path)
else:
ext = '.json' # default
if ext == ".json":
return json.load(file, cls=VaexJsonDecoder if old_style else None) or {}
return json.load(file, cls=vaex.json.VaexJsonDecoder if old_style else None) or {}
elif ext == ".yaml":
return yaml_load(file) or {}
else:
Expand Down
29 changes: 29 additions & 0 deletions tests/from_json_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from common import *
import tempfile

import vaex


def test_from_json(ds_local):
df = ds_local
Expand All @@ -18,3 +20,30 @@ def test_from_json(ds_local):
assert tmp_df.x.tolist() == df.x.tolist()
assert tmp_df.bool.tolist() == df.bool.tolist()


@pytest.mark.parametrize("backend", ["pandas", "json"])
@pytest.mark.parametrize("lines", [False, True])
def test_from_and_export_json(tmpdir, ds_local, backend, lines):
df = ds_local
df = df.drop(columns=['datetime'])
if 'timedelta' in df:
df = df.drop(columns=['timedelta'])
if 'obj' in df:
df = df.drop(columns=['obj'])

# Create temporary json files
tmp = str(tmpdir.join('test.json'))
df.export_json(tmp, backend=backend, lines=lines)

# Check if file can be read with default (pandas) backend
df_read = vaex.from_json(tmp, lines=lines)
assert df.shape == df_read.shape
assert df.x.tolist() == df_read.x.tolist()
assert df.get_column_names() == df_read.get_column_names()

# If lines is True, check if the file can be read with the from_json_arrow function
if lines:
df_read_arrow = vaex.from_json_arrow(tmp)
assert df.shape == df_read_arrow.shape
assert df.x.tolist() == df_read_arrow.x.tolist()
assert df.get_column_names() == df_read_arrow.get_column_names()

0 comments on commit d98e72d

Please sign in to comment.