diff --git a/cytotable/constants.py b/cytotable/constants.py
index 591b323..2fb3182 100644
--- a/cytotable/constants.py
+++ b/cytotable/constants.py
@@ -68,6 +68,13 @@
     ],
 }
 
+# metadata column names and types for internal use within CytoTable
+CYOTABLE_META_COLUMN_TYPES = {
+    "cytotable_meta_source_path": "VARCHAR",
+    "cytotable_meta_offset": "BIGINT",
+    "cytotable_meta_rownum": "BIGINT",
+}
+
 CYTOTABLE_DEFAULT_PARQUET_METADATA = {
    "data-producer": "https://github.com/cytomining/CytoTable",
    "data-producer-version": str(_get_cytotable_version()),
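Note (reviewer sketch, not part of the patch): a minimal illustration of how these
name/type pairs are rendered into DuckDB SELECT casts by convert.py below. The
source path, offset, and value expressions here are hypothetical stand-ins:

    CYOTABLE_META_COLUMN_TYPES = {
        "cytotable_meta_source_path": "VARCHAR",
        "cytotable_meta_offset": "BIGINT",
        "cytotable_meta_rownum": "BIGINT",
    }

    # hypothetical values standing in for a real source file and chunk offset
    values = {
        "cytotable_meta_source_path": "'a.csv'",
        "cytotable_meta_offset": "0",
        "cytotable_meta_rownum": "row_number() OVER ()",
    }
    select_parts = [
        f'CAST({values[name]} AS {ctype}) AS "{name}"'
        for name, ctype in CYOTABLE_META_COLUMN_TYPES.items()
    ]
    print(", ".join(select_parts))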
diff --git a/cytotable/convert.py b/cytotable/convert.py
index 3324504..c0d782c 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -25,7 +25,9 @@
 
 
 @python_app
-def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]:
+def _get_table_columns_and_types(
+    source: Dict[str, Any], sort_output: bool
+) -> List[Dict[str, str]]:
     """
     Gather column data from table through duckdb.
 
@@ -33,6 +35,8 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
         source: Dict[str, Any]
             Contains the source data to be chunked. Represents a single
             file or table of some kind.
+        sort_output:
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         List[Dict[str, str]]
@@ -110,6 +114,8 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
             # offset is set to 0 start at first row
             # result from table
             offset=0,
+            add_cytotable_meta=False,
+            sort_output=sort_output,
         )
         with _duckdb_reader() as ddb_reader:
             return (
@@ -276,6 +282,7 @@ def _source_chunk_to_parquet(
     chunk_size: int,
     offset: int,
     dest_path: str,
+    sort_output: bool,
 ) -> str:
     """
     Export source data to chunked parquet file using chunk size and offsets.
 
@@ -292,6 +299,8 @@ def _source_chunk_to_parquet(
         offset: int
             The offset for chunking the data from source.
         dest_path: str
            Path to store the output data.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         str
 
@@ -304,6 +313,7 @@ def _source_chunk_to_parquet(
     from cloudpathlib import AnyPath
     from pyarrow import parquet
 
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
     from cytotable.utils import (
         _duckdb_reader,
         _sqlite_mixed_type_query_to_parquet,
@@ -317,13 +327,39 @@ def _source_chunk_to_parquet(
     )
     pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)
 
+    source_path_str = (
+        source["source_path"]
+        if "table_name" not in source.keys()
+        else f"{source['source_path']}_table_{source['table_name']}"
+    )
     # build the column selection block of query
+
+    # add cytotable metadata columns
+    cytotable_metadata_cols = [
+        (
+            f"CAST( '{source_path_str}' "
+            f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
+            ' AS "cytotable_meta_source_path"'
+        ),
+        f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
+        (
+            f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
+            ' AS "cytotable_meta_rownum"'
+        ),
+    ]
+    # add source table columns
+    casted_source_cols = [
+        # here we cast the column to the specified type to ensure the column name remains the same
+        f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
+        for column in source["columns"]
+    ]
+
+    # create selection statement from lists above
     select_columns = ",".join(
-        [
-            # here we cast the column to the specified type ensure the colname remains the same
-            f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
-            for column in source["columns"]
-        ]
+        # if sorting the output, prepend the cytotable metadata columns
+        cytotable_metadata_cols + casted_source_cols
+        if sort_output
+        else casted_source_cols
     )
 
     # build output query and filepath base
@@ -353,6 +389,11 @@ def _source_chunk_to_parquet(
                     ORDER BY ALL
                     LIMIT {chunk_size} OFFSET {offset}
                     """
+                    if sort_output
+                    else f"""
+                    {base_query}
+                    LIMIT {chunk_size} OFFSET {offset}
+                    """
                 ).arrow(),
                 where=result_filepath,
             )
@@ -375,6 +416,8 @@ def _source_chunk_to_parquet(
                     table_name=str(source["table_name"]),
                     chunk_size=chunk_size,
                     offset=offset,
+                    add_cytotable_meta=sort_output,
+                    sort_output=sort_output,
                 ),
                 where=result_filepath,
             )
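Note (reviewer sketch, not part of the patch): a toy, self-contained illustration
of the chunking pattern above. With sort_output enabled, ORDER BY ALL makes each
LIMIT/OFFSET window deterministic across repeated reads (assumes the duckdb
Python package; ORDER BY ALL requires a reasonably recent DuckDB):

    import duckdb

    chunk_size, offset = 2, 0
    duckdb.sql(
        f"""
        SELECT * FROM (VALUES (2, 'b'), (1, 'a'), (3, 'c')) AS t(id, val)
        ORDER BY ALL
        LIMIT {chunk_size} OFFSET {offset}
        """
    ).show()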
@@ -423,7 +466,10 @@ def _prepend_column_name(
 
     import pyarrow.parquet as parquet
 
-    from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.constants import (
+        CYOTABLE_META_COLUMN_TYPES,
+        CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+    )
     from cytotable.utils import _write_parquet_table_with_metadata
 
     logger = logging.getLogger(__name__)
@@ -471,8 +517,10 @@ def _prepend_column_name(
         # source_group_name_stem: 'Cells'
         # column_name: 'AreaShape_Area'
         # updated_column_name: 'Cells_AreaShape_Area'
-        if column_name not in identifying_columns and not column_name.startswith(
-            source_group_name_stem.capitalize()
+        if (
+            column_name not in identifying_columns
+            and not column_name.startswith(source_group_name_stem.capitalize())
+            and column_name not in CYOTABLE_META_COLUMN_TYPES
         ):
             updated_column_names.append(f"{source_group_name_stem}_{column_name}")
         # if-condition for prepending 'Metadata_' to column name
@@ -680,6 +728,7 @@ def _concat_source_group(
 def _prepare_join_sql(
     sources: Dict[str, List[Dict[str, Any]]],
     joins: str,
+    sort_output: bool,
 ) -> str:
     """
     Prepare join SQL statement with actual locations of data based on the sources.
 
@@ -691,6 +740,8 @@ def _prepare_join_sql(
         joins: str:
             DuckDB-compatible SQL which will be used to perform the join
             operations using the join_group keys as a reference.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
 
     Returns:
         str:
@@ -698,15 +749,30 @@ def _prepare_join_sql(
     """
     import pathlib
 
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
+
     # replace with real location of sources for join sql
+    order_by_tables = []
     for key, val in sources.items():
         if pathlib.Path(key).stem.lower() in joins.lower():
+            table_name = str(pathlib.Path(key).stem.lower())
             joins = joins.replace(
-                f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
+                f"'{table_name}.parquet'",
                 str([str(table) for table in val[0]["table"]]),
            )
+            order_by_tables.append(table_name)
+
+    # create an ORDER BY statement across all tables using the cytotable metadata columns
+    order_by_sql = "ORDER BY " + ", ".join(
+        [
+            f"{table}.{meta_column}"
+            for table in order_by_tables
+            for meta_column in CYOTABLE_META_COLUMN_TYPES
+        ]
+    )
 
-    return joins
+    # append the ORDER BY statement to the join when sorting output
+    return joins + order_by_sql if sort_output else joins
 
 
 @python_app
@@ -740,8 +806,7 @@ def _join_source_chunk(
 
     import pathlib
 
-    import pyarrow.parquet as parquet
-
+    from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
     from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
 
     # Attempt to read the data to parquet file
@@ -749,12 +814,21 @@ def _join_source_chunk(
     # writing data to a parquet file.
     # read data with chunk size + offset
     # and export to parquet
+    exclude_meta_cols = [
+        f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
+    ]
     with _duckdb_reader() as ddb_reader:
         result = ddb_reader.execute(
             f"""
+            WITH joined AS (
                 {joins}
-                {"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
                 LIMIT {chunk_size} OFFSET {offset}
+            )
+            SELECT
+                /* exclude metadata columns from the results
+                   by using a lambda on column names based on exclude_meta_cols. */
+                COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
+            FROM joined;
             """
         ).arrow()
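Note (reviewer sketch, not part of the patch): a toy demonstration of the DuckDB
COLUMNS(...) lambda used in _join_source_chunk to drop the cytotable_meta_*
columns once they have served their ordering purpose (assumes the duckdb Python
package; the table and column names are hypothetical):

    import duckdb

    duckdb.sql(
        """
        WITH joined AS (
            SELECT 1 AS id, 'x' AS val, 'a.csv' AS cytotable_meta_source_path
        )
        SELECT COLUMNS(c -> c NOT LIKE 'cytotable_meta%') FROM joined
        """
    ).show()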
@@ -974,6 +1048,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
     chunk_size: Optional[int],
     infer_common_schema: bool,
     drop_null: bool,
+    sort_output: bool,
     data_type_cast_map: Optional[Dict[str, str]] = None,
     **kwargs,
 ) -> Union[Dict[str, List[Dict[str, Any]]], str]:
@@ -1012,6 +1087,8 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
             Whether to infer a common schema when concatenating sources.
         drop_null: bool:
             Whether to drop null results.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
         data_type_cast_map: Dict[str, str]
             A dictionary mapping data type groups to specific types.
             Roughly includes Arrow data types language from:
@@ -1083,7 +1160,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                     **{
                         "columns": _prep_cast_column_data_types(
                             columns=_get_table_columns_and_types(
-                                source=source,
+                                source=source, sort_output=sort_output
                             ),
                             data_type_cast_map=data_type_cast_map,
                         )
@@ -1109,6 +1186,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                             chunk_size=chunk_size,
                             offset=offset,
                             dest_path=expanded_dest_path,
+                            sort_output=sort_output,
                         ),
                         source_group_name=source_group_name,
                         identifying_columns=identifying_columns,
@@ -1169,7 +1247,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
         evaluated_results = evaluate_futures(results)
 
         prepared_joins_sql = _prepare_join_sql(
-            sources=evaluated_results, joins=joins
+            sources=evaluated_results, joins=joins, sort_output=sort_output
         ).result()
 
         # map joined results based on the join groups gathered above
@@ -1222,6 +1300,7 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
     infer_common_schema: bool = True,
     drop_null: bool = False,
     data_type_cast_map: Optional[Dict[str, str]] = None,
+    sort_output: bool = True,
     preset: Optional[str] = "cellprofiler_csv",
     parsl_config: Optional[parsl.Config] = None,
     **kwargs,
@@ -1263,8 +1342,14 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
             DuckDB-compatible SQL which will be used to perform the join operations.
         chunk_size: Optional[int] (Default value = None)
             Size of join chunks which is used to limit data size during join ops
-        infer_common_schema: bool: (Default value = True)
+        infer_common_schema: bool (Default value = True)
             Whether to infer a common schema when concatenating sources.
+        data_type_cast_map: Dict[str, str], (Default value = None)
+            A dictionary mapping data type groups to specific types.
+            Roughly includes Arrow data types language from:
+            https://arrow.apache.org/docs/python/api/datatypes.html
+        sort_output: bool (Default value = True)
+            Specifies whether to sort cytotable output or not.
         drop_null: bool (Default value = False)
             Whether to drop nan/null values from results
         preset: str (Default value = "cellprofiler_csv")
@@ -1379,6 +1464,7 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
         infer_common_schema=infer_common_schema,
         drop_null=drop_null,
         data_type_cast_map=data_type_cast_map,
+        sort_output=sort_output,
         **kwargs,
     )
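Note (reviewer sketch, not part of the patch): a hedged usage example of the new
flag from the public API; the paths and preset below are hypothetical:

    from cytotable import convert

    # sort_output=True (the default) adds the cytotable_meta_* columns during
    # processing and yields deterministically ordered output chunks
    result = convert(
        source_path="example.sqlite",    # hypothetical input
        dest_path="example.parquet",     # hypothetical output location
        dest_datatype="parquet",
        preset="cellprofiler_sqlite_pycytominer",
        sort_output=True,
    )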
diff --git a/cytotable/presets.py b/cytotable/presets.py
index 1503d2f..4f4191a 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -29,25 +29,19 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Image_Filtered AS (
-                SELECT
-                    /* seeks columns by name, avoiding failure if some do not exist */
-                    COLUMNS('^Metadata_ImageNumber$|^Image_Metadata_Well$|^Image_Metadata_Plate$')
-                FROM
-                    read_parquet('image.parquet')
-            )
             SELECT
-                *
+                image.Metadata_ImageNumber,
+                cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                cells.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber),
+                nuclei.* EXCLUDE (Metadata_ImageNumber, Metadata_ObjectNumber)
             FROM
                 read_parquet('cytoplasm.parquet') AS cytoplasm
-            LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
-                AND cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_ImageNumber)
+            WHERE
+                cells.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND nuclei.Metadata_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN Image_Filtered AS image ON
-                image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
         """,
     },
     "cellprofiler_sqlite": {
@@ -74,26 +68,21 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Per_Image_Filtered AS (
-                SELECT
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('per_image.parquet')
-            )
             SELECT
-                *
+                per_image.Metadata_ImageNumber,
+                per_image.Image_Metadata_Well,
+                per_image.Image_Metadata_Plate,
+                per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                per_cells.* EXCLUDE (Metadata_ImageNumber),
+                per_nuclei.* EXCLUDE (Metadata_ImageNumber)
             FROM
                 read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
-            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
-                per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
-                AND per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
-                per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
+            WHERE
+                per_cells.Cells_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Cells
                 AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
-            LEFT JOIN Per_Image_Filtered AS per_image ON
-                per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
         """,
     },
     "cellprofiler_sqlite_pycytominer": {
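Note (reviewer sketch, not part of the patch): a toy example of the USING/EXCLUDE
pattern adopted by the rewritten CONFIG_JOINS. USING collapses the join key into
a single column, and EXCLUDE keeps the duplicated key out of each star-expansion
(assumes the duckdb Python package; the tables are hypothetical):

    import duckdb

    duckdb.sql(
        """
        WITH cytoplasm AS (SELECT 1 AS Metadata_ImageNumber, 0.1 AS Cytoplasm_Feature_X),
             cells AS (SELECT 1 AS Metadata_ImageNumber, 0.2 AS Cells_Feature_Y)
        SELECT cytoplasm.*, cells.* EXCLUDE (Metadata_ImageNumber)
        FROM cytoplasm
        LEFT JOIN cells USING (Metadata_ImageNumber)
        """
    ).show()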
@@ -125,26 +114,21 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Per_Image_Filtered AS (
-                SELECT
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('per_image.parquet')
-            )
             SELECT
-                *
+                per_image.Metadata_ImageNumber,
+                per_image.Image_Metadata_Well,
+                per_image.Image_Metadata_Plate,
+                per_cytoplasm.* EXCLUDE (Metadata_ImageNumber),
+                per_cells.* EXCLUDE (Metadata_ImageNumber),
+                per_nuclei.* EXCLUDE (Metadata_ImageNumber)
             FROM
                 read_parquet('per_cytoplasm.parquet') AS per_cytoplasm
-            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells ON
-                per_cells.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
-                AND per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei ON
-                per_nuclei.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('per_cells.parquet') AS per_cells USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_nuclei.parquet') AS per_nuclei USING (Metadata_ImageNumber)
+            LEFT JOIN read_parquet('per_image.parquet') AS per_image USING (Metadata_ImageNumber)
+            WHERE
+                per_cells.Metadata_Cells_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND per_nuclei.Metadata_Nuclei_Number_Object_Number = per_cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN Per_Image_Filtered AS per_image ON
-                per_image.Metadata_ImageNumber = per_cytoplasm.Metadata_ImageNumber
         """,
     },
     "cell-health-cellprofiler-to-cytominer-database": {
@@ -178,30 +162,22 @@
         # compartment and metadata joins performed using DuckDB SQL
         # and modified at runtime as needed
         "CONFIG_JOINS": """
-            WITH Image_Filtered AS (
-                SELECT
-                    Metadata_TableNumber,
-                    Metadata_ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM
-                    read_parquet('image.parquet')
-            )
             SELECT
-                *
+                image.Metadata_TableNumber,
+                image.Metadata_ImageNumber,
+                image.Image_Metadata_Well,
+                image.Image_Metadata_Plate,
+                cytoplasm.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
+                cells.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber),
+                nuclei.* EXCLUDE (Metadata_TableNumber, Metadata_ImageNumber)
             FROM
                 read_parquet('cytoplasm.parquet') AS cytoplasm
-            LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                cells.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
-                AND cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
-            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                nuclei.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
+            LEFT JOIN read_parquet('cells.parquet') AS cells USING (Metadata_TableNumber, Metadata_ImageNumber)
+            LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (Metadata_TableNumber, Metadata_ImageNumber)
+            LEFT JOIN read_parquet('image.parquet') AS image USING (Metadata_TableNumber, Metadata_ImageNumber)
+            WHERE
+                cells.Cells_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Cells
                 AND nuclei.Nuclei_ObjectNumber = cytoplasm.Metadata_Cytoplasm_Parent_Nuclei
-            LEFT JOIN Image_Filtered AS image ON
-                image.Metadata_TableNumber = cytoplasm.Metadata_TableNumber
-                AND image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
         """,
     },
     "in-carta": {
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 83f0b43..62972b9 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -171,6 +171,8 @@ def _sqlite_mixed_type_query_to_parquet(
     table_name: str,
     chunk_size: int,
     offset: int,
+    sort_output: bool,
+    add_cytotable_meta: bool = False,
 ) -> str:
     """
     Performs SQLite table data extraction where one or many
@@ -186,6 +188,10 @@ def _sqlite_mixed_type_query_to_parquet(
             Row count to use for chunked output.
         offset: int:
             The offset for chunking the data from source.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
+        add_cytotable_meta: bool, default=False:
+            Whether to add CytoTable metadata fields or not
 
     Returns:
         pyarrow.Table:
@@ -195,7 +201,10 @@ def _sqlite_mixed_type_query_to_parquet(
 
     import pyarrow as pa
 
-    from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
+    from cytotable.constants import (
+        CYOTABLE_META_COLUMN_TYPES,
+        SQLITE_AFFINITY_DATA_TYPE_SYNONYMS,
+    )
     from cytotable.exceptions import DatatypeException
 
     # open sqlite3 connection
@@ -207,7 +216,7 @@ def _sqlite_mixed_type_query_to_parquet(
         # See the following for more information:
         # https://sqlite.org/pragma.html#pragma_table_info
         cursor.execute(
-            f"""
+            """
             SELECT :table_name as table_name,
                    name as column_name,
                    type as column_type
@@ -255,15 +264,45 @@ def _sqlite_mixed_type_query_to_parquet(
             for col in column_info
         ]
 
+        if add_cytotable_meta:
+            query_parts += [
+                (
+                    f"CAST( '{source_path}_table_{table_name}' "
+                    f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path'].lower())}) "
+                    "AS cytotable_meta_source_path"
+                ),
+                (
+                    f"CAST( {offset} "
+                    f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset'].lower())}) "
+                    "AS cytotable_meta_offset"
+                ),
+                (
+                    f"CAST( (ROW_NUMBER() OVER ()) AS "
+                    f"{_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum'].lower())}) "
+                    "AS cytotable_meta_rownum"
+                ),
+            ]
+
         # perform the select using the cases built above and using chunksize + offset
-        cursor.execute(
+        sql_stmt = (
             f"""
-            SELECT {', '.join(query_parts)}
+            SELECT
+                {', '.join(query_parts)}
             FROM {table_name}
             ORDER BY {', '.join([col['column_name'] for col in column_info])}
             LIMIT {chunk_size} OFFSET {offset};
             """
+            if sort_output
+            else f"""
+            SELECT
+                {', '.join(query_parts)}
+            FROM {table_name}
+            LIMIT {chunk_size} OFFSET {offset};
+            """
         )
+
+        # execute the sql stmt
+        cursor.execute(sql_stmt)
+
         # collect the results and include the column name with values
         results = [
             dict(zip([desc[0] for desc in cursor.description], row))
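Note (reviewer sketch, not part of the patch): the removed f-prefix above matters
because that statement relies on sqlite3's named-parameter binding rather than
string interpolation. A minimal standalone illustration follows; pragma_table_info
as a table-valued function needs SQLite 3.16+, and the ROW_NUMBER() window
function used for cytotable_meta_rownum needs SQLite 3.25+:

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE tbl_a (col_integer INTEGER)")
    cursor = con.execute(
        # :table_name is bound at execute time, so no f-string is needed
        "SELECT :table_name AS table_name, name, type FROM pragma_table_info(:table_name)",
        {"table_name": "tbl_a"},
    )
    print(cursor.fetchall())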
diff --git a/tests/conftest.py b/tests/conftest.py
index b11ceeb..8323af2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -28,6 +28,7 @@
 from pycytominer.cyto_utils.cells import SingleCells
 from sqlalchemy.util import deprecations
 
+from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
 from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded
 
 # filters sqlalchemy 2.0 uber warning
@@ -219,13 +220,25 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Image_Metadata_Plate": pa.array(["001", "001", "002", "002"]),
             "Image_Metadata_Well": pa.array(["A1", "A1", "A2", "A2"]),
+            "cytotable_meta_source_path": pa.array(
+                ["image.csv", "image.csv", "image.csv", "image.csv"]
+            ),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_cytoplasm = pa.Table.from_pydict(
         {
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cytoplasm_ObjectNumber": pa.array([1, 2, 1, 2]),
+            "Cytoplasm_Parent_Cells": pa.array([1, 2, 1, 2]),
+            "Cytoplasm_Parent_Nuclei": pa.array([1, 2, 1, 2]),
             "Cytoplasm_Feature_X": pa.array([0.1, 0.2, 0.1, 0.2]),
+            "cytotable_meta_source_path": pa.array(
+                ["cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv", "cytoplasm.csv"]
+            ),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_cells = pa.Table.from_pydict(
@@ -233,6 +246,11 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["1", "1", "2", "2"]),
             "Cells_ObjectNumber": pa.array([1, 2, 1, 2]),
             "Cells_Feature_Y": pa.array([0.01, 0.02, 0.01, 0.02]),
+            "cytotable_meta_source_path": pa.array(
+                ["cells.csv", "cells.csv", "cells.csv", "cells.csv"]
+            ),
+            "cytotable_meta_offset": pa.array([50, 50, 100, 100]),
+            "cytotable_meta_rownum": pa.array([1, 2, 3, 4]),
         }
     )
     table_nuclei_1 = pa.Table.from_pydict(
@@ -255,6 +273,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
                     0.002,
                 ]
             ),
+            "cytotable_meta_source_path": pa.array(["nuclei_1.csv", "nuclei_1.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50]),
+            "cytotable_meta_rownum": pa.array([1, 2]),
         }
     )
 
@@ -263,6 +284,9 @@ def fixture_example_tables() -> Tuple[pa.Table, ...]:
             "ImageNumber": pa.array(["2", "2"]),
             "Nuclei_ObjectNumber": pa.array([1, 2]),
             "Nuclei_Feature_Z": pa.array([0.001, 0.002]),
+            "cytotable_meta_source_path": pa.array(["nuclei_1.csv", "nuclei_1.csv"]),
+            "cytotable_meta_offset": pa.array([50, 50]),
+            "cytotable_meta_rownum": pa.array([1, 2]),
         }
     )
 
@@ -291,7 +315,18 @@ def fixture_example_local_sources(
         parents=True, exist_ok=True
     )
     # write example input
-    csv.write_csv(table, f"{fx_tempdir}/example/{number}/{name}.csv")
+    csv.write_csv(
+        # we remove simulated cytotable metadata columns to be more realistic
+        # (incoming sources would not usually contain these)
+        table.select(
+            [
+                column
+                for column in table.column_names
+                if column not in CYOTABLE_META_COLUMN_TYPES
+            ]
+        ),
+        f"{fx_tempdir}/example/{number}/{name}.csv",
+    )
     # write example output
     parquet.write_table(
         table, f"{fx_tempdir}/example_dest/{name}/{number}/{name}.parquet"
@@ -356,13 +391,25 @@ def col_renames(name: str, table: pa.Table):
     """
     return table.rename_columns(
         [
-            f"Metadata_{colname}"
-            if colname in ["ImageNumber", "ObjectNumber"]
-            else f"Metadata_{name}_{colname}"
-            if any(name in colname for name in ["Parent_Cells", "Parent_Nuclei"])
-            else f"{name}_{colname}"
-            if not (colname.startswith(name) or colname.startswith("Metadata_"))
-            else colname
+            (
+                f"Metadata_{colname}"
+                if colname in ["ImageNumber", "ObjectNumber"]
+                else (
+                    f"Metadata_{name}_{colname}"
+                    if any(
+                        name in colname
+                        for name in ["Parent_Cells", "Parent_Nuclei"]
+                    )
+                    else (
+                        f"{name}_{colname}"
+                        if not (
+                            colname.startswith(name)
+                            or colname.startswith("Metadata_")
+                        )
+                        else colname
+                    )
+                )
+            )
             for colname in table.column_names
         ]
    )
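Note (reviewer sketch, not part of the patch): a standalone paraphrase of the
fixture's rename rules, with a hypothetical helper shown on representative
column names:

    def rename(name: str, colname: str) -> str:
        # identifying columns become Metadata_ prefixed
        if colname in ["ImageNumber", "ObjectNumber"]:
            return f"Metadata_{colname}"
        # parent linkage columns also gain the compartment name
        if any(key in colname for key in ["Parent_Cells", "Parent_Nuclei"]):
            return f"Metadata_{name}_{colname}"
        # remaining feature columns gain the compartment prefix once
        if not (colname.startswith(name) or colname.startswith("Metadata_")):
            return f"{name}_{colname}"
        return colname

    assert rename("Cytoplasm", "ImageNumber") == "Metadata_ImageNumber"
    assert rename("Cytoplasm", "Parent_Cells") == "Metadata_Cytoplasm_Parent_Cells"
    assert rename("Cytoplasm", "Feature_X") == "Cytoplasm_Feature_X"
    assert rename("Cytoplasm", "Cytoplasm_ObjectNumber") == "Cytoplasm_ObjectNumber"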
@@ -448,22 +495,19 @@ def fixture_cellprofiler_merged_nf1data(
         .execute(
             """
             /* perform query on sqlite tables through duckdb */
-            WITH Per_Image_Filtered AS (
-                SELECT
-                    ImageNumber,
-                    Image_Metadata_Well,
-                    Image_Metadata_Plate
-                FROM Per_Image
-            )
-            SELECT *
-            FROM Per_Image_Filtered image
-            LEFT JOIN Per_Cytoplasm cytoplasm ON
-                image.ImageNumber = cytoplasm.ImageNumber
-            LEFT JOIN Per_Cells cells ON
-                cells.ImageNumber = cytoplasm.ImageNumber
-                AND cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
-            LEFT JOIN Per_Nuclei nuclei ON
-                nuclei.ImageNumber = cytoplasm.ImageNumber
+            SELECT
+                image.ImageNumber,
+                image.Image_Metadata_Well,
+                image.Image_Metadata_Plate,
+                cytoplasm.*,
+                cells.*,
+                nuclei.*
+            FROM Per_Cytoplasm cytoplasm
+            LEFT JOIN Per_Cells cells USING (ImageNumber)
+            LEFT JOIN Per_Nuclei nuclei USING (ImageNumber)
+            LEFT JOIN Per_Image image USING (ImageNumber)
+            WHERE
+                cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
                 AND nuclei.Nuclei_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Nuclei
             """
         )
@@ -505,7 +549,7 @@ def fixture_cytominerdatabase_merged_cellhealth(
     """
 
     sql_stmt = """
-        WITH Image_Filtered AS (
+        WITH image_filtered AS (
             SELECT
                 TableNumber,
                 ImageNumber,
@@ -534,9 +578,9 @@ def fixture_cytominerdatabase_merged_cellhealth(
             FROM Nuclei
         )
         SELECT DISTINCT *
-        FROM Image_Filtered image
+        FROM image_filtered
         LEFT JOIN Cytoplasm_renamed cytoplasm ON
-            image.ImageNumber = cytoplasm.ImageNumber
+            image_filtered.ImageNumber = cytoplasm.ImageNumber
         LEFT JOIN Cells_renamed cells ON
             cells.ImageNumber = cytoplasm.ImageNumber
             AND cells.Cells_Number_Object_Number = cytoplasm.Cytoplasm_Parent_Cells
diff --git a/tests/test_convert.py b/tests/test_convert.py
index df29400..f221ff1 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -21,6 +21,7 @@
 from pyarrow import csv, parquet
 from pycytominer.cyto_utils.cells import SingleCells
 
+from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
 from cytotable.convert import (
     _concat_join_sources,
     _concat_source_group,
@@ -367,7 +368,7 @@ def test_prepare_join_sql(
     example_local_sources: Dict[str, List[Dict[str, Any]]],
 ):
     """
-    Tests _prepare_join_sql
+    Tests _prepare_join_sql by using sources to run the SQL join statement.
 
     After running _prepare_join_sql we'd expect something like:
     SELECT
@@ -384,31 +385,54 @@ def test_prepare_join_sql(
 
     # attempt to run query against prepared_join_sql with test data
     with _duckdb_reader() as ddb_reader:
-        result = (
-            ddb_reader.execute(
-                _prepare_join_sql(
-                    sources=example_local_sources,
-                    # simplified join for example dataset
-                    joins="""
+        result = ddb_reader.execute(
+            _prepare_join_sql(
+                sources=example_local_sources,
+                # simplified join for example dataset
+                joins="""
                 SELECT
-                    *
+                    image.ImageNumber,
+                    cytoplasm.*,
+                    cells.*,
+                    nuclei.*
                 FROM
-                    read_parquet('image.parquet') AS image
-                LEFT JOIN read_parquet('cytoplasm.parquet') AS cytoplasm ON
-                    cytoplasm.ImageNumber = image.ImageNumber
-                LEFT JOIN read_parquet('cells.parquet') AS cells ON
-                    cells.ImageNumber = cytoplasm.ImageNumber
-                LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
-                    nuclei.ImageNumber = cytoplasm.ImageNumber
+                    read_parquet('cytoplasm.parquet') AS cytoplasm
+                LEFT JOIN read_parquet('cells.parquet') AS cells USING (ImageNumber)
+                LEFT JOIN read_parquet('nuclei.parquet') AS nuclei USING (ImageNumber)
+                LEFT JOIN read_parquet('image.parquet') AS image USING (ImageNumber)
+                WHERE
+                    cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
+                    AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
                 """,
-                ).result()
-            )
-            .arrow()
-            .to_pydict()
-        )
-
-        # check that we received data back
-        assert len(result) == 9
+                sort_output=True,
+            ).result()
+        ).df()
+
+    # check that we received expected data back
+    assert result.shape == (4, 21)
+    assert result.iloc[0].to_dict() == {
+        "ImageNumber": "1",
+        "ImageNumber_1": "1",
+        "Cytoplasm_ObjectNumber": 1,
+        "Cytoplasm_Parent_Cells": 1,
+        "Cytoplasm_Parent_Nuclei": 1,
+        "Cytoplasm_Feature_X": 0.1,
+        "cytotable_meta_source_path": "cytoplasm.csv",
+        "cytotable_meta_offset": 50,
+        "cytotable_meta_rownum": 1,
+        "ImageNumber_2": "1",
+        "Cells_ObjectNumber": 1,
+        "Cells_Feature_Y": 0.01,
"cytotable_meta_source_path_1": "cells.csv", + "cytotable_meta_offset_1": 50, + "cytotable_meta_rownum_1": 1, + "ImageNumber_3": "1", + "Nuclei_ObjectNumber": 1, + "Nuclei_Feature_Z": 0.001, + "cytotable_meta_source_path_2": "nuclei_1.csv", + "cytotable_meta_offset_2": 50, + "cytotable_meta_rownum_2": 1, + } def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): @@ -448,10 +472,8 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): dest_path=f"{fx_tempdir}/destination.parquet", joins=f""" SELECT * - FROM read_parquet('{fx_tempdir}/example_a_merged.parquet') as example_a - JOIN read_parquet('{fx_tempdir}/example_b_merged.parquet') as example_b ON - example_b.id1 = example_a.id1 - AND example_b.id2 = example_a.id2 + FROM read_parquet('{test_path_a}') as example_a + JOIN read_parquet('{test_path_b}') as example_b USING(id1, id2) """, chunk_size=2, offset=0, @@ -464,10 +486,10 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): assert result_table.equals( other=pa.Table.from_pydict( { - "field1": ["foo", "foo"], - "field2": [True, True], - "id1": [1, 1], - "id2": ["a", "b"], + "field1": ["foo", "bar"], + "field2": [True, False], + "id1": [1, 2], + "id2": ["a", "a"], }, # use schema from result as a reference for col order schema=result_table.schema, @@ -617,6 +639,65 @@ def test_to_parquet( chunk_size=4, infer_common_schema=False, drop_null=True, + sort_output=True, + ), + ) + + flattened_results = list(itertools.chain(*list(result.values()))) + for i, flattened_result in enumerate(flattened_results): + csv_source = ( + _duckdb_reader() + .execute( + f""" + select * from + read_csv_auto('{str(flattened_example_sources[i]["source_path"])}', + ignore_errors=TRUE) + """ + ) + .arrow() + ) + parquet_result = parquet.ParquetDataset( + path_or_paths=flattened_result["table"], + # set the order of the columns uniformly for schema comparison + schema=csv_source.schema, + ).read() + assert parquet_result.schema.equals(csv_source.schema) + assert parquet_result.shape == csv_source.shape + + +def test_to_parquet_unsorted( + load_parsl_default: None, + fx_tempdir: str, + example_local_sources: Dict[str, List[Dict[str, Any]]], +): + """ + Tests _to_parquet with sort_output == False (unsorted) + """ + + flattened_example_sources = list( + itertools.chain(*list(example_local_sources.values())) + ) + + # note: we cast here for mypy linting (dict and str treatment differ) + result: Dict[str, List[Dict[str, Any]]] = cast( + dict, + _to_parquet( + source_path=str( + example_local_sources["image.csv"][0]["source_path"].parent + ), + dest_path=fx_tempdir, + source_datatype=None, + compartments=["cytoplasm", "cells", "nuclei"], + metadata=["image"], + identifying_columns=["imagenumber"], + concat=False, + join=False, + joins=None, + chunk_columns=None, + chunk_size=4, + infer_common_schema=False, + drop_null=True, + sort_output=False, ), ) @@ -1004,6 +1085,7 @@ def test_sqlite_mixed_type_query_to_parquet( table_name=table_name, chunk_size=2, offset=0, + sort_output=True, ), where=result_filepath, ) @@ -1048,6 +1130,12 @@ def test_sqlite_mixed_type_query_to_parquet( "Tbl_a_col_text": ["sample", "sample"], "Tbl_a_col_blob": [b"another_blob", b"sample_blob"], "Tbl_a_col_real": [None, 0.5], + "cytotable_meta_source_path": [ + f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", + f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", + ], + "cytotable_meta_offset": [0, 0], + "cytotable_meta_rownum": [2, 1], } 
@@ -1159,6 +1247,15 @@ def test_in_carta_to_parquet(
         ]["table"][0]
     )
 
+    # drop cytotable metadata columns for comparisons (example sources won't contain these)
+    cytotable_result_table = cytotable_result_table.select(
+        [
+            column
+            for column in cytotable_result_table.column_names
+            if column not in CYOTABLE_META_COLUMN_TYPES
+        ]
+    )
+
     # check the data against one another
     assert cytotable_result_table.schema.equals(ddb_result.schema)
     assert cytotable_result_table.shape == ddb_result.shape