Increase sorting scalability via CytoTable metadata columns (#204)
* customize sorting capabilities

for further performance in #175

* simplify sql; exclude cytotable meta

* exclude duplicate columns

* updating tests

* fixing tests

* simulate csv source by removing meta

* update preset sql to use refined syntax

* address mixed type queries and tests

* simplify and further clarity in test

* correcting comment

* make sorting optional

* fix existing tests

* further sorting options applied

* add a test for unsorted output
d33bs authored Jun 12, 2024
1 parent 674fae8 commit fac4f56
Showing 6 changed files with 395 additions and 146 deletions.
7 changes: 7 additions & 0 deletions cytotable/constants.py
@@ -68,6 +68,13 @@
],
}

# metadata column names and types for internal use within CytoTable
CYOTABLE_META_COLUMN_TYPES = {
"cytotable_meta_source_path": "VARCHAR",
"cytotable_meta_offset": "BIGINT",
"cytotable_meta_rownum": "BIGINT",
}
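
For orientation, a minimal DuckDB sketch (the table, source path, and offset below are assumed example values, not part of this change) of how columns with these names and types can be attached to each row:

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE example AS SELECT * FROM range(3) AS t(value)")
rows = con.execute(
    """
    SELECT
        CAST('example.csv' AS VARCHAR) AS "cytotable_meta_source_path",
        CAST(0 AS BIGINT) AS "cytotable_meta_offset",
        CAST(row_number() OVER () AS BIGINT) AS "cytotable_meta_rownum",
        value
    FROM example
    """
).fetchall()
# each row now carries its source path, chunk offset, and in-chunk row number
print(rows)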

CYTOTABLE_DEFAULT_PARQUET_METADATA = {
"data-producer": "https://github.com/cytomining/CytoTable",
"data-producer-version": str(_get_cytotable_version()),
120 changes: 103 additions & 17 deletions cytotable/convert.py
@@ -25,14 +25,18 @@


@python_app
def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]:
def _get_table_columns_and_types(
source: Dict[str, Any], sort_output: bool
) -> List[Dict[str, str]]:
"""
Gather column data from table through duckdb.
Args:
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
List[Dict[str, str]]
@@ -110,6 +114,8 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
# offset is set to 0 to start at the first row
# of results from the table
offset=0,
add_cytotable_meta=False,
sort_output=sort_output,
)
with _duckdb_reader() as ddb_reader:
return (
@@ -276,6 +282,7 @@ def _source_chunk_to_parquet(
chunk_size: int,
offset: int,
dest_path: str,
sort_output: bool,
) -> str:
"""
Export source data to chunked parquet file using chunk size and offsets.
Expand All @@ -292,6 +299,8 @@ def _source_chunk_to_parquet(
The offset for chunking the data from source.
dest_path: str
Path to store the output data.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str
@@ -304,6 +313,7 @@ def _source_chunk_to_parquet(
from cloudpathlib import AnyPath
from pyarrow import parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import (
_duckdb_reader,
_sqlite_mixed_type_query_to_parquet,
@@ -317,13 +327,39 @@
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

source_path_str = (
source["source_path"]
if "table_name" not in source.keys()
else f"{source['source_path']}_table_{source['table_name']}"
)
# build the column selection block of the query

# add cytotable metadata columns
cytotable_metadata_cols = [
(
f"CAST( '{source_path_str}' "
f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
' AS "cytotable_meta_source_path"'
),
f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
(
f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
' AS "cytotable_meta_rownum"'
),
]
# add source table columns
casted_source_cols = [
# here we cast the column to the specified type to ensure the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]

# create selection statement from lists above
select_columns = ",".join(
[
# here we cast the column to the specified type to ensure the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]
# if we should sort the output, add the metadata_cols
cytotable_metadata_cols + casted_source_cols
if sort_output
else casted_source_cols
)
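
As a rough sketch of what the conditional above produces, assume a single source column (AreaShape_Area, FLOAT) and example metadata values; the column list differs only by the three metadata columns when sort_output is enabled:

cytotable_metadata_cols = [
    "CAST( 'example.csv' AS VARCHAR) AS \"cytotable_meta_source_path\"",
    'CAST( 0 AS BIGINT) AS "cytotable_meta_offset"',
    'CAST( (row_number() OVER ()) AS BIGINT) AS "cytotable_meta_rownum"',
]
casted_source_cols = ['CAST("AreaShape_Area" AS FLOAT) AS "AreaShape_Area"']

for sort_output in (True, False):
    select_columns = ",".join(
        cytotable_metadata_cols + casted_source_cols
        if sort_output
        else casted_source_cols
    )
    print(sort_output, "->", select_columns)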

# build output query and filepath base
@@ -353,6 +389,11 @@ def _source_chunk_to_parquet(
ORDER BY ALL
LIMIT {chunk_size} OFFSET {offset}
"""
if sort_output
else f"""
{base_query}
LIMIT {chunk_size} OFFSET {offset}
"""
).arrow(),
where=result_filepath,
)
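
A small standalone sketch (toy table, recent DuckDB assumed) of why the sorted branch above adds ORDER BY ALL before LIMIT/OFFSET: without a total order, the rows returned for a given offset are not guaranteed to be deterministic across chunked reads.

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE cells AS SELECT * FROM range(10) AS t(rownum)")
chunk_size, offset = 4, 4

# ORDER BY ALL yields a deterministic slice for each offset
chunk = con.execute(
    f"""
    SELECT *
    FROM cells
    ORDER BY ALL
    LIMIT {chunk_size} OFFSET {offset}
    """
).arrow()
print(chunk["rownum"].to_pylist())  # [4, 5, 6, 7]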
@@ -375,6 +416,8 @@ def _source_chunk_to_parquet(
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
add_cytotable_meta=True if sort_output else False,
sort_output=sort_output,
),
where=result_filepath,
)
@@ -423,7 +466,10 @@ def _prepend_column_name(

import pyarrow.parquet as parquet

from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
from cytotable.constants import (
CYOTABLE_META_COLUMN_TYPES,
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
from cytotable.utils import _write_parquet_table_with_metadata

logger = logging.getLogger(__name__)
@@ -471,8 +517,10 @@ def _prepend_column_name(
# source_group_name_stem: 'Cells'
# column_name: 'AreaShape_Area'
# updated_column_name: 'Cells_AreaShape_Area'
if column_name not in identifying_columns and not column_name.startswith(
source_group_name_stem.capitalize()
if (
column_name not in identifying_columns
and not column_name.startswith(source_group_name_stem.capitalize())
and column_name not in CYOTABLE_META_COLUMN_TYPES
):
updated_column_names.append(f"{source_group_name_stem}_{column_name}")
# if-condition for prepending 'Metadata_' to column name
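
To illustrate the extended condition above, a minimal sketch with assumed inputs: compartment columns gain the source group prefix, while identifying columns and the cytotable metadata columns pass through unchanged.

CYOTABLE_META_COLUMN_TYPES = {"cytotable_meta_rownum": "BIGINT"}
identifying_columns = ["ImageNumber"]
source_group_name_stem = "Cells"

for column_name in ["AreaShape_Area", "ImageNumber", "cytotable_meta_rownum"]:
    if (
        column_name not in identifying_columns
        and not column_name.startswith(source_group_name_stem.capitalize())
        and column_name not in CYOTABLE_META_COLUMN_TYPES
    ):
        print(f"{source_group_name_stem}_{column_name}")
    else:
        print(column_name)
# -> Cells_AreaShape_Area, ImageNumber, cytotable_meta_rownum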
@@ -680,6 +728,7 @@ def _concat_source_group(
def _prepare_join_sql(
sources: Dict[str, List[Dict[str, Any]]],
joins: str,
sort_output: bool,
) -> str:
"""
Prepare join SQL statement with actual locations of data based on the sources.
@@ -691,22 +740,39 @@ def _prepare_join_sql(
joins: str:
DuckDB-compatible SQL which will be used to perform the join
operations using the join_group keys as a reference.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str:
String representing the SQL to be used in later join work.
"""
import pathlib

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES

# replace with real location of sources for join sql
order_by_tables = []
for key, val in sources.items():
if pathlib.Path(key).stem.lower() in joins.lower():
table_name = str(pathlib.Path(key).stem.lower())
joins = joins.replace(
f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
f"'{table_name}.parquet'",
str([str(table) for table in val[0]["table"]]),
)
order_by_tables.append(table_name)

# create an order by statement from all tables using cytotable metadata
order_by_sql = "ORDER BY " + ", ".join(
[
f"{table}.{meta_column}"
for table in order_by_tables
for meta_column in CYOTABLE_META_COLUMN_TYPES
]
)

return joins
# add the order by statements to the join
return joins + order_by_sql if sort_output else joins
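
With two assumed joined tables, the ORDER BY clause built above would look roughly like this sketch:

CYOTABLE_META_COLUMN_TYPES = {
    "cytotable_meta_source_path": "VARCHAR",
    "cytotable_meta_offset": "BIGINT",
    "cytotable_meta_rownum": "BIGINT",
}
order_by_tables = ["image", "cells"]  # assumed example table names
order_by_sql = "ORDER BY " + ", ".join(
    f"{table}.{meta_column}"
    for table in order_by_tables
    for meta_column in CYOTABLE_META_COLUMN_TYPES
)
print(order_by_sql)
# ORDER BY image.cytotable_meta_source_path, image.cytotable_meta_offset,
#   image.cytotable_meta_rownum, cells.cytotable_meta_source_path, ...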


@python_app
@@ -740,21 +806,29 @@ def _join_source_chunk(

import pathlib

import pyarrow.parquet as parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata

# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
# writing data to a parquet file.
# read data with chunk size + offset
# and export to parquet
exclude_meta_cols = [
f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
]
with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
WITH joined AS (
{joins}
{"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
LIMIT {chunk_size} OFFSET {offset}
)
SELECT
/* exclude metadata columns from the results
by using a lambda on column names based on exclude_meta_cols. */
COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
FROM joined;
"""
).arrow()
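
For clarity, a self-contained sketch (toy table, assumed column names, recent DuckDB) of the COLUMNS(...) lambda used above to drop the cytotable metadata columns from the joined result:

import duckdb

exclude_meta_cols = [
    f"c NOT LIKE '{col}%'"
    for col in (
        "cytotable_meta_source_path",
        "cytotable_meta_offset",
        "cytotable_meta_rownum",
    )
]
con = duckdb.connect()
con.execute(
    "CREATE TABLE joined AS SELECT "
    "1 AS Cells_AreaShape_Area, "
    "'a.csv' AS cytotable_meta_source_path, "
    "0 AS cytotable_meta_offset, "
    "1 AS cytotable_meta_rownum"
)
result = con.execute(
    f"SELECT COLUMNS (c -> ({' AND '.join(exclude_meta_cols)})) FROM joined"
).arrow()
print(result.column_names)  # ['Cells_AreaShape_Area']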

@@ -974,6 +1048,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
chunk_size: Optional[int],
infer_common_schema: bool,
drop_null: bool,
sort_output: bool,
data_type_cast_map: Optional[Dict[str, str]] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], str]:
@@ -1012,6 +1087,8 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
Whether to infer a common schema when concatenating sources.
drop_null: bool
Whether to drop null results.
sort_output: bool
Specifies whether to sort cytotable output or not.
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
@@ -1083,7 +1160,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
**{
"columns": _prep_cast_column_data_types(
columns=_get_table_columns_and_types(
source=source,
source=source, sort_output=sort_output
),
data_type_cast_map=data_type_cast_map,
)
@@ -1109,6 +1186,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
chunk_size=chunk_size,
offset=offset,
dest_path=expanded_dest_path,
sort_output=sort_output,
),
source_group_name=source_group_name,
identifying_columns=identifying_columns,
@@ -1169,7 +1247,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
evaluated_results = evaluate_futures(results)

prepared_joins_sql = _prepare_join_sql(
sources=evaluated_results, joins=joins
sources=evaluated_results, joins=joins, sort_output=sort_output
).result()

# map joined results based on the join groups gathered above
@@ -1222,6 +1300,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema: bool = True,
drop_null: bool = False,
data_type_cast_map: Optional[Dict[str, str]] = None,
sort_output: bool = True,
preset: Optional[str] = "cellprofiler_csv",
parsl_config: Optional[parsl.Config] = None,
**kwargs,
@@ -1263,8 +1342,14 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_size: Optional[int] (Default value = None)
Size of join chunks which is used to limit data size during join ops
infer_common_schema: bool: (Default value = True)
infer_common_schema: bool (Default value = True)
Whether to infer a common schema when concatenating sources.
data_type_cast_map: Dict[str, str] (Default value = None)
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
sort_output: bool (Default value = True)
Specifies whether to sort cytotable output or not.
drop_null: bool (Default value = False)
Whether to drop nan/null values from results
preset: str (Default value = "cellprofiler_csv")
@@ -1379,6 +1464,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema=infer_common_schema,
drop_null=drop_null,
data_type_cast_map=data_type_cast_map,
sort_output=sort_output,
**kwargs,
)
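
From the caller's side, the new flag is opt-out; a hypothetical usage sketch (paths and preset below are example values):

from cytotable import convert

result = convert(
    source_path="./examples/data/example_cellprofiler.sqlite",  # example path
    dest_path="./example_output.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_sqlite_pycytominer",
    # skip the metadata-column-based sorting for faster, unordered output
    sort_output=False,
)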
