diff --git a/cytotable/constants.py b/cytotable/constants.py index 2fb3182..591b323 100644 --- a/cytotable/constants.py +++ b/cytotable/constants.py @@ -68,13 +68,6 @@ ], } -# metadata column names and types for internal use within CytoTable -CYOTABLE_META_COLUMN_TYPES = { - "cytotable_meta_source_path": "VARCHAR", - "cytotable_meta_offset": "BIGINT", - "cytotable_meta_rownum": "BIGINT", -} - CYTOTABLE_DEFAULT_PARQUET_METADATA = { "data-producer": "https://github.com/cytomining/CytoTable", "data-producer-version": str(_get_cytotable_version()), diff --git a/cytotable/convert.py b/cytotable/convert.py index a6e3e26..48b9514 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -4,7 +4,6 @@ import itertools import logging -import uuid from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast import parsl @@ -33,7 +32,7 @@ def _get_table_columns_and_types( Args: source: Dict[str, Any] - Contains the source data to be chunked. Represents a single + Contains source data details. Represents a single file or table of some kind. sort_output: Specifies whether to sort cytotable output or not. @@ -43,10 +42,7 @@ def _get_table_columns_and_types( list of dictionaries which each include column level information """ - import pathlib - import duckdb - from cloudpathlib import AnyPath from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet @@ -89,7 +85,7 @@ def _get_table_columns_and_types( # with exception handling to read mixed-type data # using sqlite3 and special utility function try: - # isolate using new connection to read data with chunk size + offset + # isolate using new connection to read data based on pageset # and export directly to parquet via duckdb (avoiding need to return data to python) # perform the query and create a list of dictionaries with the column data for table with _duckdb_reader() as ddb_reader: @@ -109,13 +105,8 @@ def _get_table_columns_and_types( arrow_data_tbl = _sqlite_mixed_type_query_to_parquet( source_path=str(source["source_path"]), table_name=str(source["table_name"]), - # chunk size is set to 5 as a limit similar - # to above SQL within select_query variable - chunk_size=5, - # offset is set to 0 start at first row - # result from table - offset=0, - add_cytotable_meta=False, + page_key=source["page_key"], + pageset=source["pagesets"][0], sort_output=sort_output, ) with _duckdb_reader() as ddb_reader: @@ -183,13 +174,14 @@ def _prep_cast_column_data_types( @python_app -def _get_table_chunk_offsets( +def _get_table_keyset_pagination_sets( chunk_size: int, + page_key: str, source: Optional[Dict[str, Any]] = None, sql_stmt: Optional[str] = None, -) -> Union[List[int], None]: +) -> Union[List[Tuple[Union[int, float], Union[int, float]]], None]: """ - Get table data chunk offsets for later use in capturing segments + Get table data chunk keys for later use in capturing segments of values. This work also provides a chance to catch problematic input data which will be ignored with warnings. @@ -199,21 +191,27 @@ def _get_table_chunk_offsets( file or table of some kind. chunk_size: int The size in rowcount of the chunks to create. + page_key: str + The column name to be used to identify pagination chunks. + Expected to be of numeric type (int, float) for ordering. + sql_stmt: + Optional sql statement to form the pagination set from. + Default behavior extracts pagination sets from the full + data source. Returns: - List[int] - List of integers which represent offsets to use for reading - the data later on. 
+ List[Any] + List of keys to use for reading the data later on. """ import logging - import pathlib + import sqlite3 + from contextlib import closing import duckdb - from cloudpathlib import AnyPath, CloudPath from cytotable.exceptions import NoInputDataException - from cytotable.utils import _duckdb_reader + from cytotable.utils import _duckdb_reader, _generate_pagesets logger = logging.getLogger(__name__) @@ -223,18 +221,29 @@ def _get_table_chunk_offsets( source_type = str(source_path.suffix).lower() try: - # gather the total rowcount from csv or sqlite data input sources with _duckdb_reader() as ddb_reader: - rowcount = int( - ddb_reader.execute( - # nosec - f"SELECT COUNT(*) from read_csv_auto('{source_path}', header=TRUE, delim=',')" - if source_type == ".csv" - else f"SELECT COUNT(*) from sqlite_scan('{source_path}', '{table_name}')" - ).fetchone()[0] - ) + if source_type == ".csv": + sql_query = f"SELECT {page_key} FROM read_csv_auto('{source_path}', header=TRUE, delim=',') ORDER BY {page_key}" + else: + sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}" + + page_keys = [ + results[0] for results in ddb_reader.execute(sql_query).fetchall() + ] + + # exception case for when we have mixed types + # (i.e. integer col with string and ints) in a sqlite column + except duckdb.TypeMismatchException: + with closing(sqlite3.connect(source_path)) as cx: + with cx: + page_keys = [ + key[0] + for key in cx.execute( + f"SELECT {page_key} FROM {table_name} ORDER BY {page_key};" + ).fetchall() + if isinstance(key[0], (int, float)) + ] - # catch input errors which will result in skipped files except ( duckdb.InvalidInputException, NoInputDataException, @@ -245,34 +254,20 @@ def _get_table_chunk_offsets( return None - # find chunk offsets from sql statement elif sql_stmt is not None: - # gather the total rowcount from csv or sqlite data input sources with _duckdb_reader() as ddb_reader: - rowcount = int( - ddb_reader.execute( - # nosec - f"SELECT COUNT(*) FROM ({sql_stmt})" - ).fetchone()[0] - ) + sql_query = f"SELECT {page_key} FROM ({sql_stmt}) ORDER BY {page_key}" + page_keys = ddb_reader.execute(sql_query).fetchall() + page_keys = [key[0] for key in page_keys] - return list( - range( - 0, - # gather rowcount from table and use as maximum for range - rowcount, - # step through using chunk size - chunk_size, - ) - ) + return _generate_pagesets(page_keys, chunk_size) @python_app -def _source_chunk_to_parquet( +def _source_pageset_to_parquet( source_group_name: str, source: Dict[str, Any], - chunk_size: int, - offset: int, + pageset: Tuple[Union[int, float], Union[int, float]], dest_path: str, sort_output: bool, ) -> str: @@ -285,10 +280,8 @@ def _source_chunk_to_parquet( source: Dict[str, Any] Contains the source data to be chunked. Represents a single file or table of some kind along with collected information about table. - chunk_size: int - Row count to use for chunked output. - offset: int - The offset for chunking the data from source. + pageset: Tuple[int, int] + The pageset for chunking the data from source. dest_path: str Path to store the output data. 
sort_output: bool @@ -303,9 +296,7 @@ def _source_chunk_to_parquet( import duckdb from cloudpathlib import AnyPath - from pyarrow import parquet - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import ( _duckdb_reader, _sqlite_mixed_type_query_to_parquet, @@ -319,26 +310,6 @@ def _source_chunk_to_parquet( ) pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True) - source_path_str = ( - source["source_path"] - if "table_name" not in source.keys() - else f"{source['source_path']}_table_{source['table_name']}" - ) - # build the column selection block of query - - # add cytotable metadata columns - cytotable_metadata_cols = [ - ( - f"CAST( '{source_path_str}' " - f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})" - ' AS "cytotable_meta_source_path"' - ), - f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"", - ( - f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})" - ' AS "cytotable_meta_rownum"' - ), - ] # add source table columns casted_source_cols = [ # here we cast the column to the specified type ensure the colname remains the same @@ -349,7 +320,7 @@ def _source_chunk_to_parquet( # create selection statement from lists above select_columns = ",".join( # if we should sort the output, add the metadata_cols - cytotable_metadata_cols + casted_source_cols + casted_source_cols if sort_output else casted_source_cols ) @@ -364,7 +335,8 @@ def _source_chunk_to_parquet( base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')" result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}" - result_filepath = f"{result_filepath_base}-{offset}.parquet" + # form a filepath which indicates the pageset + result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet" # Attempt to read the data to parquet file # using duckdb for extraction and pyarrow for @@ -377,14 +349,9 @@ def _source_chunk_to_parquet( table=ddb_reader.execute( f""" {base_query} - /* order by all columns for deterministic output */ - ORDER BY ALL - LIMIT {chunk_size} OFFSET {offset} - """ - if sort_output - else f""" - {base_query} - LIMIT {chunk_size} OFFSET {offset} + WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]} + /* optional ordering per pageset */ + {"ORDER BY " + source['page_key'] if sort_output else ""}; """ ).arrow(), where=result_filepath, @@ -406,9 +373,8 @@ def _source_chunk_to_parquet( table=_sqlite_mixed_type_query_to_parquet( source_path=str(source["source_path"]), table_name=str(source["table_name"]), - chunk_size=chunk_size, - offset=offset, - add_cytotable_meta=True if sort_output else False, + page_key=source["page_key"], + pageset=pageset, sort_output=sort_output, ), where=result_filepath, @@ -458,10 +424,7 @@ def _prepend_column_name( import pyarrow.parquet as parquet - from cytotable.constants import ( - CYOTABLE_META_COLUMN_TYPES, - CYTOTABLE_ARROW_USE_MEMORY_MAPPING, - ) + from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING from cytotable.utils import _write_parquet_table_with_metadata logger = logging.getLogger(__name__) @@ -472,7 +435,7 @@ def _prepend_column_name( if len(targets) == 0: logger.warning( msg=( - "Skipping column name prepend operations" + "Skipping column name prepend operations " "because no compartments or metadata were provided." 
) ) @@ -509,10 +472,8 @@ def _prepend_column_name( # source_group_name_stem: 'Cells' # column_name: 'AreaShape_Area' # updated_column_name: 'Cells_AreaShape_Area' - if ( - column_name not in identifying_columns - and not column_name.startswith(source_group_name_stem.capitalize()) - and column_name not in CYOTABLE_META_COLUMN_TYPES + if column_name not in identifying_columns and not column_name.startswith( + source_group_name_stem.capitalize() ): updated_column_names.append(f"{source_group_name_stem}_{column_name}") # if-condition for prepending 'Metadata_' to column name @@ -574,6 +535,7 @@ def _concat_source_group( source_group: List[Dict[str, Any]], dest_path: str, common_schema: Optional[List[Tuple[str, str]]] = None, + sort_output: bool = True, ) -> List[Dict[str, Any]]: """ Concatenate group of source data together as single file. @@ -620,6 +582,8 @@ def _concat_source_group( common_schema: List[Tuple[str, str]] (Default value = None) Common schema to use for concatenation amongst arrow tables which may have slightly different but compatible schema. + sort_output: bool + Specifies whether to sort cytotable output or not. Returns: List[Dict[str, Any]] @@ -637,7 +601,7 @@ def _concat_source_group( CYTOTABLE_DEFAULT_PARQUET_METADATA, ) from cytotable.exceptions import SchemaException - from cytotable.utils import _write_parquet_table_with_metadata + from cytotable.utils import _natural_sort # build a result placeholder concatted: List[Dict[str, Any]] = [ @@ -676,7 +640,10 @@ def _concat_source_group( # (all must be the same schema) with parquet.ParquetWriter(str(destination_path), writer_schema) as writer: for source in source_group: - for table in [table for table in source["table"]]: + tables = [table for table in source["table"]] + if sort_output: + tables = _natural_sort(tables) + for table in tables: # if we haven't inferred the common schema # check that our file matches the expected schema, otherwise raise an error if common_schema is None and not writer_schema.equals( @@ -720,7 +687,6 @@ def _concat_source_group( def _prepare_join_sql( sources: Dict[str, List[Dict[str, Any]]], joins: str, - sort_output: bool, ) -> str: """ Prepare join SQL statement with actual locations of data based on the sources. @@ -741,8 +707,6 @@ def _prepare_join_sql( """ import pathlib - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES - # replace with real location of sources for join sql order_by_tables = [] for key, val in sources.items(): @@ -754,25 +718,17 @@ def _prepare_join_sql( ) order_by_tables.append(table_name) - # create order by statement with from all tables using cytotable metadata - order_by_sql = "ORDER BY " + ", ".join( - [ - f"{table}.{meta_column}" - for table in order_by_tables - for meta_column in CYOTABLE_META_COLUMN_TYPES - ] - ) - # add the order by statements to the join - return joins + order_by_sql if sort_output else joins + return joins @python_app -def _join_source_chunk( +def _join_source_pageset( dest_path: str, joins: str, - chunk_size: int, - offset: int, + page_key: str, + pageset: Tuple[int, int], + sort_output: bool, drop_null: bool, ) -> str: """ @@ -798,31 +754,20 @@ def _join_source_chunk( import pathlib - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata - # Attempt to read the data to parquet file - # using duckdb for extraction and pyarrow for - # writing data to a parquet file. 
- # read data with chunk size + offset - # and export to parquet - exclude_meta_cols = [ - f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys()) - ] - with _duckdb_reader() as ddb_reader: result = ddb_reader.execute( f""" - WITH joined AS ( + WITH joined AS ( {joins} - LIMIT {chunk_size} OFFSET {offset} - ) - SELECT - /* exclude metadata columns from the results - by using a lambda on column names based on exclude_meta_cols. */ - COLUMNS (c -> ({" AND ".join(exclude_meta_cols)})) - FROM joined; - """ + ) + SELECT * + FROM joined + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} + /* optional sorting per pageset */ + {"ORDER BY " + page_key if sort_output else ""}; + """ ).arrow() # drop nulls if specified @@ -847,10 +792,8 @@ def _join_source_chunk( f"{str(pathlib.Path(dest_path).parent)}/" # use the dest_path stem in the name f"{str(pathlib.Path(dest_path).stem)}-" - # give the join chunk result a unique to arbitrarily - # differentiate from other chunk groups which are mapped - # and before they are brought together as one dataset - f"{str(uuid.uuid4().hex)}.parquet" + # add the pageset indication to the filename + f"{pageset[0]}-{pageset[1]}.parquet" ) # write the result @@ -867,6 +810,7 @@ def _concat_join_sources( sources: Dict[str, List[Dict[str, Any]]], dest_path: str, join_sources: List[str], + sort_output: bool = True, ) -> str: """ Concatenate join sources from parquet-based chunks. @@ -883,6 +827,8 @@ def _concat_join_sources( join_sources: List[str]: List of local filepath destination for join source chunks which will be concatenated. + sort_output: bool + Specifies whether to sort cytotable output or not. Returns: str @@ -898,7 +844,7 @@ def _concat_join_sources( CYTOTABLE_ARROW_USE_MEMORY_MAPPING, CYTOTABLE_DEFAULT_PARQUET_METADATA, ) - from cytotable.utils import _write_parquet_table_with_metadata + from cytotable.utils import _natural_sort # remove the unjoined concatted compartments to prepare final dest_path usage # (we now have joined results) @@ -918,7 +864,11 @@ def _concat_join_sources( CYTOTABLE_DEFAULT_PARQUET_METADATA ) with parquet.ParquetWriter(str(dest_path), writer_schema) as writer: - for table_path in join_sources: + for table_path in ( + join_sources + if not sort_output + else _natural_sort(list_to_sort=join_sources) + ): writer.write_table( parquet.read_table( table_path, @@ -1042,6 +992,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals infer_common_schema: bool, drop_null: bool, sort_output: bool, + page_keys: Dict[str, str], data_type_cast_map: Optional[Dict[str, str]] = None, **kwargs, ) -> Union[Dict[str, List[Dict[str, Any]]], str]: @@ -1082,6 +1033,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals Whether to drop null results. sort_output: bool Specifies whether to sort cytotable output or not. + page_keys: Dict[str, str] + A dictionary which defines which column names are used for keyset pagination + in order to perform data extraction. data_type_cast_map: Dict[str, str] A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from: https://arrow.apache.org/docs/python/api/datatypes.html @@ -1112,16 +1066,35 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # expand the destination path expanded_dest_path = _expand_path(path=dest_path) - # prepare offsets for chunked data export from source tables - offsets_prepared = { + # check that each source group name has a pagination key + for source_group_name in sources.keys(): + matching_keys = [ + key for key in page_keys.keys() if key.lower() in source_group_name.lower() + ] + if not matching_keys: + raise CytoTableException( + f"No matching key found in page_keys for source_group_name: {source_group_name}. " + "Please include a pagination key based on a column name from the table." + ) + + # prepare pagesets for chunked data export from source tables + pagesets_prepared = { source_group_name: [ dict( source, **{ - "offsets": _get_table_chunk_offsets( + "page_key": ( + page_key := [ + value + for key, value in page_keys.items() + if key.lower() in source_group_name.lower() + ][0] + ), + "pagesets": _get_table_keyset_pagination_sets( source=source, chunk_size=chunk_size, - ) + page_key=page_key, + ), }, ) for source in source_group_vals @@ -1129,17 +1102,17 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals for source_group_name, source_group_vals in sources.items() } - # if offsets is none and we haven't halted, remove the file as there + # if pagesets is None and we haven't halted, remove the file as there # were input formatting errors which will create challenges downstream invalid_files_dropped = { source_group_name: [ - # ensure we have offsets + # ensure we have pagesets source for source in source_group_vals - if source["offsets"] is not None + if source["pagesets"] is not None ] for source_group_name, source_group_vals in evaluate_futures( - offsets_prepared + pagesets_prepared ).items() # ensure we have source_groups with at least one source table if len(source_group_vals) > 0 @@ -1172,12 +1145,11 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals "table": [ # perform column renaming and create potential return result _prepend_column_name( - # perform chunked data export to parquet using offsets - table_path=_source_chunk_to_parquet( + # perform chunked data export to parquet using pagesets + table_path=_source_pageset_to_parquet( source_group_name=source_group_name, source=source, - chunk_size=chunk_size, - offset=offset, + pageset=pageset, dest_path=expanded_dest_path, sort_output=sort_output, ), @@ -1186,7 +1158,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals metadata=metadata, compartments=compartments, ) - for offset in source["offsets"] + for pageset in source["pagesets"] ] }, ) @@ -1227,6 +1199,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals source_group=source_group_vals[0]["sources"], dest_path=expanded_dest_path, common_schema=source_group_vals[0]["common_schema"], + sort_output=sort_output, ) for source_group_name, source_group_vals in evaluate_futures( common_schema_determined @@ -1240,28 +1213,34 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals evaluated_results = evaluate_futures(results) prepared_joins_sql = _prepare_join_sql( - sources=evaluated_results, joins=joins, sort_output=sort_output + sources=evaluated_results, joins=joins ).result() + page_key_join = [ + value for key, value in page_keys.items() if key.lower() == "join" + ][0] + # map joined results based on the join groups gathered above # note:
after mapping we end up with a list of strings (task returns str) join_sources_result = [ - _join_source_chunk( + _join_source_pageset( # gather the result of concatted sources prior to # join group merging as each mapped task run will need # full concat results dest_path=expanded_dest_path, joins=prepared_joins_sql, - chunk_size=chunk_size, - offset=offset, + page_key=page_key_join, + pageset=pageset, + sort_output=sort_output, drop_null=drop_null, ) # create join group for querying the concatenated # data in order to perform memory-safe joining # per user chunk size specification. - for offset in _get_table_chunk_offsets( + for pageset in _get_table_keyset_pagination_sets( sql_stmt=prepared_joins_sql, chunk_size=chunk_size, + page_key=page_key_join, ).result() ] @@ -1272,6 +1251,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals dest_path=expanded_dest_path, join_sources=[join.result() for join in join_sources_result], sources=evaluated_results, + sort_output=sort_output, ) # wrap the final result as a future and return @@ -1293,6 +1273,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals infer_common_schema: bool = True, drop_null: bool = False, data_type_cast_map: Optional[Dict[str, str]] = None, + page_keys: Optional[Dict[str, str]] = None, sort_output: bool = True, preset: Optional[str] = "cellprofiler_csv", parsl_config: Optional[parsl.Config] = None, @@ -1341,6 +1322,12 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals A dictionary mapping data type groups to specific types. Roughly includes Arrow data types language from: https://arrow.apache.org/docs/python/api/datatypes.html + page_keys: Optional[Dict[str, str]] (Default value = None) + The table and column names to be used for keyset pagination. + Uses the form: {"table_name":"column_name"}. + Expects columns to include numeric data (ints or floats). + Interacts with the `chunk_size` parameter to form + pages of roughly `chunk_size` keys. sort_output: bool (Default value = True) Specifies whether to sort cytotable output or not. drop_null: bool (Default value = False) @@ -1440,6 +1427,24 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals if chunk_size is None else chunk_size ) + page_keys = ( + cast(dict, config[preset]["CONFIG_PAGE_KEYS"]) + if page_keys is None + else page_keys + ) + + # Raise an exception for scenarios where one configures CytoTable to join + # but does not provide a pagination key for the joins. + if join and (page_keys is None or "join" not in page_keys.keys()): + raise CytoTableException( + ( + "When using join=True one must pass a 'join' pagination key " + "in the page_keys parameter. The 'join' pagination key is a column " + "name found within the joined results based on the SQL provided from " + "the joins parameter. This special key is required because not all columns " + "from the source tables may be included in the joined result." + ) + ) # send sources to be written to parquet if selected if dest_datatype == "parquet": @@ -1458,6 +1463,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals drop_null=drop_null, data_type_cast_map=data_type_cast_map, sort_output=sort_output, + page_keys=cast(dict, page_keys), **kwargs, ) diff --git a/cytotable/presets.py b/cytotable/presets.py index c53e958..efaa05c 100644 --- a/cytotable/presets.py +++ b/cytotable/presets.py @@ -22,6 +22,16 @@ "Parent_Cells", "Parent_Nuclei", ), + # pagination keys for use with this data + # of the rough format "table" -> "column".
+ # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -61,6 +71,16 @@ "Parent_Cells", "Parent_Nuclei", ), + # pagination keys for use with this data + # of the rough format "table" -> "column". + # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "Cells_Number_Object_Number", + "nuclei": "Nuclei_Number_Object_Number", + "cytoplasm": "Cytoplasm_Number_Object_Number", + "join": "Cytoplasm_Number_Object_Number", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -104,6 +124,16 @@ "Parent_Cells", "Parent_Nuclei", ), + # pagination keys for use with this data + # of the rough format "table" -> "column". + # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -155,6 +185,16 @@ "Cells_Number_Object_Number", "Nuclei_Number_Object_Number", ), + # pagination keys for use with this data + # of the rough format "table" -> "column". + # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "Cells_Number_Object_Number", + "nuclei": "Nuclei_Number_Object_Number", + "cytoplasm": "Cytoplasm_Number_Object_Number", + "join": "Cytoplasm_Number_Object_Number", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -203,6 +243,16 @@ "Cells_ObjectNumber", "Nuclei_ObjectNumber", ), + # pagination keys for use with this data + # of the rough format "table" -> "column". + # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -248,6 +298,12 @@ "Z", "T", ), + # pagination keys for use with this data + # of the rough format "table" -> "column". + # note: page keys are expected to be numeric (int, float) + "CONFIG_PAGE_KEYS": { + "test": '"OBJECT ID"', + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. 
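For orientation alongside these preset additions, a minimal usage sketch (not part of the patch; the paths below are hypothetical): when `page_keys` is not supplied, `convert` falls back to the preset's `CONFIG_PAGE_KEYS`, while passing `page_keys` explicitly replaces the preset values entirely, and a "join" key is required whenever `join=True`.

from cytotable.convert import convert

# rely on the preset's CONFIG_PAGE_KEYS (page_keys defaults to None)
convert(
    source_path="./examples/data",  # hypothetical source location
    dest_path="./examples/output.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_csv",
)

# or provide page_keys explicitly, which replaces the preset values entirely;
# this mirrors one of the CONFIG_PAGE_KEYS blocks above and must include a
# "join" key whenever join=True
convert(
    source_path="./examples/data",  # hypothetical source location
    dest_path="./examples/output_custom.parquet",
    dest_datatype="parquet",
    preset="cellprofiler_csv",
    chunk_size=1000,
    page_keys={
        "image": "ImageNumber",
        "cells": "ObjectNumber",
        "nuclei": "ObjectNumber",
        "cytoplasm": "ObjectNumber",
        "join": "Cytoplasm_Number_Object_Number",
    },
)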
diff --git a/cytotable/utils.py b/cytotable/utils.py index c95d4fb..1ea9792 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -5,7 +5,7 @@ import logging import os import pathlib -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union, cast import duckdb import parsl @@ -173,10 +173,9 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection: def _sqlite_mixed_type_query_to_parquet( source_path: str, table_name: str, - chunk_size: int, - offset: int, + page_key: str, + pageset: Tuple[Union[int, float], Union[int, float]], sort_output: bool, - add_cytotable_meta: bool = False, ) -> str: """ Performs SQLite table data extraction where one or many @@ -188,10 +187,10 @@ def _sqlite_mixed_type_query_to_parquet( A str which is a path to a SQLite database file. table_name: str: The name of the table being queried. - chunk_size: int: - Row count to use for chunked output. - offset: int: - The offset for chunking the data from source. + page_key: str: + The column name to be used to identify pagination chunks. + pageset: Tuple[int, int]: + The range for values used for paginating data from source. sort_output: bool Specifies whether to sort cytotable output or not. add_cytotable_meta: bool, default=False: @@ -205,10 +204,7 @@ def _sqlite_mixed_type_query_to_parquet( import pyarrow as pa - from cytotable.constants import ( - CYOTABLE_META_COLUMN_TYPES, - SQLITE_AFFINITY_DATA_TYPE_SYNONYMS, - ) + from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS from cytotable.exceptions import DatatypeException # open sqlite3 connection @@ -268,42 +264,14 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str: for col in column_info ] - if add_cytotable_meta: - query_parts += [ - ( - f"CAST( '{f'{source_path}_table_{table_name}'}' " - f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path'].lower())}) " - "AS cytotable_meta_source_path" - ), - ( - f"CAST( {offset} " - f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset'].lower())}) " - "AS cytotable_meta_offset" - ), - ( - f"CAST( (ROW_NUMBER() OVER ()) AS " - f"{_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum'].lower())}) " - "AS cytotable_meta_rownum" - ), - ] - # perform the select using the cases built above and using chunksize + offset - sql_stmt = ( - f""" + sql_stmt = f""" SELECT {', '.join(query_parts)} FROM {table_name} - ORDER BY {', '.join([col['column_name'] for col in column_info])} - LIMIT {chunk_size} OFFSET {offset}; + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} + {"ORDER BY " + page_key if sort_output else ""}; """ - if sort_output - else f""" - SELECT - {', '.join(query_parts)} - FROM {table_name} - LIMIT {chunk_size} OFFSET {offset}; - """ - ) # execute the sql stmt cursor.execute(sql_stmt) @@ -600,3 +568,77 @@ def evaluate_futures(sources: Union[Dict[str, List[Dict[str, Any]]], str]) -> An if isinstance(sources, dict) else _unwrap_value(sources) ) + + +def _generate_pagesets( + keys: List[Union[int, float]], chunk_size: int +) -> List[Tuple[Union[int, float], Union[int, float]]]: + """ + Generate a pageset (keyset pagination) from a list of keys. + + Parameters: + keys List[Union[int, float]]: + List of keys to paginate. + chunk_size int: + Size of each chunk/page. + + Returns: + List[Tuple[Union[int, float], Union[int, float]]]: + List of (start_key, end_key) tuples representing each page. 
+ """ + + # Initialize an empty list to store the chunks/pages + chunks = [] + + # Start index for iteration through the keys + i = 0 + + while i < len(keys): + # Get the start key for the current chunk + start_key = keys[i] + + # Calculate the end index for the current chunk + end_index = min(i + chunk_size, len(keys)) - 1 + + # Get the end key for the current chunk + end_key = keys[end_index] + + # Ensure non-overlapping by incrementing the start of the next range if there are duplicates + while end_index + 1 < len(keys) and keys[end_index + 1] == end_key: + end_index += 1 + + # Append the current chunk (start_key, end_key) to the list of chunks + chunks.append((start_key, end_key)) + + # Update the index to start from the next chunk + i = end_index + 1 + + # Return the list of chunks/pages + return chunks + + +def _natural_sort(list_to_sort): + """ + Sorts the given iterable using natural sort adapted from approach + provided by the following link: + https://stackoverflow.com/a/4836734 + + Args: + list_to_sort: List: + The list to sort. + + Returns: + List: The sorted list. + """ + import re + + return sorted( + list_to_sort, + # use a custom key to sort the list + key=lambda key: [ + # use integer of c if it's a digit, otherwise str + int(c) if c.isdigit() else c + # Split the key into parts, separating numbers from alphabetic characters + for c in re.split("([0-9]+)", str(key)) + ], + ) diff --git a/docs/source/overview.md b/docs/source/overview.md index 5aacc2e..fb24bde 100644 --- a/docs/source/overview.md +++ b/docs/source/overview.md @@ -286,3 +286,19 @@ Also see CytoTable's presets found here: :data:`presets.config `). +The ``page_keys`` parameter expects a dictionary where the keys are names of tables and values which are columns to be used for the keyset pagination pages. +Pagination is implemented in conjunction with the ``chunk_size`` parameter which indicates the size of each page. +We provide preset configurations for these parameters through the ``preset`` parameter :code:`convert(..., preset="", ...)`. +Customizing the ``chunk_size`` or ``page_keys`` parameters allows you to tune the process to the size of your data and the resources available on your system. +For large datasets, smaller chunk sizes or specific pagination columns can help manage the workload by enabling smaller, more manageable data extraction at a time. +``` diff --git a/docs/source/python-api.md b/docs/source/python-api.md index 7868f2c..ab1fc62 100644 --- a/docs/source/python-api.md +++ b/docs/source/python-api.md @@ -25,7 +25,7 @@ Convert | -.. autofunction:: _get_table_chunk_offsets +.. autofunction:: _get_table_keyset_pagination_sets | @@ -33,7 +33,7 @@ Convert | -.. autofunction:: _join_source_chunk +.. autofunction:: _join_source_pageset | @@ -49,7 +49,7 @@ Convert | -.. autofunction:: _source_chunk_to_parquet +.. 
autofunction:: _source_pageset_to_parquet | diff --git a/tests/conftest.py b/tests/conftest.py index 43d1fbe..6045ace 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,6 @@ from pycytominer.cyto_utils.cells import SingleCells from sqlalchemy.util import deprecations -from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded # filters sqlalchemy 2.0 uber warning @@ -324,13 +323,7 @@ def fixture_example_local_sources( csv.write_csv( # we remove simulated cytotable metadata columns to be more realistic # (incoming sources would not usually contain these) - table.select( - [ - column - for column in table.column_names - if column not in CYOTABLE_META_COLUMN_TYPES - ] - ), + table.select(list(table.column_names)), f"{fx_tempdir}/example/{number}/{name}.csv", ) # write example output diff --git a/tests/test_convert.py b/tests/test_convert.py index ead14dc..bb823ee 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -21,12 +21,11 @@ from pyarrow import csv, parquet from pycytominer.cyto_utils.cells import SingleCells -from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.convert import ( _concat_join_sources, _concat_source_group, _infer_source_group_common_schema, - _join_source_chunk, + _join_source_pageset, _prepare_join_sql, _prepend_column_name, _to_parquet, @@ -61,6 +60,7 @@ def test_config(): "CONFIG_CHUNK_SIZE", "CONFIG_JOINS", "CONFIG_SOURCE_VERSION", + "CONFIG_PAGE_KEYS", ] ) == sorted(config_preset.keys()) @@ -404,7 +404,6 @@ def test_prepare_join_sql( cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei """, - sort_output=True, ).result() ).df() @@ -435,9 +434,9 @@ def test_prepare_join_sql( } -def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): +def test_join_source_pageset(load_parsl_default: None, fx_tempdir: str): """ - Tests _join_source_chunk + Tests _join_source_pageset """ # form test path a @@ -468,28 +467,30 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): where=test_path_b, ) - result = _join_source_chunk( + result = _join_source_pageset( dest_path=f"{fx_tempdir}/destination.parquet", joins=f""" SELECT * FROM read_parquet('{test_path_a}') as example_a JOIN read_parquet('{test_path_b}') as example_b USING(id1, id2) """, - chunk_size=2, - offset=0, + page_key="id1", + pageset=(1, 2), drop_null=True, + sort_output=True, ).result() assert isinstance(result, str) result_table = parquet.read_table(source=result) + assert result_table.equals( other=pa.Table.from_pydict( { - "field1": ["foo", "bar"], - "field2": [True, False], - "id1": [1, 2], - "id2": ["a", "a"], + "field1": ["foo", "foo", "bar", "bar"], + "field2": [True, True, False, False], + "id1": [1, 1, 2, 2], + "id2": ["a", "b", "a", "b"], }, # use schema from result as a reference for col order schema=result_table.schema, @@ -629,6 +630,12 @@ def test_to_parquet( compartments=["cytoplasm", "cells", "nuclei"], metadata=["image"], identifying_columns=["imagenumber"], + page_keys={ + "image": "ImageNumber", + "cells": "Cells_ObjectNumber", + "nuclei": "Nuclei_ObjectNumber", + "cytoplasm": "Cytoplasm_ObjectNumber", + }, concat=False, join=False, joins=None, @@ -687,6 +694,12 @@ def test_to_parquet_unsorted( compartments=["cytoplasm", "cells", "nuclei"], metadata=["image"], identifying_columns=["imagenumber"], + page_keys={ + "image": "ImageNumber", + "cells": "Cells_ObjectNumber", 
+ "nuclei": "Nuclei_ObjectNumber", + "cytoplasm": "Cytoplasm_ObjectNumber", + }, concat=False, join=False, joins=None, @@ -1048,6 +1061,7 @@ def test_convert_cellprofiler_sqlite_pycytominer_merge( # note: we cast into pycytominer_table's schema types in order to # properly perform comparisons as pycytominer and cytotable differ in their # datatyping implementations + assert pycytominer_table.schema.equals( cytotable_table.cast(target_schema=pycytominer_table.schema).schema ) @@ -1083,8 +1097,8 @@ def test_sqlite_mixed_type_query_to_parquet( table=_sqlite_mixed_type_query_to_parquet( source_path=example_sqlite_mixed_types_database, table_name=table_name, - chunk_size=2, - offset=0, + page_key="col_integer", + pageset=(1, 2), sort_output=True, ), where=result_filepath, @@ -1106,10 +1120,11 @@ def test_sqlite_mixed_type_query_to_parquet( ] # check the values per column assert parquet.read_table(source=result_filepath).to_pydict() == { - "col_integer": [None, 1], - "col_text": ["sample", "sample"], - "col_blob": [b"another_blob", b"sample_blob"], - "col_real": [None, 0.5], + # note: we drop None / NA values due to pagination keys + "col_integer": [1], + "col_text": ["sample"], + "col_blob": [b"sample_blob"], + "col_real": [0.5], } # run full convert on mixed type database @@ -1119,6 +1134,7 @@ def test_sqlite_mixed_type_query_to_parquet( dest_datatype="parquet", source_datatype="sqlite", compartments=[table_name], + page_keys={"tbl_a": "col_integer"}, join=False, ) @@ -1126,16 +1142,10 @@ def test_sqlite_mixed_type_query_to_parquet( assert parquet.read_table( source=result["Tbl_a.sqlite"][0]["table"][0] ).to_pydict() == { - "Tbl_a_col_integer": [None, 1], - "Tbl_a_col_text": ["sample", "sample"], - "Tbl_a_col_blob": [b"another_blob", b"sample_blob"], - "Tbl_a_col_real": [None, 0.5], - "cytotable_meta_source_path": [ - f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", - f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", - ], - "cytotable_meta_offset": [0, 0], - "cytotable_meta_rownum": [2, 1], + "Tbl_a_col_integer": [1], + "Tbl_a_col_text": ["sample"], + "Tbl_a_col_blob": [b"sample_blob"], + "Tbl_a_col_real": [0.5], } @@ -1249,11 +1259,7 @@ def test_in_carta_to_parquet( # drop cytotable metadata columns for comparisons (example sources won't contain these) cytotable_result_table = cytotable_result_table.select( - [ - column - for column in cytotable_result_table.column_names - if column not in CYOTABLE_META_COLUMN_TYPES - ] + list(cytotable_result_table.column_names) ) # check the data against one another diff --git a/tests/test_convert_threaded.py b/tests/test_convert_threaded.py index baac08b..b513562 100644 --- a/tests/test_convert_threaded.py +++ b/tests/test_convert_threaded.py @@ -102,41 +102,16 @@ def test_convert_s3_path_sqlite_join( # sequential s3 SQLite files. See below for more information # https://cloudpathlib.drivendata.org/stable/caching/#automatically local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/2", - # note: we use a custom join to limit the - # data processing required within the context - # of GitHub Actions runner image resources. 
- joins=""" - SELECT - image.Image_TableNumber, - image.Metadata_ImageNumber, - image.Metadata_Plate, - image.Metadata_Well, - image.Image_Metadata_Site, - image.Image_Metadata_Row, - cytoplasm.* EXCLUDE (Metadata_ImageNumber), - cells.* EXCLUDE (Metadata_ImageNumber), - nuclei.* EXCLUDE (Metadata_ImageNumber) - FROM - (SELECT * FROM read_parquet('cytoplasm.parquet') LIMIT 5000) AS cytoplasm - LEFT JOIN (SELECT * FROM read_parquet('cells.parquet') LIMIT 5000) AS cells ON - cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - AND cells.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells - LEFT JOIN (SELECT * FROM read_parquet('nuclei.parquet') LIMIT 5000) AS nuclei ON - nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - AND nuclei.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei - LEFT JOIN (SELECT * FROM read_parquet('image.parquet') LIMIT 5000) AS image ON - image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - """, ) # read only the metadata from parquet file parquet_file_meta = parquet.ParquetFile(s3_result).metadata # check the shape of the data - assert (parquet_file_meta.num_rows, parquet_file_meta.num_columns) == (5000, 5928) + assert (parquet_file_meta.num_rows, parquet_file_meta.num_columns) == (74226, 5928) # check that dropping duplicates results in the same shape - assert pd.read_parquet(s3_result).drop_duplicates().shape == (5000, 5928) + assert pd.read_parquet(s3_result).drop_duplicates().shape == (74226, 5928) def test_get_source_filepaths( diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..5df0546 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,108 @@ +""" +Testing CytoTable utility functions found within util.py +""" + +import pytest + +from cytotable.utils import _generate_pagesets, _natural_sort + + +def test_generate_pageset(): # pylint: disable=too-many-statements + """ + Test the generate_pageset function with various scenarios. 
+ """ + + # Test case with a single element + keys = [1] + chunk_size = 3 + expected = [(1, 1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case when chunk size is larger than the list + keys = [1, 2, 3] + chunk_size = 10 + expected = [(1, 3)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with all elements being the same + keys = [1, 1, 1, 1, 1] + chunk_size = 2 + expected = [(1, 1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with one duplicate of chunk size and others + keys = [1, 1, 1, 2, 3, 4] + chunk_size = 3 + expected = [(1, 1), (2, 4)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with a chunk size of one + keys = [1, 2, 3, 4, 5] + chunk_size = 1 + expected = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with no duplicates + keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + chunk_size = 3 + expected = [(1, 3), (4, 6), (7, 9), (10, 10)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with non-continuous keys + keys = [1, 3, 5, 7, 9, 12, 14] + chunk_size = 2 + expected = [(1, 3), (5, 7), (9, 12), (14, 14)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with inconsistent duplicates + keys = [1, 1, 3, 4, 5, 5, 8, 8, 8] + chunk_size = 3 + expected = [(1, 3), (4, 5), (8, 8)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Bigger test case with inconsistent duplicates + keys = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 12, 12, 12] + chunk_size = 3 + expected = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (12, 12)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Float test case with no duplicates + keys = [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1] + chunk_size = 3 + expected = [(1.1, 3.3), (4.4, 6.6), (7.7, 9.9), (10.1, 10.1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Float test case with non-continuous float keys + keys = [1.1, 3.3, 5.5, 7.7, 9.9, 12.12, 14.14] + chunk_size = 2 + expected = [(1.1, 3.3), (5.5, 7.7), (9.9, 12.12), (14.14, 14.14)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Float test case with inconsistent duplicates + keys = [1.1, 1.1, 3.3, 4.4, 5.5, 5.5, 8.8, 8.8, 8.8] + chunk_size = 3 + expected = [(1.1, 3.3), (4.4, 5.5), (8.8, 8.8)] + assert _generate_pagesets(keys, chunk_size) == expected + + +@pytest.mark.parametrize( + "input_list, expected", + [ + ([], []), + (["a1"], ["a1"]), + (["a1", "a10", "a2", "a3"], ["a1", "a2", "a3", "a10"]), + (["1", "10", "2", "11", "21", "20"], ["1", "2", "10", "11", "20", "21"]), + (["b1", "a1", "b2", "a2"], ["a1", "a2", "b1", "b2"]), + (["apple1", "Apple10", "apple2"], ["Apple10", "apple1", "apple2"]), + (["a1", "A1", "a10", "A10"], ["A1", "A10", "a1", "a10"]), + ( + ["a-1", "a-10", "b-2", "B-1", "b-3", "a-2", "A-3"], + ["A-3", "B-1", "a-1", "a-2", "a-10", "b-2", "b-3"], + ), + ], +) +def test_natural_sort(input_list, expected): + """ + Tests for _natural_sort + """ + assert _natural_sort(input_list) == expected