-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support lazy schema retrieval in IO Plugins #18638
Comments
Let me know if this is worth another Issue, but I am also interested in the scenario where I am able to leverage existing built-in LazyFrame scans, but want to do some lazily-defined work beforehand. Motivating example: I would like to write my own scan_encrypted_parquet method. I can of course write a register_io_source callable which decrypts files and yields DataFrames, but this means that any compute graph defined within that method is disconnected from later compute. And the onus is then on me to pass through the with_columns args etc appropriately. Imagine for example a similar function to register_io_source, whose callable argument returns LazyFrames. |
Here is my work in progress, illustrating both questions I have: def scan_parquet_sc(
f: Path,
schema: SchemaDict, # ideally we would collect this lazily (and maybe cache the result if collect_schema is called before collect.) But until then, don't get this schema wrong!
) -> pl.LazyFrame:
"""Decrypts a parquet file then scans.
Don't ask why we're encrypting the whole file instead of using the Parquet columnar standard..."""
def pq_source(
with_columns: list[str] | None,
predicate: pl.Expr | None,
_n_rows: int | None,
_batch_size: int | None,
) -> Iterator[pl.DataFrame]:
res = _decrypt_and_scan_parquet_sc(f, with_columns, predicate, _n_rows, _batch_size)
if _batch_size is not None:
logger.warning(f"Was passed {_batch_size=} but will ignore for now - maybe we should collect row group by row group then check if that's big enough to batch")
yield res.collect() # if batch=None I would perhaps like a way to return a LazyFrame?
return register_io_source(pq_source, schema=schema)
def _decrypt_and_scan_parquet_sc(f: Path,
with_columns: list[str] | None,
predicate: pl.Expr | None,
n_rows: int | None,
_batch_size: int | None,
) -> pl.LazyFrame:
pq_bytes = BytesIO(crypt.decrypt_bytes(f.read_bytes()))
df = pl.scan_parquet(pq_bytes) # once #10413 is released...
if with_columns is not None:
df = df.select(with_columns)
if predicate is not None:
df = df.filter(predicate)
if _n_rows is not None:
df = df.head(_n_rows) # I'm pretty sure you want head and not fetch here
return df
def _collect_schema_parquet_sc(f: Path):
# TODO implement and hook in when supported
... |
Hey @tmct, great that you made an IO plugin! That's where they are for. This example shows the ability to create the schema lazily: https://github.com/pola-rs/pyo3-polars/tree/main/example/io_plugin |
Many thanks Ritchie, I'll give it a go. |
Does 'src.schema()' in that example not realise the schema at the time of scan_... though, rather than lazily? I do not understand the significance of that last instance of the Source, so likely I've misunderstood how it works |
I suppose another way of describing what I want is this: Currently I can provide a custom Python function that can be used as the basis for a LazyFrame collect() - this is the IO plugins feature. To do so, I must currently provide a fixed schema up front. (And - I am pleased that this extensibility point exists!) But - a general LazyFrame returned from e.g. scan_parquets followed by subsequent LazyFrame operations does not necessarily know its schema up front. (Indeed, I believe the case where this is non-trivial motivated the introduction of collect_schema()?) So - I wish to implement such "fully lazy" LazyFrames using custom Python functions please, so that I can use the Polars Lazy API as a fully lazy description of my intended tabular processing. I imagine this could take the form of me passing a python callable returning the schema. Do you think this would be possible? Desirable? Thank you |
I've made an attempt at starting the public-facing side of this feature in #19232. Any feedback or help would be much welcomed, thanks. |
Thanks to some help from @itamarst I now have a branch of my fork with this feature on it! It's not prod-ready at this point but I will have a play with it. Example (passing) test from |
Description
Hi,
I just wrote my first Python IO Plugin, for fetching data from a company-internal data source - very pleased that you can create LazyFrames of completely custom origin now, thanks! This brings us one step closer to expressing entire data processing pipelines declaratively without breaks using the Lazy API.
What is missing from the example, and would really help me to put my
scan_custom_data_source
into production, is the ability to collect the schema lazily, as has been recently-ish made possible in LazyFrames.Is there already a way to do this, or are changes needed to register_io_source?
Many thanks,
Tom
The text was updated successfully, but these errors were encountered: