iceberg input connector (#7955)
GitOrigin-RevId: 1bdcb48831e5c7ebfac82e366e69c469222c8cd8
zxqfd555-pw authored and Manul from Pathway committed Jan 9, 2025
1 parent 5d30c34 commit d29ca65
Showing 18 changed files with 1,471 additions and 412 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Added
- `pw.io.iceberg.read` method for reading Apache Iceberg tables into Pathway.

### Changed
- **BREAKING**: `pw.io.deltalake.read` now requires explicit specification of primary key fields.

## [0.16.4] - 2025-01-09

### Fixed
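The breaking change above means that schemas passed to ``pw.io.deltalake.read`` must now mark at least one column as a primary key; the same requirement applies to the new ``pw.io.iceberg.read``. A minimal sketch of a compliant schema (the lake path and column names are illustrative assumptions, not taken from this commit):

```python
import pathway as pw

class KVSchema(pw.Schema):
    # At least one column must now carry primary_key=True.
    key: str = pw.column_definition(primary_key=True)
    value: str

# Hypothetical local lake path; S3 URIs such as s3://bucket/path work the same way.
table = pw.io.deltalake.read("./my_lake", schema=KVSchema)
```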
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -39,6 +39,7 @@ ed25519-dalek = { version = "2.1.1", features = ["serde", "pkcs8"] }
elasticsearch = "8.17.0-alpha.1"
futures = "0.3.31"
glob = "0.3.2"
half = "2.4.1"
hex = "0.4.3"
hyper = { version = "0.14", features = ["server"] }
iceberg = "0.4.0"
@@ -33,6 +33,7 @@ Before going into more details about the different connectors and how they work,
<span class="block"><a href="/developers/user-guide/connect/connectors/fs-connector">File System</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/gdrive">Google Drive</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/http">http</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/iceberg">Iceberg</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/jsonlines-connector">JSON Lines</a></span>
<span class="block"><a href="/developers/user-guide/connect/connectors/kafka_connectors">Kafka</a></span>
<span class="block"><a href="/developers/api-docs/pathway-io/nats">NATS</a></span>
103 changes: 80 additions & 23 deletions integration_tests/iceberg/test_iceberg.py
@@ -8,20 +8,20 @@

import pathway as pw
from pathway.internals.parse_graph import G
from pathway.tests.utils import wait_result_with_checker

INPUT_CONTENTS_1 = """{"id": 1, "name": "John"}
{"id": 2, "name": "Jane"}
{"id": 3, "name": "Alice"}
{"id": 4, "name": "Bob"}"""
INPUT_CONTENTS_2 = """{"id": 5, "name": "Peter"}
{"id": 6, "name": "Jake"}
{"id": 7, "name": "Dora"}
{"id": 8, "name": "Barbara"}"""
INPUT_CONTENTS_3 = """{"id": 9, "name": "Anna"}
{"id": 10, "name": "Paul"}
{"id": 11, "name": "Steve"}
{"id": 12, "name": "Sarah"}"""
from pathway.tests.utils import run, wait_result_with_checker

INPUT_CONTENTS_1 = """{"user_id": 1, "name": "John"}
{"user_id": 2, "name": "Jane"}
{"user_id": 3, "name": "Alice"}
{"user_id": 4, "name": "Bob"}"""
INPUT_CONTENTS_2 = """{"user_id": 5, "name": "Peter"}
{"user_id": 6, "name": "Jake"}
{"user_id": 7, "name": "Dora"}
{"user_id": 8, "name": "Barbara"}"""
INPUT_CONTENTS_3 = """{"user_id": 9, "name": "Anna"}
{"user_id": 10, "name": "Paul"}
{"user_id": 11, "name": "Steve"}
{"user_id": 12, "name": "Sarah"}"""

CATALOG_URI = "http://iceberg:8181"
INPUT_CONTENTS = {
@@ -51,21 +51,78 @@ def __call__(self):
return False


def test_iceberg_read_after_write(tmp_path):
input_path = tmp_path / "input.txt"
output_path = tmp_path / "output.txt"
pstorage_path = tmp_path / "pstorage"
table_name = str(uuid.uuid4())

def run_single_iteration(seq_number: int):
input_path.write_text(INPUT_CONTENTS[seq_number])
name_for_id_from_new_part = {}
for input_line in INPUT_CONTENTS[seq_number].splitlines():
data = json.loads(input_line)
name_for_id_from_new_part[data["user_id"]] = data["name"]

class InputSchema(pw.Schema):
user_id: int = pw.column_definition(primary_key=True)
name: str

# Place some data into the Iceberg table
table = pw.io.jsonlines.read(
input_path,
schema=InputSchema,
mode="static",
)
pw.io.iceberg.write(
table,
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=table_name,
)
run()

# Read the data from the Iceberg table via Pathway
G.clear()
table = pw.io.iceberg.read(
catalog_uri=CATALOG_URI,
namespace=["my_database"],
table_name=table_name,
mode="static",
schema=InputSchema,
)
pw.io.jsonlines.write(table, output_path)
persistence_config = pw.persistence.Config(
pw.persistence.Backend.filesystem(pstorage_path),
)
run(persistence_config=persistence_config)

name_for_id = {}
with open(output_path, "r") as f:
for line in f:
data = json.loads(line)
name_for_id[data["user_id"]] = data["name"]
assert name_for_id == name_for_id_from_new_part

for seq_number in range(1, len(INPUT_CONTENTS) + 1):
run_single_iteration(seq_number)


def test_iceberg_several_runs(tmp_path):
input_path = tmp_path / "input.txt"
table_name = str(uuid.uuid4())
all_ids = set()
all_names = set()

def run(seq_number: int):
def run_single_iteration(seq_number: int):
input_path.write_text(INPUT_CONTENTS[seq_number])
for input_line in INPUT_CONTENTS[seq_number].splitlines():
data = json.loads(input_line)
all_ids.add(data["id"])
all_ids.add(data["user_id"])
all_names.add(data["name"])

class InputSchema(pw.Schema):
id: int
user_id: int
name: str

G.clear()
@@ -80,20 +80,20 @@ class InputSchema(pw.Schema):
namespace=["my_database"],
table_name=table_name,
)
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
run()

iceberg_table_name = f"my_database.{table_name}"
pandas_table = _get_pandas_table(iceberg_table_name)
assert pandas_table.shape == (4 * seq_number, 4)
assert set(pandas_table["id"]) == all_ids
assert set(pandas_table["user_id"]) == all_ids
assert set(pandas_table["name"]) == all_names
assert set(pandas_table["diff"]) == {1}
assert len(set(pandas_table["time"])) == seq_number

# The first run includes the table creation.
# The subsequent runs check that the case with an already created table also works correctly.
for seq_number in range(1, len(INPUT_CONTENTS) + 1):
run(seq_number)
run_single_iteration(seq_number)


def test_iceberg_streaming(tmp_path):
Expand All @@ -108,7 +165,7 @@ def stream_inputs():
time.sleep(5)

class InputSchema(pw.Schema):
id: int
user_id: int
name: str

table = pw.io.jsonlines.read(
@@ -138,13 +195,13 @@ class InputSchema(pw.Schema):
for i in range(1, len(INPUT_CONTENTS) + 1):
for input_line in INPUT_CONTENTS[i].splitlines():
data = json.loads(input_line)
all_ids.add(data["id"])
all_ids.add(data["user_id"])
all_names.add(data["name"])

iceberg_table_name = f"my_database.{table_name}"
pandas_table = _get_pandas_table(iceberg_table_name)
assert pandas_table.shape == (4 * len(INPUT_CONTENTS), 4)
assert set(pandas_table["id"]) == all_ids
assert set(pandas_table["user_id"]) == all_ids
assert set(pandas_table["name"]) == all_names
assert set(pandas_table["diff"]) == {1}
assert len(set(pandas_table["time"])) == len(INPUT_CONTENTS)
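The assertions above use a ``_get_pandas_table`` helper whose definition falls outside the shown hunks. A plausible sketch of such a helper built on ``pyiceberg`` (an assumption for illustration only; the actual helper in the test file may differ):

```python
from pyiceberg.catalog.rest import RestCatalog

def _get_pandas_table(table_name: str):
    # Connect to the same REST catalog the tests write to and materialize
    # the current snapshot of the Iceberg table as a pandas DataFrame.
    catalog = RestCatalog(name="default", uri=CATALOG_URI)
    iceberg_table = catalog.load_table(table_name)
    return iceberg_table.scan().to_pandas()
```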
11 changes: 10 additions & 1 deletion python/pathway/io/deltalake/__init__.py
@@ -51,6 +51,10 @@ def read(
Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table
doesn't have to be append-only; however, deletion vectors are not supported yet.
Note that the connector requires primary key fields to be specified in the schema.
You can mark the fields to be used in the primary key with the ``pw.column_definition``
function.

Args:
uri: URI of the Delta Lake source that must be read.
schema: Schema of the resulting table.
@@ -84,7 +88,7 @@ def read(
>>> import pathway as pw
>>> class KVSchema(pw.Schema):
... key: str
... key: str = pw.column_definition(primary_key=True)
... value: str
Then, this table must be written into a Delta Lake storage. In the example, it can
@@ -123,6 +127,11 @@ def read(
are provided but the path starts with ``s3://`` or ``s3a://``, Pathway will use the
credentials of the currently authenticated user.
"""
if schema.primary_key_columns() is None:
raise ValueError(
"DeltaLake reader requires explicit primary key fields specification"
)

_check_entitlements("deltalake")
prepared_connection_settings = _prepare_connection_settings(s3_connection_settings)

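With the check above in place, a schema that declares no primary key column is rejected before the pipeline is built. A small illustration (the schema and path are hypothetical):

```python
import pathway as pw

class NoKeySchema(pw.Schema):
    key: str
    value: str

try:
    # No column is marked with primary_key=True, so the reader rejects the schema.
    pw.io.deltalake.read("./my_lake", schema=NoKeySchema)
except ValueError as error:
    print(error)  # DeltaLake reader requires explicit primary key fields specification
```

The new Iceberg reader below applies the same validation via ``schema.primary_key_columns()``.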
132 changes: 130 additions & 2 deletions python/pathway/io/iceberg/__init__.py
@@ -1,9 +1,137 @@
from pathway.internals import api, datasink
from __future__ import annotations

from typing import Any

from pathway.internals import api, datasink, datasource
from pathway.internals._io_helpers import _format_output_value_fields
from pathway.internals.config import _check_entitlements
from pathway.internals.runtime_type_check import check_arg_types
from pathway.internals.schema import Schema
from pathway.internals.table import Table
from pathway.internals.table_io import table_from_datasource
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import internal_connector_mode, read_schema


@check_arg_types
@trace_user_frame
def read(
catalog_uri: str,
namespace: list[str],
table_name: str,
schema: type[Schema],
*,
mode: str = "streaming",
warehouse: str | None = None,
autocommit_duration_ms: int | None = 1500,
persistent_id: str | None = None,
debug_data: Any = None,
) -> Table:
"""
Reads a table from Apache Iceberg. If run in streaming mode, the connector tracks
new row additions and old row deletions and reflects them in the table read.
Note that the connector requires primary key fields to be specified in the schema.
You can mark the fields to be used in the primary key with the ``pw.column_definition``
function.

Args:
catalog_uri: URI of the Iceberg REST catalog.
namespace: The name of the namespace containing the table to be read.
table_name: The name of the table to be read.
schema: Schema of the resulting table.
mode: Denotes how the engine polls the new data from the source. Currently
``"streaming"`` and ``"static"`` are supported. If set to ``"streaming"``,
the engine will wait for updates in the specified Iceberg table. It will track
new row additions and deletions and reflect these events in the state. On the
other hand, the ``"static"`` mode will only consider the available data and ingest
all of it in one commit. The default value is ``"streaming"``.
warehouse: Optional, path to the Iceberg storage warehouse.
autocommit_duration_ms: The maximum time between two commits. Every
``autocommit_duration_ms`` milliseconds, the updates received by the connector are
committed and pushed into Pathway's computation graph.
persistent_id: (unstable) An identifier, under which the state of the table
will be persisted or ``None``, if there is no need to persist the state of this table.
When a program restarts, it restores the state for all input tables according to what
was saved for their ``persistent_id``. This way it's possible to configure the start of
computations from the moment they were terminated last time.
debug_data: Static data replacing original one when debug mode is active.
Returns:
Table: Table read from the Iceberg source.
Example:
Consider a users data table stored in the Iceberg storage. The table is located in the
``app`` namespace and is named ``users``. The catalog URI is ``http://localhost:8181``.
Below is an example of how to read this table into Pathway.
First, the schema of the table needs to be created. The schema doesn't have to contain
all the columns of the table; you can specify only the ones that are needed for the
computation:
>>> import pathway as pw
>>> class InputSchema(pw.Schema):
... user_id: int = pw.column_definition(primary_key=True)
... name: str
Then, this table can be read from the Iceberg storage:
>>> input_table = pw.io.iceberg.read(
... catalog_uri="http://localhost:8181/",
... namespace=["app"],
... table_name="users",
... schema=InputSchema,
... mode="static",
... )
Don't forget to run your program with ``pw.run`` once you define all necessary
computations. Note that you can also change the mode to ``"streaming"`` if you want
the changes in the table to be reflected in your computational pipeline.
"""

if schema.primary_key_columns() is None:
raise ValueError(
"Iceberg reader requires explicit primary key fields specification"
)

_check_entitlements("iceberg")
schema, api_schema = read_schema(
schema=schema,
value_columns=None,
primary_key=None,
types=None,
default_values=None,
)

data_storage = api.DataStorage(
storage_type="iceberg",
path=catalog_uri,
database=warehouse,
table_name=table_name,
namespace=namespace,
mode=internal_connector_mode(mode),
persistent_id=persistent_id,
)
data_format = api.DataFormat(
format_type="transparent",
**api_schema,
)

data_source_options = datasource.DataSourceOptions(
commit_duration_ms=autocommit_duration_ms
)
return table_from_datasource(
datasource.GenericDataSource(
datastorage=data_storage,
dataformat=data_format,
schema=schema,
data_source_options=data_source_options,
datasource_name="iceberg",
append_only=True,
),
debug_datasource=datasource.debug_datasource(debug_data),
)


@check_arg_types
@@ -60,7 +188,7 @@ def write(
>>> import pathway as pw
>>> class InputSchema(pw.Schema):
... user_id: int
... user_id: int = pw.column_definition(primary_key=True)
... name: str
Using this schema, you can read the table from the input file. You need to use the
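Putting the new input connector to use, a minimal streaming pipeline based on the docstring example above (the catalog URI, namespace, table name, and output path are illustrative assumptions):

```python
import pathway as pw

class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

# Track the app.users Iceberg table; in "streaming" mode the connector keeps
# polling for new snapshots and reflects row additions and deletions downstream.
users = pw.io.iceberg.read(
    catalog_uri="http://localhost:8181",
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="streaming",
)

# Mirror the table into a JSON Lines file as a trivial downstream consumer.
pw.io.jsonlines.write(users, "./users.jsonl")

pw.run()
```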