Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add load_as methods for pyarrow dataset and table #240

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions examples/python/quickstart_pyarrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Copyright (2021) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os

import delta_sharing

# Point to the profile file. It can be a file on the local file system or a file on a remote storage.
profile_file = os.path.dirname(__file__) + "/../open-datasets.share"

# Create a SharingClient.
client = delta_sharing.SharingClient(profile_file)

# List all shared tables.
print("########### All Available Tables #############")
print(client.list_all_tables())

# Create a url to access a shared table.
# A table path is the profile file path following with `#` and the fully qualified name of a table (`<share-name>.<schema-name>.<table-name>`).
table_url = profile_file + "#delta_sharing.default.owid-covid-data"

# As PyArrow Dataset.

# Create a lazy reference for delta sharing table as a PyArrow Dataset.
print(
"########### Create a lazy reference for delta_sharing.default.owid-covid-data as a PyArrow Dataset #############")
data = delta_sharing.load_as_pyarrow_dataset(table_url)
print(data.schema)

# Create a lazy reference for delta sharing table as a PyArrow Dataset with additional pyarrow options.
print(
"########### Create a lazy reference for delta_sharing.default.owid-covid-data as a PyArrow Dataset with additional options #############")
delta_sharing.load_as_pyarrow_dataset(table_url, pyarrow_ds_options={"partitioning": "hive"})

# As PyArrow Table.

# Fetch 10 rows from a delta sharing table as a PyArrow Table. This can be used to read sample data from a table that cannot fit in the memory.
print("########### Loading 10 rows from delta_sharing.default.owid-covid-data as a PyArrow Table #############")
data = delta_sharing.load_as_pyarrow_table(table_url, limit=10)

# Print the sample.
print("########### Show the number of fetched rows #############")
print(data.num_rows)

# Load full table as a PyArrow Table. This can be used to process tables that can fit in the memory.
print("########### Loading all data from delta_sharing.default.owid-covid-data as a PyArrow Table #############")
data = delta_sharing.load_as_pyarrow_table(table_url)
print(data.num_rows)

# Load full table as a PyArrow Table with additional pyarrow options.
print(
"########### Loading all data from delta_sharing.default.owid-covid-data as a PyArrow Table with additional pyarrow options #############")
import pyarrow.dataset as ds

data = delta_sharing.load_as_pyarrow_table(
table_url,
pyarrow_ds_options={"exclude_invalid_files": False},
pyarrow_tbl_options={
"columns": ["date", "location", "total_cases"],
"filter": ds.field("date") == "2020-02-24"
}
)
print(data)

# Convert PyArrow Table to Pandas DataFrame.
print("########### Converting delta_sharing.default.owid-covid-data PyArrow Table to a Pandas DataFrame #############")
pdf = data.to_pandas()
print(pdf)
10 changes: 9 additions & 1 deletion python/delta_sharing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
# limitations under the License.
#

from delta_sharing.delta_sharing import SharingClient, load_as_pandas, load_as_spark
from delta_sharing.delta_sharing import (
SharingClient,
load_as_pandas,
load_as_spark,
load_as_pyarrow_dataset,
load_as_pyarrow_table,
)
from delta_sharing.delta_sharing import load_table_changes_as_pandas, load_table_changes_as_spark
from delta_sharing.protocol import Share, Schema, Table
from delta_sharing.version import __version__
Expand All @@ -27,6 +33,8 @@
"Table",
"load_as_pandas",
"load_as_spark",
"load_as_pyarrow_dataset",
"load_as_pyarrow_table",
"load_table_changes_as_pandas",
"load_table_changes_as_spark",
"__version__",
Expand Down
157 changes: 156 additions & 1 deletion python/delta_sharing/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import Schema as PyArrowSchema
from pyarrow import Table as PyArrowTable
from pyarrow.dataset import Dataset as PyArrowDataset
from pyarrow.dataset import dataset


def _get_dummy_column(schema_type):
Expand Down Expand Up @@ -57,7 +62,157 @@ def _get_dummy_column(schema_type):
raise ValueError(f"Could not parse datatype: {schema_type}")


