Add version metadata to CytoTable Parquet output #134

Merged
merged 25 commits into main from data-versioned-output on Dec 20, 2023

Commits (25)
165ed3f
add version detection utility
d33bs Dec 18, 2023
4edfbb9
manage semver with poetry-dynamic-versioning
d33bs Dec 19, 2023
71735b2
comments to further describe what's happening
d33bs Dec 19, 2023
3dd6394
update github actions workflows and simplify
d33bs Dec 19, 2023
af31b07
remove version util and lint
d33bs Dec 19, 2023
7008d8a
update pre-commit check versions
d33bs Dec 19, 2023
325f4fa
add docs on semver for release publishing process
d33bs Dec 19, 2023
933ff19
move setup-poetry appropriately
d33bs Dec 19, 2023
a522559
correct action location
d33bs Dec 19, 2023
2feef8d
readd version getter util and test
d33bs Dec 19, 2023
1a26cdf
add metadata writer
d33bs Dec 19, 2023
419bf82
simplify metadata parquet write util
d33bs Dec 19, 2023
70f8ddb
add a test for _write_parquet_table_with_metadata
d33bs Dec 19, 2023
c503214
move to constants module for reuse capabilities
d33bs Dec 19, 2023
f936390
update convert with constants and new writer fxn
d33bs Dec 19, 2023
a41d697
add tool.setuptools_scm to avoid warnings
d33bs Dec 19, 2023
616d9fc
linting update
d33bs Dec 20, 2023
6420cd2
Merge remote-tracking branch 'upstream/main' into data-versioned-output
d33bs Dec 20, 2023
540904f
move dunamai to dev deps and update try block
d33bs Dec 20, 2023
8d136d9
Apply suggestions from code review
d33bs Dec 20, 2023
04a136b
add additional notes about release drafts
d33bs Dec 20, 2023
8fe4a41
linting
d33bs Dec 20, 2023
33d09b5
Merge remote-tracking branch 'upstream/main' into data-versioned-output
d33bs Dec 20, 2023
09f4a09
expand docs on kwargs
d33bs Dec 20, 2023
8552190
add colons to docstring
d33bs Dec 20, 2023
2 changes: 1 addition & 1 deletion cytotable/__init__.py
@@ -2,7 +2,7 @@
 __init__.py for cytotable
 """
 
-# note: version data is maintained by poetry-dynamic-versioning
+# note: version data is maintained by poetry-dynamic-versioning (do not edit)
 __version__ = "0.0.0"
 
 from .convert import convert
74 changes: 74 additions & 0 deletions cytotable/constants.py
@@ -0,0 +1,74 @@
+"""
+CytoTable: constants - storing various constants to be used throughout cytotable.
+"""
+
+import multiprocessing
+import os
+from typing import cast
+
+from cytotable.utils import _get_cytotable_version
+
+# read max threads from environment if necessary
+# max threads will be used with default Parsl config and Duckdb
+MAX_THREADS = (
+    multiprocessing.cpu_count()
+    if "CYTOTABLE_MAX_THREADS" not in os.environ
+    else int(cast(int, os.environ.get("CYTOTABLE_MAX_THREADS")))
+)
+
+# enables overriding default memory mapping behavior with pyarrow memory mapping
+CYTOTABLE_ARROW_USE_MEMORY_MAPPING = (
+    os.environ.get("CYTOTABLE_ARROW_USE_MEMORY_MAPPING", "1") == "1"
+)
+
+DDB_DATA_TYPE_SYNONYMS = {
+    "real": ["float32", "float4", "float"],
+    "double": ["float64", "float8", "numeric", "decimal"],
+    "integer": ["int32", "int4", "int", "signed"],
+    "bigint": ["int64", "int8", "long"],
+}
+
+# A reference dictionary for SQLite affinity and storage class types
+# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples
+SQLITE_AFFINITY_DATA_TYPE_SYNONYMS = {
+    "integer": [
+        "int",
+        "integer",
+        "tinyint",
+        "smallint",
+        "mediumint",
+        "bigint",
+        "unsigned big int",
+        "int2",
+        "int8",
+    ],
+    "text": [
+        "character",
+        "varchar",
+        "varying character",
+        "nchar",
+        "native character",
+        "nvarchar",
+        "text",
+        "clob",
+    ],
+    "blob": ["blob"],
+    "real": [
+        "real",
+        "double",
+        "double precision",
+        "float",
+    ],
+    "numeric": [
+        "numeric",
+        "decimal",
+        "boolean",
+        "date",
+        "datetime",
+    ],
+}
+
+CYTOTABLE_DEFAULT_PARQUET_METADATA = {
+    "data-producer": "https://github.com/cytomining/CytoTable",
+    "data-producer-version": str(_get_cytotable_version()),
+}
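
With these constants in place, every Parquet file CytoTable writes carries producer metadata in its schema. As a minimal sketch of how a consumer might inspect that metadata (the file name `results.parquet` is hypothetical):

```python
import pyarrow.parquet as parquet

# read only the schema; no row data is loaded
schema = parquet.read_schema("results.parquet")

# Parquet key-value metadata is stored as bytes on both keys and values
metadata = {
    key.decode(): value.decode() for key, value in (schema.metadata or {}).items()
}

print(metadata.get("data-producer"))  # https://github.com/cytomining/CytoTable
print(metadata.get("data-producer-version"))  # whichever version wrote the file
```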
41 changes: 29 additions & 12 deletions cytotable/convert.py
@@ -302,7 +302,11 @@ def _source_chunk_to_parquet(
     from cloudpathlib import AnyPath
     from pyarrow import parquet
 
-    from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
+    from cytotable.utils import (
+        _duckdb_reader,
+        _sqlite_mixed_type_query_to_parquet,
+        _write_parquet_table_with_metadata,
+    )
 
     # attempt to build dest_path
     source_dest_path = (
@@ -339,7 +343,7 @@ def _source_chunk_to_parquet(
         # read data with chunk size + offset
         # and export to parquet
         with _duckdb_reader() as ddb_reader:
-            parquet.write_table(
+            _write_parquet_table_with_metadata(
                 table=ddb_reader.execute(
                     f"""
                     {base_query}
@@ -358,7 +362,7 @@ def _source_chunk_to_parquet(
             "Mismatch Type Error" in str(e)
             and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
         ):
-            parquet.write_table(
+            _write_parquet_table_with_metadata(
                 # here we use sqlite instead of duckdb to extract
                 # data for special cases where column and value types
                 # may not align (which is valid functionality in SQLite).
@@ -414,7 +418,8 @@ def _prepend_column_name(
 
     import pyarrow.parquet as parquet
 
-    from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.utils import _write_parquet_table_with_metadata
 
     targets = tuple(metadata) + tuple(compartments)
 
@@ -499,7 +504,7 @@ def _prepend_column_name(
             updated_column_names.append(column_name)
 
     # perform table column name updates
-    parquet.write_table(
+    _write_parquet_table_with_metadata(
         table=table.rename_columns(updated_column_names), where=table_path
     )
 
@@ -569,8 +574,12 @@ def _concat_source_group(
     import pyarrow as pa
     import pyarrow.parquet as parquet
 
+    from cytotable.constants import (
+        CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+        CYTOTABLE_DEFAULT_PARQUET_METADATA,
+    )
     from cytotable.exceptions import SchemaException
-    from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.utils import _write_parquet_table_with_metadata
 
     # build a result placeholder
     concatted: List[Dict[str, Any]] = [
@@ -600,7 +609,9 @@ def _concat_source_group(
         destination_path.parent.mkdir(parents=True, exist_ok=True)
 
         # build the schema for concatenation writer
-        writer_schema = pa.schema(common_schema)
+        writer_schema = pa.schema(common_schema).with_metadata(
+            CYTOTABLE_DEFAULT_PARQUET_METADATA
+        )
 
         # build a parquet file writer which will be used to append files
         # as a single concatted parquet file, referencing the first file's schema
@@ -713,7 +724,7 @@ def _join_source_chunk(
 
     import pyarrow.parquet as parquet
 
-    from cytotable.utils import _duckdb_reader
+    from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
 
     # Attempt to read the data to parquet file
     # using duckdb for extraction and pyarrow for
@@ -757,7 +768,7 @@ def _join_source_chunk(
     )
 
     # write the result
-    parquet.write_table(
+    _write_parquet_table_with_metadata(
         table=result,
         where=result_file_path,
     )
@@ -797,7 +808,11 @@ def _concat_join_sources(
 
     import pyarrow.parquet as parquet
 
-    from cytotable.utils import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
+    from cytotable.constants import (
+        CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
+        CYTOTABLE_DEFAULT_PARQUET_METADATA,
+    )
+    from cytotable.utils import _write_parquet_table_with_metadata
 
     # remove the unjoined concatted compartments to prepare final dest_path usage
     # (we now have joined results)
@@ -811,7 +826,7 @@ def _concat_join_sources(
         shutil.rmtree(path=dest_path)
 
     # write the concatted result as a parquet file
-    parquet.write_table(
+    _write_parquet_table_with_metadata(
         table=pa.concat_tables(
             tables=[
                 parquet.read_table(
@@ -826,7 +841,9 @@ def _concat_join_sources(
     # build a parquet file writer which will be used to append files
     # as a single concatted parquet file, referencing the first file's schema
     # (all must be the same schema)
-    writer_schema = parquet.read_schema(join_sources[0])
+    writer_schema = parquet.read_schema(join_sources[0]).with_metadata(
+        CYTOTABLE_DEFAULT_PARQUET_METADATA
+    )
     with parquet.ParquetWriter(str(dest_path), writer_schema) as writer:
         for table_path in join_sources:
             writer.write_table(
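
The `ParquetWriter` code paths above attach metadata at the schema level rather than per written table, since the writer's schema is what lands in the final file. A self-contained sketch of that pattern follows; the sample tables, metadata dict, and file name are illustrative stand-ins, not CytoTable's actual internals:

```python
import pyarrow as pa
import pyarrow.parquet as parquet

# hypothetical stand-in for CYTOTABLE_DEFAULT_PARQUET_METADATA
metadata = {"data-producer": "https://github.com/cytomining/CytoTable"}

# two small tables standing in for per-chunk parquet outputs
chunks = [
    pa.table({"id": [1, 2], "value": [0.1, 0.2]}),
    pa.table({"id": [3, 4], "value": [0.3, 0.4]}),
]

# attach metadata at the schema level so the concatenated file inherits it
writer_schema = chunks[0].schema.with_metadata(metadata)

with parquet.ParquetWriter("concatted.parquet", writer_schema) as writer:
    for chunk in chunks:
        # cast each chunk to the writer schema (including its metadata)
        writer.write_table(chunk.cast(writer_schema))

# the key-value metadata is readable from the concatenated file
assert parquet.read_schema("concatted.parquet").metadata[b"data-producer"]
```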
127 changes: 63 additions & 64 deletions cytotable/utils.py
@@ -3,13 +3,13 @@
 """
 
 import logging
-import multiprocessing
 import os
 import pathlib
-from typing import Any, Dict, Union, cast
+from typing import Any, Dict, Optional, Union, cast
 
 import duckdb
 import parsl
+import pyarrow as pa
 from cloudpathlib import AnyPath, CloudPath
 from cloudpathlib.exceptions import InvalidPrefixError
 from parsl.app.app import AppBase
@@ -19,67 +19,6 @@
 
 logger = logging.getLogger(__name__)
 
-# read max threads from environment if necessary
-# max threads will be used with default Parsl config and Duckdb
-MAX_THREADS = (
-    multiprocessing.cpu_count()
-    if "CYTOTABLE_MAX_THREADS" not in os.environ
-    else int(cast(int, os.environ.get("CYTOTABLE_MAX_THREADS")))
-)
-
-# enables overriding default memory mapping behavior with pyarrow memory mapping
-CYTOTABLE_ARROW_USE_MEMORY_MAPPING = (
-    os.environ.get("CYTOTABLE_ARROW_USE_MEMORY_MAPPING", "1") == "1"
-)
-
-DDB_DATA_TYPE_SYNONYMS = {
-    "real": ["float32", "float4", "float"],
-    "double": ["float64", "float8", "numeric", "decimal"],
-    "integer": ["int32", "int4", "int", "signed"],
-    "bigint": ["int64", "int8", "long"],
-}
-
-# A reference dictionary for SQLite affinity and storage class types
-# See more here: https://www.sqlite.org/datatype3.html#affinity_name_examples
-SQLITE_AFFINITY_DATA_TYPE_SYNONYMS = {
-    "integer": [
-        "int",
-        "integer",
-        "tinyint",
-        "smallint",
-        "mediumint",
-        "bigint",
-        "unsigned big int",
-        "int2",
-        "int8",
-    ],
-    "text": [
-        "character",
-        "varchar",
-        "varying character",
-        "nchar",
-        "native character",
-        "nvarchar",
-        "text",
-        "clob",
-    ],
-    "blob": ["blob"],
-    "real": [
-        "real",
-        "double",
-        "double precision",
-        "float",
-    ],
-    "numeric": [
-        "numeric",
-        "decimal",
-        "boolean",
-        "date",
-        "datetime",
-    ],
-}
-
-
 # reference the original init
 original_init = AppBase.__init__
 
@@ -198,6 +137,10 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
         duckdb.DuckDBPyConnection
     """
 
+    import duckdb
+
+    from cytotable.constants import MAX_THREADS
+
     return duckdb.connect().execute(
         # note: we use an f-string here to
         # dynamically configure threads as appropriate
@@ -252,8 +195,8 @@ def _sqlite_mixed_type_query_to_parquet(
 
     import pyarrow as pa
 
+    from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
     from cytotable.exceptions import DatatypeException
-    from cytotable.utils import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS
 
     # open sqlite3 connection
     with sqlite3.connect(source_path) as conn:
@@ -384,6 +327,9 @@ def _arrow_type_cast_if_specified(
         Dict[str, str]
             A potentially data type updated dictionary of column information
     """
+
+    from cytotable.constants import DDB_DATA_TYPE_SYNONYMS
+
     # for casting to new float type
     if "float" in data_type_cast_map.keys() and column["column_dtype"] in [
         "REAL",
@@ -453,3 +399,56 @@ def _expand_path(
     modifed_path = modifed_path.expanduser()
 
     return modifed_path.resolve()
+
+
+def _get_cytotable_version() -> str:
+    """
+    Seeks the current version of CytoTable using either pkg_resources
+    or dunamai to determine the current version being used.
+
+    Returns:
+        str
+            A string representing the version of CytoTable currently being used.
+    """
+
+    try:
+        # attempt to gather the development version from dunamai
+        # for scenarios where cytotable from source is used.
+        import dunamai
+
+        return dunamai.Version.from_any_vcs().serialize()
+    except (RuntimeError, ModuleNotFoundError):
+        # else grab a static version from __init__.py
+        # for scenarios where the built/packaged cytotable is used.
+        import cytotable
+
+        return cytotable.__version__
+
+
+def _write_parquet_table_with_metadata(table: pa.Table, **kwargs) -> None:
+    """
+    Adds metadata to parquet output from CytoTable.
+    Note: this mostly wraps pyarrow.parquet.write_table
+    https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html
+
+    Args:
+        table: pa.Table:
+            Pyarrow table to be serialized as parquet table.
+        **kwargs: Any:
+            kwargs provided to this function roughly align with
+            pyarrow.parquet.write_table. The following might be
+            examples of what to expect here:
+            - where: str or pyarrow.NativeFile
+    """
+
+    from pyarrow import parquet
+
+    from cytotable.constants import CYTOTABLE_DEFAULT_PARQUET_METADATA
+    from cytotable.utils import _get_cytotable_version
+
+    parquet.write_table(
+        table=table.replace_schema_metadata(
+            metadata=CYTOTABLE_DEFAULT_PARQUET_METADATA
+        ),
+        **kwargs,
+    )
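
To illustrate the new helper end to end, here is a sketch that calls the private function directly (normal usage goes through `cytotable.convert`); the table contents and the `example.parquet` path are hypothetical:

```python
import pyarrow as pa
import pyarrow.parquet as parquet

from cytotable.utils import _write_parquet_table_with_metadata

# a small in-memory table for illustration
table = pa.table({"id": [1, 2, 3], "area": [10.5, 20.1, 15.7]})

# kwargs pass through to pyarrow.parquet.write_table; `where` is the destination
_write_parquet_table_with_metadata(table=table, where="example.parquet")

# the producer metadata now travels with the file's schema
print(parquet.read_schema("example.parquet").metadata)
# {b'data-producer': b'https://github.com/cytomining/CytoTable',
#  b'data-producer-version': b'...'}
```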
4 changes: 3 additions & 1 deletion docs/source/contributing.md
@@ -190,6 +190,8 @@ CytoTable semvers are controlled through [`poetry-dynamic-versioning`](https://g
 CytoTable release git tags are automatically applied through [GitHub Releases](https://docs.github.com/en/repositories/releasing-projects-on-github/about-releases) and related inferred changes from [`release-drafter`](https://github.com/release-drafter/release-drafter).
 
 1. Open a pull request and use a repository label for `release-<semver release type>` to label the pull request for visibility with [`release-drafter`](https://github.com/release-drafter/release-drafter) (for example, see [CytoTable#108](https://github.com/cytomining/CytoTable/pull/108) as a reference of a semver patch update).
-1. On merging the pull request for the release, a [GitHub Actions workflow](https://docs.github.com/en/actions/using-workflows) defined in `draft-release.yml` leveraging [`release-drafter`](https://github.com/release-drafter/release-drafter) will draft a release for maintainers to modify as needed (double checking on the prepared git tag and content).
+1. On merging the pull request for the release, a [GitHub Actions workflow](https://docs.github.com/en/actions/using-workflows) defined in `draft-release.yml` leveraging [`release-drafter`](https://github.com/release-drafter/release-drafter) will draft a release for maintainers.
+   The draft GitHub release will include a version tag based on the GitHub PR label applied and `release-drafter`.
+   The draft release does not normally need additional modifications but may be changed as needed.
 1. Make modifications as necessary to the draft GitHub release, then publish the release.
 1. On publishing the release, another GitHub Actions workflow defined in `publish-pypi.yml` will run to build and deploy the Python package to PyPI (utilizing the earlier modified `pyproject.toml` semantic version reference for labeling the release).
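
For a sense of the version strings `poetry-dynamic-versioning` resolves from these git tags, here is a small sketch using `dunamai` (the library it builds on, and the same calls used by `_get_cytotable_version`); the printed values are illustrative:

```python
import dunamai

# derive a PEP 440 version from the most recent tag in the enclosing repository
version = dunamai.Version.from_any_vcs()

print(version.serialize())
# e.g. "0.1.0" on a tagged release commit,
# or something like "0.1.1.dev3+g165ed3f" for commits past the tag
```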