Add version metadata to CytoTable Parquet output (#134)
* add version detection utility

* manage semver with poetry-dynamic-versioning

* comments to further describe what's happening

* update github actions workflows and simplify

* remove version util and lint

* update pre-commit check versions

* add docs on semver for release publishing process

* move setup-poetry appropriately

* correct action location

* re-add version getter util and test

* add metadata writer

* simplify metadata parquet write util

* add a test for _write_parquet_table_with_metadata

* move to constants module for reuse capabilities

* update convert with constants and new writer function

* add tool.setuptools_scm to avoid warnings

* linting update

* move dunamai to dev deps and update try block

* Apply suggestions from code review

Co-authored-by: Gregory Way <[email protected]>

* add additional notes about release drafts

* linting

* expand docs on kwargs

* add colons to docstring

---------

Co-authored-by: Gregory Way <[email protected]>
d33bs and gwaybio authored Dec 20, 2023
1 parent 99b8e1f commit 9781519
Showing 7 changed files with 221 additions and 82 deletions.
2 changes: 1 addition & 1 deletion cytotable/__init__.py
@@ -2,7 +2,7 @@
__init__.py for cytotable
"""

-# note: version data is maintained by poetry-dynamic-versioning
+# note: version data is maintained by poetry-dynamic-versioning (do not edit)
__version__ = "0.0.0"

from .convert import convert
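For context, the `"0.0.0"` above is only a placeholder; a minimal sketch, assuming a built/packaged install where `poetry-dynamic-versioning` has substituted the placeholder at build time:

```python
# Assumes a packaged install of CytoTable where poetry-dynamic-versioning's
# substitution rewrote the "0.0.0" placeholder at build time.
import cytotable

print(cytotable.__version__)  # e.g. "0.1.0" rather than the "0.0.0" placeholder
```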
74 changes: 74 additions & 0 deletions cytotable/constants.py
@@ -0,0 +1,74 @@
"""
CytoTable: constants - storing various constants to be used throughout cytotable.
"""

import multiprocessing
import os
from typing import cast

from cytotable.utils import _get_cytotable_version

# read max threads from environment if necessary
# max threads will be used with default Parsl config and Duckdb
MAX_THREADS = (
    multiprocessing.cpu_count()
    if "CYTOTABLE_MAX_THREADS" not in os.environ
    else int(cast(int, os.environ.get("CYTOTABLE_MAX_THREADS")))
)
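As a usage sketch (the value below is illustrative), the thread cap may be overridden through the environment, provided the variable is set before CytoTable modules are imported:

```python
# Illustrative override: CYTOTABLE_MAX_THREADS must be set before import
# so the module-level constant above picks it up.
import os

os.environ["CYTOTABLE_MAX_THREADS"] = "4"

from cytotable.constants import MAX_THREADS

print(MAX_THREADS)  # 4
```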

# enables overriding default memory mapping behavior with pyarrow memory mapping
CYTOTABLE_ARROW_USE_MEMORY_MAPPING = (
    os.environ.get("CYTOTABLE_ARROW_USE_MEMORY_MAPPING", "1") == "1"
)
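This flag feeds PyArrow's `memory_map` option on reads, as `convert.py` does below; a minimal sketch with a hypothetical file path:

```python
# Hypothetical path; setting CYTOTABLE_ARROW_USE_MEMORY_MAPPING=0 in the
# environment would make this an ordinary (non-memory-mapped) read.
from pyarrow import parquet

from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING

table = parquet.read_table(
    source="example.parquet",
    memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
```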

DDB_DATA_TYPE_SYNONYMS = {
    "real": ["float32", "float4", "float"],
    "double": ["float64", "float8", "numeric", "decimal"],
    "integer": ["int32", "int4", "int", "signed"],
    "bigint": ["int64", "int8", "long"],
}
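A small illustrative helper (not part of CytoTable) showing how such a synonym map can answer whether a requested cast target is just another name for a column's current DuckDB type:

```python
from cytotable.constants import DDB_DATA_TYPE_SYNONYMS


def is_ddb_type_synonym(current: str, target: str) -> bool:
    # Illustrative only: True when target names the same DuckDB type as current.
    return target.lower() in DDB_DATA_TYPE_SYNONYMS.get(current.lower(), [])


print(is_ddb_type_synonym("double", "decimal"))  # True
```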

# A reference dictionary for SQLite affinity and storage class types
# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples
SQLITE_AFFINITY_DATA_TYPE_SYNONYMS = {
    "integer": [
        "int",
        "integer",
        "tinyint",
        "smallint",
        "mediumint",
        "bigint",
        "unsigned big int",
        "int2",
        "int8",
    ],
    "text": [
        "character",
        "varchar",
        "varying character",
        "nchar",
        "native character",
        "nvarchar",
        "text",
        "clob",
    ],
    "blob": ["blob"],
    "real": [
        "real",
        "double",
        "double precision",
        "float",
    ],
    "numeric": [
        "numeric",
        "decimal",
        "boolean",
        "date",
        "datetime",
    ],
}
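Similarly, a hypothetical lookup sketch for the SQLite map; note that SQLite's real affinity rules use substring matching, while this simplified version only matches the exact declared types listed above:

```python
from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS


def sqlite_affinity(declared_type: str) -> str:
    # Simplified, exact-match lookup; SQLite itself falls back to
    # numeric affinity for unrecognized declared types.
    for affinity, synonyms in SQLITE_AFFINITY_DATA_TYPE_SYNONYMS.items():
        if declared_type.lower() in synonyms:
            return affinity
    return "numeric"


print(sqlite_affinity("VARCHAR"))  # "text"
```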

CYTOTABLE_DEFAULT_PARQUET_METADATA = {
    "data-producer": "https://github.com/cytomining/CytoTable",
    "data-producer-version": str(_get_cytotable_version()),
}
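When this mapping is attached to a schema (as the concatenation code below does), Arrow coerces keys and values to bytes; a quick sketch:

```python
import pyarrow as pa

from cytotable.constants import CYTOTABLE_DEFAULT_PARQUET_METADATA

schema = pa.schema([("x", pa.int64())]).with_metadata(
    CYTOTABLE_DEFAULT_PARQUET_METADATA
)
print(schema.metadata)
# {b'data-producer': b'https://github.com/cytomining/CytoTable',
#  b'data-producer-version': b'<version>'}
```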
41 changes: 29 additions & 12 deletions cytotable/convert.py
@@ -302,7 +302,11 @@ def _source_chunk_to_parquet(
from cloudpathlib import AnyPath
from pyarrow import parquet

-from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
+from cytotable.utils import (
+    _duckdb_reader,
+    _sqlite_mixed_type_query_to_parquet,
+    _write_parquet_table_with_metadata,
+)

# attempt to build dest_path
source_dest_path = (
@@ -339,7 +343,7 @@
# read data with chunk size + offset
# and export to parquet
with _duckdb_reader() as ddb_reader:
-parquet.write_table(
+_write_parquet_table_with_metadata(
table=ddb_reader.execute(
f"""
{base_query}
@@ -358,7 +362,7 @@
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
):
-parquet.write_table(
+_write_parquet_table_with_metadata(
# here we use sqlite instead of duckdb to extract
# data for special cases where column and value types
# may not align (which is valid functionality in SQLite).
@@ -414,7 +418,8 @@ def _prepend_column_name(

import pyarrow.parquet as parquet

-from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+from cytotable.utils import _write_parquet_table_with_metadata

targets = tuple(metadata) + tuple(compartments)

@@ -499,7 +504,7 @@ def _prepend_column_name(
updated_column_names.append(column_name)

# perform table column name updates
-parquet.write_table(
+_write_parquet_table_with_metadata(
table=table.rename_columns(updated_column_names), where=table_path
)

@@ -569,8 +574,12 @@ def _concat_source_group(
import pyarrow as pa
import pyarrow.parquet as parquet

+from cytotable.constants import (
+    CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+    CYTOTABLE_DEFAULT_PARQUET_METADATA,
+)
from cytotable.exceptions import SchemaException
-from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+from cytotable.utils import _write_parquet_table_with_metadata

# build a result placeholder
concatted: List[Dict[str, Any]] = [
@@ -600,7 +609,9 @@ def _concat_source_group(
destination_path.parent.mkdir(parents=True, exist_ok=True)

# build the schema for concatenation writer
-writer_schema = pa.schema(common_schema)
+writer_schema = pa.schema(common_schema).with_metadata(
+    CYTOTABLE_DEFAULT_PARQUET_METADATA
+)

# build a parquet file writer which will be used to append files
# as a single concatted parquet file, referencing the first file's schema
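A minimal sketch of the append pattern used here, with hypothetical file names: attach the default metadata to the shared schema once, then stream each chunk through a single `ParquetWriter` so the concatenated output carries the metadata:

```python
import pyarrow as pa
from pyarrow import parquet

from cytotable.constants import CYTOTABLE_DEFAULT_PARQUET_METADATA

writer_schema = pa.schema([("x", pa.int64())]).with_metadata(
    CYTOTABLE_DEFAULT_PARQUET_METADATA
)
with parquet.ParquetWriter("concatted.parquet", writer_schema) as writer:
    for chunk in ["chunk_a.parquet", "chunk_b.parquet"]:
        # reading with the shared schema keeps each chunk compatible
        # with the writer (including the attached metadata)
        writer.write_table(parquet.read_table(chunk, schema=writer_schema))
```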
@@ -713,7 +724,7 @@ def _join_source_chunk(

import pyarrow.parquet as parquet

-from cytotable.utils import _duckdb_reader
+from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata

# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
@@ -757,7 +768,7 @@ def _join_source_chunk(
)

# write the result
-parquet.write_table(
+_write_parquet_table_with_metadata(
table=result,
where=result_file_path,
)
@@ -797,7 +808,11 @@ def _concat_join_sources(

import pyarrow.parquet as parquet

-from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+from cytotable.constants import (
+    CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+    CYTOTABLE_DEFAULT_PARQUET_METADATA,
+)
+from cytotable.utils import _write_parquet_table_with_metadata

# remove the unjoined concatted compartments to prepare final dest_path usage
# (we now have joined results)
@@ -811,7 +826,7 @@
shutil.rmtree(path=dest_path)

# write the concatted result as a parquet file
-parquet.write_table(
+_write_parquet_table_with_metadata(
table=pa.concat_tables(
tables=[
parquet.read_table(
@@ -826,7 +841,9 @@
# build a parquet file writer which will be used to append files
# as a single concatted parquet file, referencing the first file's schema
# (all must be the same schema)
-writer_schema = parquet.read_schema(join_sources[0])
+writer_schema = parquet.read_schema(join_sources[0]).with_metadata(
+    CYTOTABLE_DEFAULT_PARQUET_METADATA
+)
with parquet.ParquetWriter(str(dest_path), writer_schema) as writer:
for table_path in join_sources:
writer.write_table(
127 changes: 63 additions & 64 deletions cytotable/utils.py
@@ -3,13 +3,13 @@
"""

import logging
-import multiprocessing
import os
import pathlib
-from typing import Any, Dict, Union, cast
+from typing import Any, Dict, Optional, Union, cast

import duckdb
import parsl
+import pyarrow as pa
from cloudpathlib import AnyPath, CloudPath
from cloudpathlib.exceptions import InvalidPrefixError
from parsl.app.app import AppBase
@@ -19,67 +19,6 @@

logger = logging.getLogger(__name__)

-# read max threads from environment if necessary
-# max threads will be used with default Parsl config and Duckdb
-MAX_THREADS = (
-    multiprocessing.cpu_count()
-    if "CYTOTABLE_MAX_THREADS" not in os.environ
-    else int(cast(int, os.environ.get("CYTOTABLE_MAX_THREADS")))
-)
-
-# enables overriding default memory mapping behavior with pyarrow memory mapping
-CYTOTABLE_ARROW_USE_MEMORY_MAPPING = (
-    os.environ.get("CYTOTABLE_ARROW_USE_MEMORY_MAPPING", "1") == "1"
-)
-
-DDB_DATA_TYPE_SYNONYMS = {
-    "real": ["float32", "float4", "float"],
-    "double": ["float64", "float8", "numeric", "decimal"],
-    "integer": ["int32", "int4", "int", "signed"],
-    "bigint": ["int64", "int8", "long"],
-}
-
-# A reference dictionary for SQLite affinity and storage class types
-# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples
-SQLITE_AFFINITY_DATA_TYPE_SYNONYMS = {
-    "integer": [
-        "int",
-        "integer",
-        "tinyint",
-        "smallint",
-        "mediumint",
-        "bigint",
-        "unsigned big int",
-        "int2",
-        "int8",
-    ],
-    "text": [
-        "character",
-        "varchar",
-        "varying character",
-        "nchar",
-        "native character",
-        "nvarchar",
-        "text",
-        "clob",
-    ],
-    "blob": ["blob"],
-    "real": [
-        "real",
-        "double",
-        "double precision",
-        "float",
-    ],
-    "numeric": [
-        "numeric",
-        "decimal",
-        "boolean",
-        "date",
-        "datetime",
-    ],
-}


# reference the original init
original_init = AppBase.__init__

@@ -198,6 +137,10 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
duckdb.DuckDBPyConnection
"""

+import duckdb
+
+from cytotable.constants import MAX_THREADS
+
return duckdb.connect().execute(
# note: we use an f-string here to
# dynamically configure threads as appropriate
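A hedged sketch (not CytoTable's exact SQL) of the thread-limited DuckDB connection that `_duckdb_reader` returns:

```python
import duckdb

from cytotable.constants import MAX_THREADS

# cap DuckDB's worker threads to match CytoTable's configured maximum
ddb = duckdb.connect().execute(f"SET threads TO {MAX_THREADS};")
print(ddb.execute("SELECT current_setting('threads')").fetchone())
```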
@@ -252,8 +195,8 @@ def _sqlite_mixed_type_query_to_parquet(

import pyarrow as pa

+from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
from cytotable.exceptions import DatatypeException
-from cytotable.utils import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS

# open sqlite3 connection
with sqlite3.connect(source_path) as conn:
@@ -384,6 +327,9 @@ def _arrow_type_cast_if_specified(
Dict[str, str]
A potentially data type updated dictionary of column information
"""

from cytotable.constants import DDB_DATA_TYPE_SYNONYMS

# for casting to new float type
if "float" in data_type_cast_map.keys() and column["column_dtype"] in [
"REAL",
@@ -453,3 +399,56 @@ def _expand_path(
modifed_path = modifed_path.expanduser()

return modifed_path.resolve()


def _get_cytotable_version() -> str:
    """
    Seeks the current version of CytoTable using dunamai
    (for installs from source) with a fallback to the static
    __version__ attribute (for built/packaged installs).

    Returns:
        str
            A string representing the version of CytoTable currently being used.
    """

    try:
        # attempt to gather the development version from dunamai
        # for scenarios where cytotable from source is used.
        import dunamai

        return dunamai.Version.from_any_vcs().serialize()
    except (RuntimeError, ModuleNotFoundError):
        # else grab a static version from __init__.py
        # for scenarios where the built/packaged cytotable is used.
        import cytotable

        return cytotable.__version__
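As a quick check, assuming a git checkout with semver tags, the try-branch above derives a development-style version from the repository state:

```python
import dunamai

# mirrors the dunamai call in _get_cytotable_version; the exact output
# depends on the repository's tags and commit state
print(dunamai.Version.from_any_vcs().serialize())
# e.g. "0.0.3" on a tagged commit, or a dev-style version between tags
```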


def _write_parquet_table_with_metadata(table: pa.Table, **kwargs) -> None:
    """
    Adds metadata to parquet output from CytoTable.
    Note: this mostly wraps pyarrow.parquet.write_table
    https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html

    Args:
        table: pa.Table:
            PyArrow table to be serialized as a parquet table.
        **kwargs: Any:
            kwargs provided to this function roughly align with
            pyarrow.parquet.write_table. The following might be
            examples of what to expect here:
            - where: str or pyarrow.NativeFile
    """

    from pyarrow import parquet

    from cytotable.constants import CYTOTABLE_DEFAULT_PARQUET_METADATA
    from cytotable.utils import _get_cytotable_version

    parquet.write_table(
        table=table.replace_schema_metadata(
            metadata=CYTOTABLE_DEFAULT_PARQUET_METADATA
        ),
        **kwargs,
    )
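A usage sketch with a hypothetical table and output path, including reading the metadata back (Parquet schema metadata round-trips as bytes):

```python
import pyarrow as pa
from pyarrow import parquet

from cytotable.utils import _write_parquet_table_with_metadata

_write_parquet_table_with_metadata(
    table=pa.table({"x": [1, 2, 3]}),
    where="example.parquet",
)

meta = parquet.read_schema("example.parquet").metadata
print(meta[b"data-producer"])          # b'https://github.com/cytomining/CytoTable'
print(meta[b"data-producer-version"])  # the CytoTable version, as bytes
```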
4 changes: 3 additions & 1 deletion docs/source/contributing.md
@@ -190,6 +190,8 @@ CytoTable semvers are controlled through [`poetry-dynamic-versioning`](https://g
CytoTable release git tags are automatically applied through [GitHub Releases](https://docs.github.com/en/repositories/releasing-projects-on-github/about-releases) and related inferred changes from [`release-drafter`](https://github.com/release-drafter/release-drafter).

1. Open a pull request and use a repository label for `release-<semver release type>` to label the pull request for visibility with [`release-drafter`](https://github.com/release-drafter/release-drafter) (for example, see [CytoTable#108](https://github.com/cytomining/CytoTable/pull/108) as a reference of a semver patch update).
-1. On merging the pull request for the release, a [GitHub Actions workflow](https://docs.github.com/en/actions/using-workflows) defined in `draft-release.yml` leveraging [`release-drafter`](https://github.com/release-drafter/release-drafter) will draft a release for maintainers to modify as needed (double checking on the prepared git tag and content).
+1. On merging the pull request for the release, a [GitHub Actions workflow](https://docs.github.com/en/actions/using-workflows) defined in `draft-release.yml` leveraging [`release-drafter`](https://github.com/release-drafter/release-drafter) will draft a release for maintainers.
+   The draft GitHub release will include a version tag based on the GitHub PR label applied and `release-drafter`.
+   The draft release does not normally need additional modifications but may be changed as needed.
1. Make modifications as necessary to the draft GitHub release, then publish the release.
1. On publishing the release, another GitHub Actions workflow defined in `publish-pypi.yml` will run to build and deploy the Python package to PyPI (utilizing the earlier modified `pyproject.toml` semantic version reference for labeling the release).