def get_empty_table(schema_json: dict) -> pd.DataFrame:
def _to_pyarrow_schema(schema_value: dict) -> PyArrowSchema:
"""
Converts delta table schema to a valid PyArrow Schema with
field metadata and nullability preserved.
This implementation closely follows PySpark and supports columns
with complex data types like `struct`, `map` and `array`.

:param schema_value: Representation of delta table schema as a dict
:return: pyarrow.Schema
"""
assert schema_value is not None and schema_value["type"] == "struct", "Invalid schema found."

def _to_arrow_type(f_type) -> pa.DataType:
is_nested = isinstance(f_type, dict)

if not is_nested:
f_type = f_type.lower()

if not is_nested and f_type == "boolean":
return pa.bool_()

elif not is_nested and f_type == "byte":
return pa.int8()

elif not is_nested and f_type == "short":
return pa.int16()

elif not is_nested and f_type == "integer":
return pa.int32()

elif not is_nested and f_type == "long":
return pa.int64()

elif not is_nested and f_type == "float":
return pa.float32()

elif not is_nested and f_type == "double":
return pa.float64()

elif not is_nested and f_type.startswith("decimal"):
try:
import re

decimal_pattern = re.compile(r"(\([^\)]+\))")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add comment with examples that this pattern could handle and not?

res = [
int(v.strip())
for v in decimal_pattern.findall(f_type)[0].lstrip("(").rstrip(")").split(",")
]
precision = res[0]
scale = res[1]
except:
precision = 10
scale = 0
return pa.decimal128(precision, scale)

elif not is_nested and f_type == "string":
return pa.string()

elif not is_nested and f_type == "binary":
return pa.binary()

elif not is_nested and f_type == "date":
return pa.date32()

elif not is_nested and f_type == "timestamp":
return pa.timestamp("us")

elif not is_nested and f_type in ["void", "null"]:
return pa.null()

elif is_nested and f_type["type"] == "array":
element_type = f_type["elementType"]
if isinstance(element_type, str) and element_type == "timestamp":
raise TypeError(
f"Could not parse map field types: array<timestamp> to PyArrow type."
)
elif isinstance(element_type, dict) and element_type["type"] == "struct":
if any(
isinstance(struct_field["type"], dict)
and struct_field["type"]["type"] == "struct"
for struct_field in element_type["fields"]
):
raise TypeError("Nested StructType cannot be converted to PyArrow type.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Double Nested cannot ..."?

return pa.list_(_to_arrow_type(element_type))

elif is_nested and f_type["type"] == "map":
key_type = f_type["keyType"]
value_type = f_type["valueType"]
if (isinstance(key_type, str) and key_type == "timestamp") or (
isinstance(value_type, str) and value_type == "timestamp"
):
raise TypeError(
f"Could not parse map field with timestamp key or value types to PyArrow type."
)
elif (isinstance(key_type, dict) and key_type["type"] == "struct") or (
isinstance(value_type, dict) and value_type["type"] == "struct"
):
raise TypeError(
f"Could not parse map field with struct key or value types to PyArrow type."
)
return pa.map_(_to_arrow_type(key_type), _to_arrow_type(value_type))

elif is_nested and f_type["type"] == "struct":
if any(
isinstance(struct_field["type"], dict) and struct_field["type"]["type"] == "struct"
for struct_field in f_type["fields"]
):
raise TypeError("Nested StructType cannot be converted to PyArrow type.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double nested?

struct_fields = [
pa.field(
struct_field["name"],
_to_arrow_type(struct_field["type"]),
nullable=(str(struct_field["nullable"]).lower() == "true"),
metadata=struct_field["metadata"],
)
for struct_field in f_type["fields"]
]
return pa.struct(struct_fields)
else:
raise TypeError(f"Could not parse type: {f_type} to PyArrow type.")

fields = []
for field in schema_value["fields"]:
field_name = str(field["name"])
field_type = field["type"]
field_nullable = str(field["nullable"]).lower() == "true"
field_metadata = field["metadata"]

fields.append(
pa.field(
field_name,
_to_arrow_type(field_type),
nullable=field_nullable,
metadata=field_metadata,
)
)

return pa.schema(fields)


def get_empty_pyarrow_table(schema_json: dict) -> PyArrowTable:
schema = _to_pyarrow_schema(schema_json)
return schema.empty_table()


def get_empty_pyarrow_dataset(schema_json: dict) -> PyArrowDataset:
schema = _to_pyarrow_schema(schema_json)
return dataset([], schema=schema)


def get_empty_pandas_table(schema_json: dict) -> pd.DataFrame:
"""
For empty tables, we use dummy columns from `_get_dummy_column` and then
drop all rows to generate a table with the correct column names and
Expand Down
85 changes: 84 additions & 1 deletion python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#
from itertools import chain
from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union
from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union, Any, Mapping
from pathlib import Path

import pandas as pd
Expand All @@ -26,6 +26,12 @@
except ImportError:
pass

try:
from pyarrow.dataset import Dataset as PyArrowDataset
from pyarrow import Table as PyArrowTable
except ImportError:
pass

from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table
from delta_sharing.reader import DeltaSharingReader
from delta_sharing.rest_client import DataSharingRestClient
Expand Down Expand Up @@ -65,6 +71,8 @@ def load_as_pandas(
Use this optional parameter to explore the shared table without loading the entire table to
the memory.
:param version: an optional non-negative int. Load the snapshot of table at version
:param timestamp: an optional string. Load the snapshot of table at version corresponding
to the timestamp.
:return: A pandas DataFrame representing the shared table.
"""
profile_json, share, schema, table = _parse_url(url)
Expand Down Expand Up @@ -112,6 +120,81 @@ def load_as_spark(
return df.load(url)


def load_as_pyarrow_table(
url: str,
limit: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None,
pyarrow_ds_options: Optional[Mapping[str, Any]] = None,
pyarrow_tbl_options: Optional[Mapping[str, Any]] = None,
) -> PyArrowTable:
"""
Load the shared table using the given url as a Pyarrow Table.

:param url: a url under the format "<profile>#<share>.<schema>.<table>"
:param limit: a non-negative int. Load only the ``limit`` rows if the parameter is specified.
Use this optional parameter to explore the shared table without loading the entire table to
the memory.
:param version: an optional non-negative int. Load the snapshot of table at version
:param timestamp: an optional string. Load the snapshot of table at version corresponding
to the timestamp.
:param pyarrow_ds_options: Keyword arguments while creating the pyarrow dataset.
:param pyarrow_tbl_options: Keyword arguments passed to `to_table()`
:return: A Pyarrow Table representing the shared table.
"""
try:
from pyarrow import Table as PyArrowTable
except ImportError:
raise ImportError("Unable to import pyarrow. `load_as_pyarrow_table` requires pyarrow.")

profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)

pa_table: PyArrowTable = DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
limit=limit,
version=version,
timestamp=timestamp,
).to_pyarrow_table(pyarrow_ds_options, pyarrow_tbl_options)

return pa_table


def load_as_pyarrow_dataset(
url: str,
version: Optional[int] = None,
timestamp: Optional[str] = None,
pyarrow_ds_options: Optional[Mapping[str, Any]] = None,
) -> PyArrowDataset:
"""
Load the shared table using the given url as a Pyarrow Dataset.

:param url: a url under the format "<profile>#<share>.<schema>.<table>"
:param version: an optional non-negative int. Load the snapshot of table at version
:param timestamp: an optional string. Load the snapshot of table at version corresponding
to the timestamp.
:param pyarrow_ds_options: Keyword arguments while creating the pyarrow dataset.
:return: A Pyarrow Dataset representing the shared table.
"""
try:
from pyarrow.dataset import Dataset as PyArrowDataset
except ImportError:
raise ImportError("Unable to import pyarrow. `load_as_pyarrow_dataset` requires pyarrow.")

profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)

ds: PyArrowDataset = DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
version=version,
timestamp=timestamp,
).to_pyarrow_dataset(pyarrow_ds_options=pyarrow_ds_options)

return ds


def load_table_changes_as_spark(
url: str,
starting_version: Optional[int] = None,
Expand Down
Loading