diff --git a/.github/workflows/distribute_socrata_from_bytes.yml b/.github/workflows/distribute_socrata_from_bytes.yml index 7c582df23..720e78db5 100644 --- a/.github/workflows/distribute_socrata_from_bytes.yml +++ b/.github/workflows/distribute_socrata_from_bytes.yml @@ -1,5 +1,5 @@ -name: 📬 Distribute Socrata From Bytes -run-name: "📬 Distribute Socrata From Bytes: ${{ inputs.PRODUCT_NAME }}-${{ inputs.DATASET_VERSION }}-${{ inputs.DATASET }}-${{ inputs.DESTINATION_ID }}" +name: 📬 Distribute From Bytes +run-name: "📬 Distribute From Bytes: ${{ inputs.PRODUCT_NAME }}-${{ inputs.DATASET_VERSION }}-${{ inputs.DATASET }}-${{ inputs.DESTINATION_ID }}" on: workflow_dispatch: @@ -8,31 +8,40 @@ on: description: "Name of the product" type: string required: true - DATASET_VERSION: + PRODUCT_VERSION: description: "Version to push" type: string required: true - DESTINATION_ID: - description: "Destination ID (e.g. `colp_socrata`)" - type: string - required: true DATASET: - description: "Dataset to push (defaults to PRODUCT_NAME when omitted)" + description: "Dataset to push" type: string - required: false + required: true PUBLISH: description: "Publish the revision? (if not, will leave the revision open)" type: boolean default: false - required: true - IGNORE_VALIDATION_ERRORS: - description: "Ignore Validation Errors? (Will still perform validation, but will just log the outputs to the console)" + required: false + METADATA_ONLY: + description: "Distribute just the metadata?" type: boolean default: false + required: false + DESTINATION_ID_FILTER: + description: "Destination ID (e.g. `colp_socrata`)" + type: string + required: false + DESTINATION_TYPE_FILTER: + description: "Destination type (e.g. 'ftp')" + type: string + required: false SKIP_VALIDATION: description: "Skip validation altogether?" type: boolean default: false + IGNORE_VALIDATION_ERRORS: + description: "Ignore Validation Errors? 
(Will still perform validation, but will just log the outputs to the console)" + type: boolean + default: false jobs: publish: runs-on: ubuntu-22.04 @@ -43,9 +52,17 @@ jobs: image: nycplanning/build-base:latest env: PUBLISHING_BUCKET: edm-publishing + RECIPES_BUCKET: edm-recipes + PRODUCT_METADATA_REPO_PATH: product-metadata + _TYPER_STANDARD_TRACEBACK: 1 steps: - uses: actions/checkout@v4 + - uses: actions/checkout@v4 + with: + repository: NYCPlanning/product-metadata + path: product-metadata + - name: Load Secrets uses: 1password/load-secrets-action@v1 with: @@ -58,3 +75,20 @@ jobs: SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username" SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password" + + - name: Finish container setup + run: ./bash/docker_container_setup.sh + + - name: Distribute to Destinations + run: | + python3 -m dcpy.cli lifecycle scripts package_and_distribute \ + ${{ inputs.PRODUCT_NAME }} \ + ${{ inputs.PRODUCT_VERSION }} \ + ${{ inputs.DATASET }} \ + bytes \ + $(if [ ${{ inputs.DESTINATION_ID_FILTER }} != "" ]; then echo "-a ${{ inputs.DESTINATION_ID_FILTER }}"; fi) \ + $(if [ ${{ inputs.DESTINATION_TYPE_FILTER }} != "" ]; then echo "-e ${{ inputs.DESTINATION_TYPE_FILTER }}"; fi) \ + $(if [ ${{ inputs.SKIP_VALIDATION }} = "true" ]; then echo "-y"; fi) \ + $(if [ ${{ inputs.IGNORE_VALIDATION_ERRORS }} = "true" ]; then echo "-i"; fi) \ + $(if [ ${{ inputs.PUBLISH }} = "true" ]; then echo "-p"; fi) \ + $(if [ ${{ inputs.METADATA_ONLY }} = "true" ]; then echo "-m"; fi) diff --git a/.gitignore b/.gitignore index 4ecd31071..631541102 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Lifecycle artifacts .library/ .publishing/ -.package/ +.lifecycle/ output/ .output/ .data diff --git a/dcpy/connectors/ftp.py b/dcpy/connectors/ftp.py new file mode 100644 index 000000000..4c01943be --- /dev/null +++ b/dcpy/connectors/ftp.py @@ -0,0 +1,6 @@ +class FTPConnector: + def push(self, dest_path: str, ftp_profile: str): + raise Exception("Push not implemented for FTP") + + def pull(self, **kwargs): + raise Exception("Pull not implemented for FTP") diff --git a/dcpy/connectors/socrata/publish.py b/dcpy/connectors/socrata/publish.py index 7b84e4356..e387c84de 100644 --- a/dcpy/connectors/socrata/publish.py +++ b/dcpy/connectors/socrata/publish.py @@ -578,10 +578,10 @@ def __init__(self, metadata: md.Metadata, destination_id: str): def push_dataset( + *, metadata: md.Metadata, dataset_destination_id: str, dataset_package_path: Path, - *, publish: bool = False, metadata_only: bool = False, ): diff --git a/dcpy/lifecycle/__init__.py b/dcpy/lifecycle/__init__.py index fb0143c6d..7a23d6bd6 100644 --- a/dcpy/lifecycle/__init__.py +++ b/dcpy/lifecycle/__init__.py @@ -1,5 +1,7 @@ from pathlib import Path +BASE_PATH = Path(".lifecycle") + class WORKING_DIRECTORIES: - packaging = Path(".package") + packaging = BASE_PATH / Path("package") diff --git a/dcpy/lifecycle/distribute/__init__.py b/dcpy/lifecycle/distribute/__init__.py new file mode 100644 index 000000000..efcba0ab8 --- /dev/null +++ b/dcpy/lifecycle/distribute/__init__.py @@ -0,0 +1,44 @@ +from typing import Unpack + +from dcpy.lifecycle.distribute.connectors import ( + DistributionSFTPConnector, + SocrataPublishConnector, +) +from dcpy.models.lifecycle.distribute import ( + DatasetDestinationPushArgs, + DistributeResult, +) +from dcpy.models.connectors import ConnectorDispatcher + + +# Register all default connectors for `lifecycle.distribute`. 
+# Third parties can similarly register their own connectors, +# so long as the connector implements a ConnectorDispatcher protocol. +dispatcher = ConnectorDispatcher[DatasetDestinationPushArgs, dict]() + +dispatcher.register(conn_type="socrata", connector=SocrataPublishConnector()) +dispatcher.register(conn_type="sftp", connector=DistributionSFTPConnector()) + + +def to_dataset_destination( + **push_kwargs: Unpack[DatasetDestinationPushArgs], +) -> DistributeResult: + """Distribute a dataset and specific dataset_destination_id. + + Requires fully rendered template, ie there should be no template variables in the metadata + """ + ds_md = push_kwargs["metadata"] + dest = ds_md.get_destination(push_kwargs["dataset_destination_id"]) + dest_type = dest.type + + try: + result = dispatcher.push(dest_type, push_kwargs) + return DistributeResult.from_push_kwargs( + result=result, success=True, push_args=push_kwargs + ) + except Exception as e: + return DistributeResult.from_push_kwargs( + result=f"Error pushing {push_kwargs['metadata'].attributes.display_name} to dest_type: {dest_type}, destination: {dest.id}: {str(e)}", + success=False, + push_args=push_kwargs, + ) diff --git a/dcpy/lifecycle/distribute/_cli.py b/dcpy/lifecycle/distribute/_cli.py index 27999a302..f9699ee9d 100644 --- a/dcpy/lifecycle/distribute/_cli.py +++ b/dcpy/lifecycle/distribute/_cli.py @@ -1,7 +1,41 @@ import typer +from pathlib import Path -from dcpy.lifecycle.distribute import socrata +from dcpy.lifecycle import distribute +import dcpy.models.product.dataset.metadata as m app = typer.Typer() -app.add_typer(socrata.socrata_app, name="socrata") + +@app.command("from_local") +def _dist_from_local( + package_path: Path = typer.Argument(), + dataset_destination_id: str = typer.Argument(), + metadata_path: Path = typer.Option( + None, + "-m", + "--metadata-path", + help="(Optional) Metadata Path Override", + ), + publish: bool = typer.Option( + False, + "-p", + "--publish", + help="Publish the Revision? Or leave it open.", + ), + metadata_only: bool = typer.Option( + False, + "-z", + "--metadata-only", + help="Only push metadata (including attachments).", + ), +): + md = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml")) + result = distribute.to_dataset_destination( + metadata=md, + dataset_destination_id=dataset_destination_id, + publish=publish, + dataset_package_path=package_path, + metadata_only=metadata_only, + ) + print(result) diff --git a/dcpy/lifecycle/distribute/connectors.py b/dcpy/lifecycle/distribute/connectors.py new file mode 100644 index 000000000..24c3b5fd9 --- /dev/null +++ b/dcpy/lifecycle/distribute/connectors.py @@ -0,0 +1,42 @@ +from typing import Any + +from dcpy.connectors.ftp import FTPConnector +from dcpy.connectors.socrata import publish as socrata_pub +from dcpy.models.lifecycle.distribute import DatasetDestinationPushArgs + +# Sadly, can't use Unpack on kwarg generics yet. +# https://github.com/python/typing/issues/1399 + + +# Wrap the connectors to bind them to the `PublisherPushKwargs` +# so that we can register and delegate calls. +# This is the recommended way for third parties to add custom Distribution Connectors. 
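As an illustrative sketch (not part of the changeset itself): a third-party connector only needs a `conn_type` attribute plus `push`/`pull` methods, and is then registered against the shared dispatcher in dcpy/lifecycle/distribute/__init__.py. The `BigQueryConnector` class and the "bigquery" destination type below are hypothetical.

from typing import Any

from dcpy.lifecycle.distribute import dispatcher
from dcpy.models.lifecycle.distribute import DatasetDestinationPushArgs


class BigQueryConnector:
    conn_type = "bigquery"

    def push(self, arg: DatasetDestinationPushArgs) -> Any:
        dest = arg["metadata"].get_destination(arg["dataset_destination_id"])
        # e.g. load the files under arg["dataset_package_path"] using dest.custom settings
        ...

    def pull(self, _: dict) -> Any:
        raise Exception("Pull is not defined for any Distribution Connectors.")


dispatcher.register(conn_type="bigquery", connector=BigQueryConnector())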
+class DistributionSFTPConnector: + conn_type: str + + def __init__(self): + self.conn_type = "sftp" + self._base_connector = FTPConnector() + + def push(self, arg: DatasetDestinationPushArgs) -> Any: + md = arg["metadata"] + dest = md.get_destination(arg["dataset_destination_id"]) + dest_path = dest.custom["destination_path"] + user_id = dest.custom["user_id"] + self._base_connector.push(dest_path=dest_path, ftp_profile=user_id) + + def pull(self, _: dict) -> Any: + raise Exception("Pull is not defined for any Distribution Connectors.") + + +class SocrataPublishConnector: + conn_type = "socrata" + + def push( + self, + arg: DatasetDestinationPushArgs, + ) -> Any: + return socrata_pub.push_dataset(**arg) + + def pull(self, _: dict): + raise Exception("Pull not implemented for Socrata Connector") diff --git a/dcpy/lifecycle/distribute/socrata.py b/dcpy/lifecycle/distribute/socrata.py deleted file mode 100644 index eb13512b8..000000000 --- a/dcpy/lifecycle/distribute/socrata.py +++ /dev/null @@ -1,298 +0,0 @@ -from pathlib import Path -import typer -from typing import TypedDict, Unpack, NotRequired, Required - -import dcpy.models.product.dataset.metadata as m -from dcpy.utils.logging import logger -import dcpy.connectors.edm.packaging as packaging -import dcpy.connectors.socrata.publish as soc_pub - - -class PublishKwargs(TypedDict): - metadata_path: NotRequired[Path] - publish: Required[bool] - ignore_validation_errors: Required[bool] - skip_validation: Required[bool] - metadata_only: Required[bool] - - -def dist_from_local( - package_path: Path, - dataset_destination_id: str, - *, - metadata_path: Path | None = None, - publish: bool = False, - ignore_validation_errors: bool = False, - skip_validation: bool = False, - metadata_only: bool = False, -) -> str: - """Distribute a dataset and specific dataset_destination_id. - - Requires fully rendered template, ie there should be no template variables in the metadata - """ - md = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml")) - validation_errors = md.validate_consistency() - if validation_errors: - logger.error( - f"The metadata file contains inconsistencies that must be fixed before pushing: {str(validation_errors)}" - ) - return str(validation_errors) - - dest = md.get_destination(dataset_destination_id) - assert dest.type == "socrata" - - if not (skip_validation or metadata_only): - logger.info("Validating package") - # validation = v.validate_package(package_path, md) - # errors = validation.get_dataset_errors() - errors: list[str] = [] # TODO. Not really used, so not urgent - - if len(errors) > 0: - if ignore_validation_errors: - logger.warn("Errors Found! But continuing to distribute") - # validation.pretty_print_errors() - else: - error_msg = "Errors Found! 
Aborting distribute" - logger.error(error_msg) - # validation.pretty_print_errors() - raise Exception(error_msg) - - try: - return soc_pub.push_dataset( - metadata=md, - dataset_destination_id=dataset_destination_id, - dataset_package_path=package_path, - publish=bool(publish), - metadata_only=bool(metadata_only), - ) - except Exception as e: - return f"Error pushing {md.attributes.display_name}, destination: {dest.id}: {str(e)}" - - -def dist_from_local_all_socrata( - package_path: Path, - **pub_kwargs: Unpack[PublishKwargs], -): - """Distributes all Socrata destinations within a given metadata""" - md = m.Metadata.from_path(package_path / "metadata.yml") - local_pub_kwargs = pub_kwargs.copy() - local_pub_kwargs.pop( - "metadata_path" - ) if "metadata_path" in local_pub_kwargs else None - - socrata_dests = [d.id for d in md.destinations if d.type == "socrata"] - logger.info(f"Distributing {md.attributes.display_name}: {socrata_dests}") - results = [ - dist_from_local( - package_path=package_path, - dataset_destination_id=dataset_destination_id, - **local_pub_kwargs, - ) - for dataset_destination_id in socrata_dests - ] - return results - - -def dist_from_local_product_all_socrata( - product_path: Path, - **pub_kwargs: Unpack[PublishKwargs], -): - """Distribute datasets for an entire product.""" - results = [] - for p in product_path.iterdir(): - if p.is_dir(): - md_path = p / "metadata.yml" - if md_path.exists(): - results.append( - dist_from_local_all_socrata(package_path=Path(p), **pub_kwargs) - ) - return results - - -socrata_app = typer.Typer() - - -@socrata_app.command("from_local") -def _dist_from_local( - package_path: Path = typer.Argument(), - dataset_destination_id: str = typer.Argument(), - metadata_path: Path = typer.Option( - None, - "-m", - "--metadata-path", - help="(Optional) Metadata Path Override", - ), - publish: bool = typer.Option( - False, - "-p", - "--publish", - help="Publish the Socrata Revision? Or leave it open.", - ), - ignore_validation_errors: bool = typer.Option( - False, - "-i", - "--ignore-validation-errors", - help="Ignore Validation Errors? Will still perform validation, but ignore errors, allowing a push", - ), - skip_validation: bool = typer.Option( - False, - "-y", # -y(olo) - "--skip-validation", - help="Skip Validation Altogether", - ), - metadata_only: bool = typer.Option( - False, - "-z", - "--metadata-only", - help="Only push metadata (including attachments).", - ), -): - result = dist_from_local( - package_path=package_path, - dataset_destination_id=dataset_destination_id, - metadata_path=metadata_path, - publish=publish, - ignore_validation_errors=ignore_validation_errors, - skip_validation=skip_validation, - metadata_only=metadata_only, - ) - print(result) - - -@socrata_app.command("local_all_datasets") -def _dist_from_local_all_socrata( - package_path: Path = typer.Argument(), - publish: bool = typer.Option( - False, - "-p", - "--publish", - help="Publish the Socrata Revision? Or leave it open.", - ), - ignore_validation_errors: bool = typer.Option( - False, - "-i", - "--ignore-validation-errors", - help="Ignore Validation Errors? 
Will still perform validation, but ignore errors, allowing a push", - ), - skip_validation: bool = typer.Option( - False, - "-y", # -y(olo) - "--skip-validation", - help="Skip Validation Altogether", - ), - metadata_only: bool = typer.Option( - False, - "-z", - "--metadata-only", - help="Only push metadata (including attachments).", - ), -): - results = dist_from_local_all_socrata( - package_path=package_path, - publish=publish, - ignore_validation_errors=ignore_validation_errors, - skip_validation=skip_validation, - metadata_only=metadata_only, - ) - print(results) - - -@socrata_app.command("local_all_product") -def _cli_dist_product_from_local_all_socrata( - product_path: Path = typer.Argument(), - publish: bool = typer.Option( - False, - "-p", - "--publish", - help="Publish the Socrata Revision? Or leave it open.", - ), - ignore_validation_errors: bool = typer.Option( - False, - "-i", - "--ignore-validation-errors", - help="Ignore Validation Errors? Will still perform validation, but ignore errors, allowing a push", - ), - skip_validation: bool = typer.Option( - False, - "-y", # -y(olo) - "--skip-validation", - help="Skip Validation Altogether", - ), - metadata_only: bool = typer.Option( - False, - "-z", - "--metadata-only", - help="Only push metadata (including attachments).", - ), -): - results = dist_from_local_product_all_socrata( - product_path=product_path, - publish=publish, - ignore_validation_errors=ignore_validation_errors, - skip_validation=skip_validation, - metadata_only=metadata_only, - ) - print(results) - - -@socrata_app.command("from_s3") -def _dist_from_s3( - product_name: str, - version: str, - dataset_destination_id: str, - dataset: str = typer.Option( - None, - "-d", - "--dataset", - help="(optional) dataset. Defaults to product name", - ), - metadata_path: Path = typer.Option( - None, - "-m", - "--metadata-path", - help="(Optional) Metadata Path", - ), - publish: bool = typer.Option( - False, - "-p", - "--publish", - help="Publish the Socrata Revision? Or leave it open.", - ), - ignore_validation_errors: bool = typer.Option( - False, - "-i", - "--ignore-validation-errors", - help="Ignore Validation Errors? Will still perform validation, but ignore errors, allowing a push", - ), - skip_validation: bool = typer.Option( - False, - "-y", # -y(olo) - "--skip-validation", - help="Skip Validation Altogether", - ), - metadata_only: bool = typer.Option( - False, - "-z", - "--metadata-only", - help="Only push metadata (including attachments).", - ), -): - logger.info( - f"Distributing {product_name}-{version} to {dataset_destination_id}. Publishing: {publish}. 
Ignoring Validation Errors: {ignore_validation_errors}" - ) - logger.info(f"Downloading dataset package for {product_name}-{version}") - - package_path = packaging.pull( - packaging.DatasetPackageKey(product_name, version, dataset or product_name) - ) - - result = dist_from_local( - package_path=package_path, - dataset_destination_id=dataset_destination_id, - metadata_path=metadata_path, - publish=publish, - ignore_validation_errors=ignore_validation_errors, - skip_validation=skip_validation, - metadata_only=metadata_only, - ) - print(result) diff --git a/dcpy/lifecycle/package/__init__.py b/dcpy/lifecycle/package/__init__.py index 1bcdb4253..170338d67 100644 --- a/dcpy/lifecycle/package/__init__.py +++ b/dcpy/lifecycle/package/__init__.py @@ -1,3 +1,16 @@ from pathlib import Path RESOURCES_PATH = Path(__file__).parent / "resources" + +from .assemble import assemble_package, pull_destination_package_files +from .validate import validate as validate_package +from dcpy.lifecycle import WORKING_DIRECTORIES + +ASSEMBLY_DIR = WORKING_DIRECTORIES.packaging / "assembly" + +__all__ = [ + "assemble_package", + "ASSEMBLY_DIR", + "pull_destination_package_files", + "validate_package", +] diff --git a/dcpy/lifecycle/package/assemble.py b/dcpy/lifecycle/package/assemble.py index f164e7684..1ca2a3c91 100644 --- a/dcpy/lifecycle/package/assemble.py +++ b/dcpy/lifecycle/package/assemble.py @@ -8,7 +8,6 @@ from dcpy.configuration import PRODUCT_METADATA_REPO_PATH from dcpy.lifecycle import WORKING_DIRECTORIES from dcpy.lifecycle.package import xlsx_writer -from dcpy.lifecycle.package import assemble import dcpy.models.product.dataset.metadata as md import dcpy.models.product.metadata as prod_md from dcpy.utils.logging import logger @@ -55,7 +54,7 @@ def unzip_into_package( Rationale: A product like Pluto will often zip assets (a csv, a readme) into a zip. This allows us to reverse that process, to construct packages straight from what's - zipped up on Bytes. + zipped up on a destination. """ package = product_metadata.get_package(package_id) file_ids_to_paths = _file_id_to_zipped_paths(product_metadata, package_id) @@ -90,10 +89,6 @@ def unzip_into_package( ) -BYTES_DEST_TYPE = "bytes" -NON_BYTES_DEST_ERROR = "Cannot distribute to non-bytes destination types" - - def _get_file_url_mappings_by_id( product_metadata: md.Metadata, destination_id: str ) -> dict[str, dict]: @@ -131,10 +126,8 @@ def pull_destination_files( ): """Pull all files for a given destination.""" dest = product_metadata.get_destination(destination_id) - if dest.type != BYTES_DEST_TYPE: - raise Exception(f"{NON_BYTES_DEST_ERROR}. 
Found: {dest.type}") - logger.info(f"Pulling BYTES package for {destination_id}") + logger.info(f"Pulling package from {destination_id}") ids_to_paths_and_dests = _get_file_url_mappings_by_id( product_metadata=product_metadata, destination_id=destination_id ) @@ -166,9 +159,16 @@ def pull_destination_files( ) -def pull_all_destination_files(local_package_path: Path, product_metadata: md.Metadata): - """Pull all files for all BYTES destinations in the metadata.""" - dests = [d for d in product_metadata.destinations if d.type == BYTES_DEST_TYPE] +def pull_destination_package_files( + *, + source_destination_id: str, + local_package_path: Path, + product_metadata: md.Metadata, +): + """Pull all files for a destination.""" + dests = [ + d for d in product_metadata.destinations if d.type == source_destination_id + ] for d in dests: pull_destination_files( local_package_path, product_metadata, d.id, unpackage_zips=True @@ -185,7 +185,7 @@ def pull_all_destination_files(local_package_path: Path, product_metadata: md.Me METADATA_OVERRIDE_KEY = "with_metadata_from" -def assemble_dataset_from_bytes( +def assemble_package( *, org_md: prod_md.OrgMetadata, product: str, @@ -196,10 +196,12 @@ def assemble_dataset_from_bytes( metadata_only: bool = False, ) -> Path: out_path = out_path or ASSEMBLY_DIR / product / version / dataset - logger.info(f"Assembling dataset from BYTES. Writing to: {out_path}") + logger.info( + f"Assembling dataset from {source_destination_id}. Writing to: {out_path}" + ) dataset_metadata = org_md.product(product).dataset(dataset) - assemble.pull_destination_files( + pull_destination_files( out_path, dataset_metadata, source_destination_id, @@ -229,8 +231,8 @@ def assemble_dataset_from_bytes( app = typer.Typer() -@app.command("assemble_from_bytes") -def assemble_dataset_from_bytes_cli( +@app.command("assemble_from_source") +def assemble_dataset_cli( product: str, version: str, org_metadata_path: Path = typer.Option( @@ -258,18 +260,19 @@ def assemble_dataset_from_bytes_cli( help="Only Assemble Metadata.", ), source_destination_id: str = typer.Option( - BYTES_DEST_TYPE, "--source-destination-id", "-s", help="The Destination which acts as a source for this assembly", ), ): + assert source_destination_id, ( + f"A {source_destination_id} is required to pull files." 
+ ) dataset_name = dataset or product org_md = prod_md.OrgMetadata.from_path( org_metadata_path, template_vars={"version": version} ) - - assemble_dataset_from_bytes( + assemble_package( org_md=org_md, product=product, dataset=dataset_name, @@ -281,9 +284,10 @@ def assemble_dataset_from_bytes_cli( @app.command("pull_dataset") -def _dataset_from_bytes_cli( +def _pull_dataset_cli( product: str, version: str, + source_destination_id: str, dataset: str = typer.Option( None, "--dataset", @@ -303,6 +307,8 @@ def _dataset_from_bytes_cli( ) out_dir = ASSEMBLY_DIR / product / version / dataset dataset_metadata = org_md.product(product).dataset(dataset) - pull_all_destination_files( - local_package_path=out_dir, product_metadata=dataset_metadata + pull_destination_package_files( + source_destination_id=source_destination_id, + local_package_path=out_dir, + product_metadata=dataset_metadata, ) diff --git a/dcpy/lifecycle/package/validate.py b/dcpy/lifecycle/package/validate.py index aa85009ac..ad0f8cc56 100644 --- a/dcpy/lifecycle/package/validate.py +++ b/dcpy/lifecycle/package/validate.py @@ -1,12 +1,11 @@ -import ast from dataclasses import dataclass, field from enum import Enum import geopandas as gpd import pandas as pd from pathlib import Path -import pprint import re from shapely import wkb, wkt +from tabulate import tabulate # type: ignore import typer import dcpy.models.product.dataset.metadata as dataset_md @@ -47,24 +46,36 @@ class PackageValidation: file_validations: list[DatasetFileValidation] errors: list[ValidationError] - def pretty_print_errors(self): + def has_errors(self): + return bool(self.errors or any([e.errors for e in self.file_validations])) + + def make_errors_table(self): + return tabulate( + self.get_errors_list(), + headers=["location", "type", "message"], + tablefmt="presto", + ) + + def get_errors_list(self) -> list[tuple]: + errors = [] for file_validation in self.file_validations: for fe in file_validation.errors: - pprint.pp( - [ + errors.append( + ( file_validation.dataset_file_id, fe.error_type.value, fe.message, - ] + ) ) for error in self.errors: - pprint.pp( - [ + errors.append( + ( "package", error.error_type.value, error.message, - ] + ) ) + return errors def _is_valid_wkb(g): @@ -252,8 +263,11 @@ def validate_shapefile( def validate_package_files( - package_path: Path, metadata: dataset_md.Metadata + package_path: Path, metadata_override: dataset_md.Metadata | None ) -> list[DatasetFileValidation]: + metadata = metadata_override or dataset_md.Metadata.from_path( + package_path / "metadata.yml" + ) dataset_files_path = ( package_path / "dataset_files" ) # TODO: wrong place for this calculation. Move it. 
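As an illustrative sketch (not part of the changeset itself), the reworked validation surface is consumed downstream roughly like this, mirroring lifecycle/scripts/package_and_distribute.py; the package path below is illustrative.

from pathlib import Path

from dcpy.lifecycle import package

validation = package.validate_package(Path(".lifecycle/package/assembly/colp/24c/colp"))
if validation.has_errors():
    # rows of (location, error type, message), rendered with tabulate
    raise Exception(validation.make_errors_table())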
@@ -323,17 +337,17 @@ def validate_package_files( return file_validations -def validate_package_from_path( +def validate( package_path: Path, - metadata_override_path: Path | None = None, - metadata_args: dict | None = None, -) -> list[DatasetFileValidation]: + overridden_dataset_metadata: dataset_md.Metadata | None = None, +) -> PackageValidation: logger.info(f"Validating package at {package_path}") - metadata = dataset_md.Metadata.from_path( - metadata_override_path or (package_path / "metadata.yml"), - template_vars=metadata_args, + return PackageValidation( + file_validations=validate_package_files( + package_path=package_path, metadata_override=overridden_dataset_metadata + ), + errors=[], ) - return validate_package_files(package_path=package_path, metadata=metadata) app = typer.Typer() @@ -342,18 +356,19 @@ def validate_package_from_path( @app.command() def _validate( package_path: Path, - metadata_path: Path = typer.Option( + metadata_path_override: Path = typer.Option( None, "-m", "--metadata-path", help="(Optional) Metadata Path" ), - metadata_args: str = typer.Option(default={}, callback=ast.literal_eval), ): - validations = validate_package_from_path( + validation = validate( package_path, - metadata_path, - metadata_args=metadata_args, # type: ignore + overridden_dataset_metadata=dataset_md.Metadata.from_path( + metadata_path_override + ) + if metadata_path_override + else None, ) - for f in validations: - for e in f.errors: - print( - f"{f.dataset_file_id[0:20].ljust(20)} | {e.error_type.value[0:20].ljust(20)} | {e.message}" - ) + if validation.has_errors(): + logger.error(f"Package validation failed for {package_path}") + raise Exception(validation.make_errors_table()) + logger.info(f"Package at {package_path} completed without errors") diff --git a/dcpy/lifecycle/scripts/_cli.py b/dcpy/lifecycle/scripts/_cli.py index a6dd0b67d..a709ba9b7 100644 --- a/dcpy/lifecycle/scripts/_cli.py +++ b/dcpy/lifecycle/scripts/_cli.py @@ -1,12 +1,12 @@ import typer -from .package_and_distribute import app as package_dist_app +from .package_and_distribute import package_and_distribute_cli from .product_metadata import app as product_metadata_app from .ingest_with_library_fallback import run as ingest_or_library_archive from .validate_ingest import app as ingest_validation_app app = typer.Typer() -app.add_typer(package_dist_app, name="package_and_dist") app.add_typer(product_metadata_app, name="product_metadata") app.add_typer(ingest_validation_app, name="validate_ingest") +app.command(name="package_and_distribute")(package_and_distribute_cli) app.command(name="ingest_or_library_archive")(ingest_or_library_archive) diff --git a/dcpy/lifecycle/scripts/package_and_distribute.py b/dcpy/lifecycle/scripts/package_and_distribute.py index 6ff9c300e..aaa01ee1e 100644 --- a/dcpy/lifecycle/scripts/package_and_distribute.py +++ b/dcpy/lifecycle/scripts/package_and_distribute.py @@ -1,80 +1,109 @@ from pathlib import Path +from tabulate import tabulate # type: ignore import typer -from typing import Unpack - -from dcpy.configuration import PRODUCT_METADATA_REPO_PATH +from dcpy import configuration from dcpy.models.product import metadata as product_metadata -from dcpy.lifecycle.distribute import socrata as soc_dist -from dcpy.lifecycle.package import assemble +from dcpy.models.lifecycle.distribute import DatasetDestinationFilters +from dcpy.models.lifecycle.validate import ValidationArgs +from dcpy.lifecycle import package +from dcpy.lifecycle import distribute from dcpy.utils.logging import logger -def 
package_and_dist_from_bytes( - org_metadata_path: Path, +# Future thought: this type of glue function shouldn't be necessary if +# we can string together lifecycle stages a little more declaratively +def package_and_distribute( + org_md: product_metadata.OrgMetadata, product: str, + dataset: str, version: str, - destination_tag: str, - destination_id: str | None, - datasets: set[str], - destination_type: str, - **publish_kwargs: Unpack[soc_dist.PublishKwargs], + *, + source_destination_id: str, + publish: bool = False, + metadata_only: bool = False, + validation_conf: ValidationArgs = {}, + destination_filters: DatasetDestinationFilters = {}, ): - logger.info( - f"Packaging and Distributing with filters: tag={destination_tag}, datasets: {datasets}, destination_type: {destination_type} " - ) - """Package tagged datsets from bytes, and distribute to Socrata.""" - org_md = product_metadata.OrgMetadata.from_path( - path=org_metadata_path, - template_vars={"version": version}, - ) + """Package tagged datasets from the source, and distribute to specified destinations.""" + + logger.info(f"Packaging and Distributing with filters: {destination_filters}") + + destination_filters["datasets"] = frozenset({dataset}) product_md = org_md.product(product) - dests = product_md.query_destinations( - tag=destination_tag, - datasets=set(datasets), - destination_type=destination_type, - destination_id=destination_id, + dataset_md = product_md.dataset(dataset) + destinations = product_md.query_destinations(**destination_filters)[dataset] + logger.info(f"Target destinations are {list(destinations.keys())}") + package_path = package.ASSEMBLY_DIR / product / version / dataset + + package.pull_destination_package_files( + local_package_path=package_path, + source_destination_id=source_destination_id, + product_metadata=dataset_md, ) - logger.info(f"Packaging {product_md.metadata.id}. Datasets: {list(dests.keys())}") - package_paths = {} - for ds_id, dests_to_mds in dests.items(): - out_path = assemble.assemble_dataset_from_bytes( - org_md=org_md, - product=product, - dataset=ds_id, - version=version, - source_destination_id="bytes", - metadata_only=publish_kwargs["metadata_only"], - ) - package_paths[ds_id] = out_path + package.assemble_package( + org_md=org_md, + product=product, + dataset=dataset, + version=version, + source_destination_id=source_destination_id, + metadata_only=metadata_only, + ) - logger.info("\nFinished Packaging. Beginning Distribution.") - results = [] - for ds_id, dests_to_mds in dests.items(): - package_path = package_paths[ds_id] + # validate + if not (validation_conf.get("skip_validation") or metadata_only): + package_validations = package.validate_package(package_path=package_path) + if package_validations.get_errors_list(): + if validation_conf.get("ignore_validation_errors"): + logger.warning("Package Errors Found! But continuing distribute") + logger.warning(package_validations.make_errors_table()) + else: + logger.error("Errors Found! Aborting distribute") + logger.error(package_validations.make_errors_table()) + raise Exception(package_validations.make_errors_table()) + else: + logger.info("\nFinished Packaging. 
Beginning Distribution.") - for dest, _ in dests_to_mds.items(): - logger.info(f"Distributing {ds_id}: {dest} from {package_path}") - result = soc_dist.dist_from_local(package_path, dest, **publish_kwargs) - results.append(result) - return results + # distribute + # TODO: pull this logic into package.distribute + distribute_results: list[distribute.DistributeResult] = [] + for dest_id, dest_ds_md in destinations.items(): + logger.info(f"Distributing {dataset}-{dest_id} from {package_path}") + result = distribute.to_dataset_destination( + metadata=dest_ds_md, + dataset_destination_id=dest_id, + publish=publish, + dataset_package_path=package_path, + metadata_only=metadata_only, + ) + distribute_results.append(result) + stringified_results = tabulate( + [ + [r.dataset_id, r.destination_id, r.destination_type, r.success, r.result] + for r in distribute_results + ], + headers=["dataset", "destination_id", "destination type", "success?", "result"], + tablefmt="presto", + maxcolwidths=[20, 10, 10, 10, 50], + ) -app = typer.Typer() + if any([not r.success for r in distribute_results]): + logger.error("Distribution Finished, but issues occurred!") + logger.error(stringified_results) + raise Exception("Distribution errors occurred. See the logs for details.") + logger.info("Distribution Finished") + logger.info(stringified_results) -@app.command("from_bytes_to_socrata") -def from_bytes_to_tagged_socrata_cli( + +def package_and_distribute_cli( product: str, version: str, + dataset: str, + source_destination_id: str, # Filters - datasets: list[str] = typer.Option( - None, - "-d", - "--datasets", - help="list of dataset names to include.", - ), destination_tag: str = typer.Option( None, "-t", @@ -94,12 +123,6 @@ def from_bytes_to_tagged_socrata_cli( help="Destination type for filter for. e.g. 'Socrata'", ), # Overrides - org_metadata_path: Path = typer.Option( - PRODUCT_METADATA_REPO_PATH, - "-o", - "--metadata-path", - help="Path to metadata repo. 
Optionally, set in your env.", - ), publish: bool = typer.Option( False, "-p", @@ -125,18 +148,26 @@ def from_bytes_to_tagged_socrata_cli( help="Only push metadata (including attachments).", ), ): - results = package_and_dist_from_bytes( - org_metadata_path, + assert configuration.PRODUCT_METADATA_REPO_PATH + org_md = product_metadata.OrgMetadata.from_path( + path=Path(configuration.PRODUCT_METADATA_REPO_PATH), + template_vars={"version": version}, + ) + package_and_distribute( + org_md, product, + dataset, version, - datasets=set(datasets or []), - destination_id=destination_id, - destination_type=destination_type, - destination_tag=destination_tag, + source_destination_id=source_destination_id, publish=publish, - ignore_validation_errors=ignore_validation_errors, - skip_validation=skip_validation, metadata_only=metadata_only, + destination_filters={ + "destination_id": destination_id, + "destination_type": destination_type, + "destination_tag": destination_tag, + }, + validation_conf={ + "ignore_validation_errors": ignore_validation_errors, + "skip_validation": skip_validation, + }, ) - for r in results: - print(r) diff --git a/dcpy/models/connectors/__init__.py b/dcpy/models/connectors/__init__.py new file mode 100644 index 000000000..6df970f35 --- /dev/null +++ b/dcpy/models/connectors/__init__.py @@ -0,0 +1,37 @@ +from typing import Protocol, Any, TypeVar, Generic + + +_O = TypeVar("_O", contravariant=True) +_I = TypeVar("_I", contravariant=True) + + +class _ConnectorProtocol(Protocol, Generic[_O, _I]): + conn_type: str + + def push(self, arg: _O, /) -> Any: + """push""" + + def pull(self, arg: _I, /) -> Any: + """pull""" + + +class ConnectorDispatcher(Generic[_O, _I]): + _connectors: dict[str, _ConnectorProtocol[_O, _I]] + + def __init__(self): + self._connectors = {} + + def register(self, conn_type: str, connector: _ConnectorProtocol[_O, _I]): + self._connectors[conn_type] = connector + + def push(self, dest_type: str, arg: _O) -> str: + if dest_type not in self._connectors: + raise Exception(f"No connector registered for {dest_type}") + connector: _ConnectorProtocol = self._connectors[dest_type] + return connector.push(arg) + + def pull(self, source_type: str, arg: _I) -> str: + if source_type not in self._connectors: + raise Exception(f"No connector registered for {source_type}") + connector: _ConnectorProtocol = self._connectors[source_type] + return connector.pull(arg) diff --git a/dcpy/models/lifecycle/distribute.py b/dcpy/models/lifecycle/distribute.py new file mode 100644 index 000000000..dc1fb3f5a --- /dev/null +++ b/dcpy/models/lifecycle/distribute.py @@ -0,0 +1,46 @@ +from dataclasses import dataclass +from typing import TypedDict, NotRequired, Required +from pathlib import Path + +import dcpy.models.product.dataset.metadata as ds_md + + +class DatasetDestinationFilters(TypedDict): + datasets: NotRequired[frozenset[str]] + destination_tag: NotRequired[str] + destination_id: NotRequired[str] + destination_type: NotRequired[str] + + +class DatasetDestinationPushArgs(TypedDict): + metadata: Required[ds_md.Metadata] + dataset_destination_id: Required[str] + publish: NotRequired[bool] + dataset_package_path: NotRequired[Path] + metadata_only: NotRequired[bool] + + +@dataclass +class DistributeResult: + dataset_id: str + destination_id: str + destination_type: str + + result: str | None = None + success: bool | None = None + + @staticmethod + def from_push_kwargs( + result: str, success: bool, push_args: DatasetDestinationPushArgs + ): + """Convenience static factory to create 
the this class from push args, + since we'll probably have them handy whenever instantiating this class.""" + ds_md = push_args["metadata"] + dest = ds_md.get_destination(push_args["dataset_destination_id"]) + return DistributeResult( + result=result, + success=success, + dataset_id=ds_md.id, + destination_type=dest.type, + destination_id=dest.id, + ) diff --git a/dcpy/models/lifecycle/validate.py b/dcpy/models/lifecycle/validate.py new file mode 100644 index 000000000..1b2fcfe58 --- /dev/null +++ b/dcpy/models/lifecycle/validate.py @@ -0,0 +1,6 @@ +from typing import TypedDict, NotRequired + + +class ValidationArgs(TypedDict): + ignore_validation_errors: NotRequired[bool] + skip_validation: NotRequired[bool] diff --git a/dcpy/models/product/metadata.py b/dcpy/models/product/metadata.py index 7703ef7cd..861a4b394 100644 --- a/dcpy/models/product/metadata.py +++ b/dcpy/models/product/metadata.py @@ -87,10 +87,10 @@ def get_datasets_by_id(self) -> dict[str, DatasetMetadata]: def query_destinations( self, *, - datasets: set[str] | None = None, + datasets: frozenset[str] | None = None, destination_id: str | None = None, destination_type: str | None = None, - tag: str | None = None, + destination_tag: str | None = None, ) -> dict[str, dict[str, DatasetMetadata]]: """Retrieve a map[map] of dataset->destination->DatasetMetadata filtered by - destination type. (e.g. Socrata) @@ -101,13 +101,14 @@ def query_destinations( """ filtered_datasets = self.get_datasets_by_id() found_dests: dict[str, dict[str, DatasetMetadata]] = defaultdict(dict) + for ds in filtered_datasets.values(): for dest in ds.destinations: if ( (not destination_type or dest.type == destination_type) and (not destination_id or dest.id == destination_id) and (not datasets or ds.id in datasets) - and (not tag or tag in dest.tags) + and (not destination_tag or destination_tag in dest.tags) ): found_dests[ds.id][dest.id] = ds return found_dests diff --git a/dcpy/test/connectors/test_sftp.py b/dcpy/test/connectors/test_sftp.py new file mode 100644 index 000000000..2fc5c6a2d --- /dev/null +++ b/dcpy/test/connectors/test_sftp.py @@ -0,0 +1,10 @@ +import pytest +import dcpy.models.product.dataset.metadata as md + +# TODO use test/connectors/socrata/ as inspiration for this test +# TODO differentiate between integration and unit tests when mocking and SFTP server for this + + +@pytest.mark.skip(reason="SFTP not implemented") +def test_distribute_sftp(metadata: md.Metadata): + assert False diff --git a/dcpy/test/lifecycle/distribute/test_connector_dispatch.py b/dcpy/test/lifecycle/distribute/test_connector_dispatch.py new file mode 100644 index 000000000..000d90884 --- /dev/null +++ b/dcpy/test/lifecycle/distribute/test_connector_dispatch.py @@ -0,0 +1,65 @@ +from pathlib import Path +import pytest +from typing import Any + +from dcpy.models.lifecycle.distribute import DatasetDestinationPushArgs +from dcpy.models.product import metadata as md +from dcpy.lifecycle.distribute import dispatcher + + +@pytest.fixture +def org_metadata(resources_path: Path): + # TODO: refactor away, into conftest maybe + template_vars = { + "version": "24c", + "lion_prod_level_pub_freq": "monthly", + "pseudo_lots_pub_freq": "monthly", + "agency": "fake_agency", + } + return md.OrgMetadata.from_path( + resources_path / "test_product_metadata_repo", template_vars=template_vars + ) + + +@pytest.fixture +def COLP_PACKAGE_PATH(resources_path: Path): + return resources_path / "product_metadata" / "colp_single_feature_package" + + +SNOWFLAKE_CONNECTOR_TYPE = "snowflake" 
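As an illustrative sketch (not part of the changeset itself), the renamed `query_destinations` filters in dcpy/models/product/metadata.py are used like this; the product id, tag, and dataset names are taken from the test fixtures and are only illustrative.

from pathlib import Path

from dcpy.models.product import metadata as product_metadata

org_md = product_metadata.OrgMetadata.from_path(
    Path("product-metadata"), template_vars={"version": "24c"}
)
# -> {dataset_id: {destination_id: DatasetMetadata}}, filtered by tag, type, and dataset set
dests = org_md.product("lion").query_destinations(
    destination_tag="prod_tag",
    destination_type="socrata",
    datasets=frozenset({"pseudo_lots", "school_districts"}),
)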
+ + +class MockSnowflakeConnector: + conn_type: str + + def __init__(self): + self.conn_type = SNOWFLAKE_CONNECTOR_TYPE + self.push_counter = 0 + + def push( + self, + thing: DatasetDestinationPushArgs, + ) -> Any: + print(thing) + self.push_counter += 1 + + def pull(self, arg: dict) -> Any: + raise Exception("Pull not implemented for Socrata Connector") + + +def test_dynamic_dispatch(org_metadata: md.OrgMetadata): + snowflake_connector = MockSnowflakeConnector() + dispatcher.register( + conn_type=SNOWFLAKE_CONNECTOR_TYPE, connector=snowflake_connector + ) + dispatch_details: DatasetDestinationPushArgs = { + "metadata": org_metadata.product("lion").dataset("pseudo_lots"), + "dataset_destination_id": "garlic_sftp", + } + assert snowflake_connector.push_counter == 0 + + dispatcher.push(SNOWFLAKE_CONNECTOR_TYPE, dispatch_details) + + assert snowflake_connector.push_counter == 1, ( + "The mock snowflake connector should have been called." + ) diff --git a/dcpy/test/lifecycle/distribute/test_socrata.py b/dcpy/test/lifecycle/distribute/test_socrata.py deleted file mode 100644 index 7b33347a9..000000000 --- a/dcpy/test/lifecycle/distribute/test_socrata.py +++ /dev/null @@ -1,32 +0,0 @@ -from pathlib import Path -import pytest -from unittest.mock import patch - -import dcpy.models.product.dataset.metadata as md -import dcpy.lifecycle.distribute.socrata as soc_dist - - -@pytest.fixture -def package_path(resources_path: Path): - return resources_path / "product_metadata" / "assembled_package_and_metadata" - - -@pytest.fixture -def metadata(package_path: Path): - return md.Metadata.from_path(package_path / "metadata.yml") - - -@patch("dcpy.connectors.socrata.publish.push_dataset") -def test_dist_local_product_all_socrata(pub_mock, package_path, metadata): - """Test that the Socrata push connector is invoked on each dataset destination in - the metadata file. 
- """ - soc_dist.dist_from_local_product_all_socrata(package_path.parent) - - actual_destinations = [ - c.kwargs["dataset_destination_id"] for c in pub_mock.mock_calls - ] - assert len(actual_destinations) == len(pub_mock.mock_calls) - - expected_destinations = [d.id for d in metadata.destinations if d.type == "socrata"] - assert expected_destinations == actual_destinations diff --git a/dcpy/test/lifecycle/package/test_assemble_from_bytes.py b/dcpy/test/lifecycle/package/test_assemble_from_bytes.py index e2b2d6cb0..2eb865141 100644 --- a/dcpy/test/lifecycle/package/test_assemble_from_bytes.py +++ b/dcpy/test/lifecycle/package/test_assemble_from_bytes.py @@ -1,5 +1,4 @@ from pathlib import Path -import pytest from unittest.mock import patch, call from dcpy.lifecycle.package import assemble @@ -97,13 +96,6 @@ def test_plan(): ) -def test_plan_errors_for_socrata(): - with pytest.raises(Exception, match=assemble.NON_BYTES_DEST_ERROR): - _plan = assemble.pull_destination_files( - Path(""), make_metadata(), SOCRATA_DEST_ID - ) - - @patch("dcpy.lifecycle.package.assemble.unzip_into_package") @patch("urllib.request.urlretrieve") def test_pull_destination_files(mock_urlretrieve, mock_unpackage, tmp_path): diff --git a/dcpy/test/lifecycle/package/test_packages_validation.py b/dcpy/test/lifecycle/package/test_packages_validation.py index a1b739388..973cc23b5 100644 --- a/dcpy/test/lifecycle/package/test_packages_validation.py +++ b/dcpy/test/lifecycle/package/test_packages_validation.py @@ -1,7 +1,7 @@ from pathlib import Path import pytest -from dcpy.lifecycle.package import validate +from dcpy.lifecycle import package import dcpy.models.product.dataset.metadata as md @@ -21,14 +21,14 @@ def colp_metadata(COLP_PACKAGE_PATH): def test_colp_single_feature_package(colp_metadata, COLP_PACKAGE_PATH): - validation = validate.validate_package_from_path( - COLP_PACKAGE_PATH, metadata_args={"version": COLP_VERSION} + validation = package.validate_package(COLP_PACKAGE_PATH) + + non_metadata_files = [f for f in colp_metadata.files if not f.file.is_metadata] + assert len(non_metadata_files) == len(validation.file_validations), ( + "There should be a validation for each dataset file" ) - assert len([f for f in colp_metadata.files if not f.file.is_metadata]) == len( - validation - ), "There should be a validation for each dataset file" - errors = sum([v.errors for v in validation], []) - assert 0 == len(errors), "No Errors should have been found" + + assert not validation.has_errors(), "No Errors should have been found" def test_missing_attachments(colp_metadata, COLP_PACKAGE_PATH): @@ -42,13 +42,14 @@ def test_missing_attachments(colp_metadata, COLP_PACKAGE_PATH): ) ) - validations = validate.validate_package_files(COLP_PACKAGE_PATH, colp_metadata) - errors = sum([v.errors for v in validations], []) + validation = package.validate_package(COLP_PACKAGE_PATH, colp_metadata) + assert validation.has_errors(), "the Package validation should have errors" - assert len(errors) == 1, ( - f"An error should have been found for the missing attachment. Found: {errors}" + files_with_errors = [v for v in validation.file_validations if v.errors] + assert len(files_with_errors) == 1, ( + f"An single error should have been found for the missing attachment. Validation: {validation}" ) - assert fake_attachment_id in errors[0].message, ( + assert fake_attachment_id in files_with_errors[0].errors[0].message, ( "The error message should mention the missing package file." 
) diff --git a/dcpy/test/models/product/test_metadata.py b/dcpy/test/models/product/test_metadata.py index f39ececc9..490fba3d9 100644 --- a/dcpy/test/models/product/test_metadata.py +++ b/dcpy/test/models/product/test_metadata.py @@ -70,7 +70,7 @@ def test_get_tagged_destinations(lion_md_path: Path): product_folder = md.ProductMetadata.from_path(root_path=lion_md_path) TAG = "school_districts_tag" - datasets = product_folder.query_destinations(tag=TAG) + datasets = product_folder.query_destinations(destination_tag=TAG) assert 1 == len(datasets.keys()) assert "school_districts" in datasets @@ -82,9 +82,9 @@ def test_query_multiple_filters_destinations(lion_md_path: Path): TAG = "prod_tag" DEST_TYPE = "socrata" - DATASET_NAMES = {"pseudo_lots", "school_districts"} + DATASET_NAMES = frozenset({"pseudo_lots", "school_districts"}) datasets = product_folder.query_destinations( - tag=TAG, destination_type=DEST_TYPE, datasets=DATASET_NAMES + destination_tag=TAG, destination_type=DEST_TYPE, datasets=DATASET_NAMES ) assert DATASET_NAMES == datasets.keys(), "The correct datasets should be returned" diff --git a/dcpy/test/models/test_connectors.py b/dcpy/test/models/test_connectors.py new file mode 100644 index 000000000..11695d1fc --- /dev/null +++ b/dcpy/test/models/test_connectors.py @@ -0,0 +1,123 @@ +from typing import Any +from dataclasses import dataclass + +from dcpy.models.connectors import ConnectorDispatcher + + +# Define Some Animals +@dataclass +class Animal: + age: int + + +@dataclass +class Dog(Animal): + name: str + + def bark(self): + print(f"bark {self.name}") + + +class Beagle(Dog): + pass + + +@dataclass +class Cat(Animal): + remaining_lives: int = 9 + + def glare(self): + pass + + +# Define Some Connectors to handle those animals +@dataclass +class AnimalConn: + conn_type: str + + def push(self, _: Animal) -> Any: + """push""" + + def pull(self, _: Animal) -> Any: + """pull""" + + +@dataclass +class DogConn: + conn_type: str + + def push(self, dog: Dog) -> Any: + dog.bark() + + def pull(self, _: Dog) -> Any: + """pull""" + + +@dataclass +class BeagleConn: + conn_type: str + + def push(self, dog: Beagle) -> Any: + dog.bark() + + def pull(self, _: Beagle) -> Any: + """pull""" + + +@dataclass +class CatConn: + conn_type: str + + def push(self, cat: Cat) -> Any: + cat.glare() + + def pull(self, _: Cat) -> Any: + """pull""" + + +def test_dog_contravariance(): + dog_dispatcher = ConnectorDispatcher[Dog, Dog]() + + # This is fine! Callables for Animal can also handle a Dog. + dog_dispatcher.register(conn_type="animal", connector=AnimalConn("animal")) + # Also fine. Obviously dogs are dogs. + dog_dispatcher.register(conn_type="dog", connector=DogConn("dog")) + + def things_that_fail_the_type_checker_if_you_remove_the_type_ignore(): + """Chucking these in a function to 1) not execute them, 2) indent them""" + + # mypy won't allow this because the CatConn might be passed a dog, and dogs can't meow(), for example + dog_dispatcher.register(conn_type="cat", connector=CatConn("Soxx")) # type: ignore + + # This one is less obvious. Due to contravariance, subclasses of Dog are not allowed + # for the dog_dispatcher, which takes `Dog` types for the push and pull method. + # This is somewhat counterintuitive, but allowing this in generics breaks type-safety + dog_dispatcher.register(conn_type="beagle", connector=BeagleConn("snoopy")) # type: ignore + + ### On to the dispatching + + # This is fine! 
Any function that can handle an Animal can handle Dog + dog_dispatcher.push("animal", Dog(age=4, name="rufus")) + + def more_things_that_would_fail_the_type_checker(): + # Fails for obvious reasons + dog_dispatcher.push("cat", Cat(age=4)) # type: ignore + + # This would execute just fine, but given then dynamic nature of dispatch + # mypy can't be sure that an Animal won't be passed to a DogConnector + dog_dispatcher.push("animal", Animal(age=4)) # type: ignore + + # This would actually break though, after we called .bark() on the Animal. + dog_dispatcher.push("dog", Animal(age=4)) # type: ignore + + # This should actually work just fine, but in the current implementation is prohibited. + # There may be a way to play with the type bounds to allow it. But + # I don't think it matters. + dog_dispatcher.push("dog", Beagle(age=4)) # type: ignore + + +# Example of covariance. Works fine and type-checker is happy. +# def run(animal: Animal): +# pass + +# run(Beagle(age=4, name="snoopy")) diff --git a/dcpy/test/resources/test_product_metadata_repo/products/colp/colp/metadata.yml b/dcpy/test/resources/test_product_metadata_repo/products/colp/colp/metadata.yml new file mode 100644 index 000000000..b57c2c77c --- /dev/null +++ b/dcpy/test/resources/test_product_metadata_repo/products/colp/colp/metadata.yml @@ -0,0 +1,376 @@ +id: db-colp + +attributes: + description: Raw Description + display_name: City Owned and Leased Property (COLP) + each_row_is_a: City Owned Property + tags: + - a + - b + +assembly: + - id: csv_package + filename: csv_package.zip + type: Zip + contents: + - id: primary_csv + filename: colp_{{ version }}.csv + - id: colp_readme + filename: colp_readme.pdf + - id: colp_metadata + filename: colp_metadata.pdf + +custom: {} + +destinations: + - id: garlic_sftp + type: sftp + custom: + destination_path: somewhere + user_id: someone + - id: socrata_prod + type: socrata + files: + - id: colp_readme + custom: + destination_use: attachment + - id: primary_shapefile + dataset_overrides: + attributes: + description: Socrata Prod Shapefile Description Override + custom: + destination_use: dataset_file + custom: + four_four: fn4k-qyk2 + - id: socrata_unparsed + type: socrata + files: + - id: colp_readme.pdf + custom: + destination_use: attachment + - id: primary_shapefile + custom: + destination_use: dataset_file + custom: + four_four: fn4k-abcd + is_unparsed_dataset: true + - id: bytes + type: bytes + files: + - id: csv_package + custom: + url: + https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/colp_{{ + version }}_csv.zip + - id: colp_readme + custom: + url: https://s-media.nyc.gov/agencies/dcp/assets/files/pdf/data-tools/bytes/colp_readme.pdf + +files: + - file: + id: primary_shapefile + filename: colp_single_feature_shp.zip + type: shapefile + custom: + ignore_validation: + - agreement + - dcpedited + - finalcom + dataset_overrides: + attributes: + description: description overridden at the dataset_file level + display_name: display_name overridden at the dataset_file level + overridden_columns: + - id: geom + name: geometry + data_type: geometry + - id: BBL + data_type: decimal + - id: MAPBBL + data_type: decimal + - file: + id: primary_csv + filename: colp_single_feature.csv + type: csv + dataset_overrides: + omitted_columns: + - geom + - file: + id: secondary_csv + filename: colp_single_feature_secondary.csv + type: csv + dataset_overrides: + omitted_columns: + - geom + - file: + id: colp_readme + filename: colp_readme.pdf + is_metadata: true + type: None + - file: 
+ id: colp_metadata + filename: colp_metadata.pdf + is_metadata: true + type: None + +columns: + - id: uid + name: uid + data_type: text + data_source: Department of City Planning + checks: + is_primary_key: true + example: cbe20732be28f6ab445289d7a67bb241 + - id: borough + name: BOROUGH + data_type: text + description: + NYC borough - 1 (Manhattan), 2 (Bronx), 3 (Brooklyn), 4 (Queens), 5 + (Staten Island) + checks: + non_nullable: true + example: None + values: + - value: "1" + description: Manhattan + - value: "2" + description: Bronx + - value: "3" + description: Brooklyn + - value: "4" + description: Queens + - value: "5" + description: Staten Island + - id: tax_block + name: BLOCK + data_type: integer + description: + The tax block in which the tax lot is located. Each tax block is unique + within a borough. + checks: + non_nullable: true + example: "1637" + - id: tax_lot + name: LOT + data_type: integer + description: The number of the tax lot. Each tax lot is unique within a tax block. + checks: + non_nullable: true + example: "141" + - id: bbl + name: BBL + data_type: number # Data imported incorrectly for these in the shapefile + checks: + non_nullable: true + example: "1016370141" + custom: + readme_data_type: double + - id: mapbbl + name: MAPBBL + data_type: number # Data imported incorrectly for these in the shapefile + data_source: Department of City Planning - Geosupport + example: "1016370141" + custom: + readme_data_type: double + - id: cd + name: CD + data_type: integer + data_source: Department of City Planning + example: "111" + - id: hnum + name: HNUM + data_type: text + description: House number + example: "1955" + - id: sname + name: SNAME + data_type: text + description: Name of the street + example: Third Avenue + - id: address + name: ADDRESS + data_type: text + description: House number and street name + example: 1955 Third Avenue + - id: parcelname + name: PARCELNAME + data_type: text + example: AGUILAR BRANCH LIBRARY + - id: agency + name: AGENCY + data_type: text + example: NYPL + - id: usecode + name: USECODE + data_type: text + description: + The use code indicates how the lot is being used by the agency. See + Appendix B for a complete list of use codes and descriptions. + example: "218" + - id: usetype + name: USETYPE + data_type: text + description: + Description of how the lot is being used by the agency. See Appendix + B for a complete list of use codes and descriptions. + example: BRANCH LIBRARY + - id: ownership + name: OWNERSHIP + data_type: text + description: Type of owner + checks: + non_nullable: true + example: None + values: + - value: C + description: City owned + - value: M + description: Mixed ownership + - value: P + description: Private + - value: O + description: + Other/public authority (includes properties owned by federal and + state entities) + - id: category + name: CATEGORY + data_type: integer + description: + Category classifies lots as non-residential properties with a current + use, residential properties, or properties without a current use. + data_source: Department of City Planning + checks: + non_nullable: true + example: None + values: + - value: "1" + description: Non-residential properties with a current use + - value: "2" + description: Residential properties + - value: "3" + description: Properties with no current use + - id: expandcat + name: EXPANDCAT + data_type: integer + description: + This categorization classifies records into broad groups based on use. + Valid values are 1 - 9. 
+ data_source: Department of City Planning + checks: + non_nullable: true + example: None + values: + - value: "1" + description: Office use + - value: "2" + description: Educational use + - value: "3" + description: Cultural & recreational use + - value: "4" + description: Public safety & criminal justice use + - value: "5" + description: Health & social service use + - value: "6" + description: Leased out to a private tenant + - value: "7" + description: Maintenance + custom: + other_details: "['storage & infrastructure']" + - value: "8" + description: Property with no use + - value: "9" + description: Property with a residential used + - id: excatdesc + name: EXCATDESC + data_type: text + description: + Descriptions for the expanded category values. See EXPANDCAT for the + domain values. + data_source: Department of City Planning + example: None + - id: leased + name: LEASED + data_type: text + description: + A value of "L" indicates that the agency's use of the property is authorized + through a lease. For questions about the lease or ownership status of specific + lots, please contact DCAS at (212) 386-0622 or RESPlanning311@dcas.nyc.gov. + example: None + values: + - value: L + description: Leased + - id: finalcom + name: FINALCOM + data_type: text + description: A value of "D" indicates potential disposition by the City. + example: None + values: + - value: D + description: Potential Disposition + - id: agreement + name: AGREEMENT + data_type: text + description: + For City-owned properties that are leased to another entity, this field + indicates whether the agreement is short-term, long-term, or there are both short- + and long-term agreements present. + example: None + values: + - value: S + description: Short-term + - value: L + description: Long-term + - value: M + description: Mixed (there are both short- and long-term agreements on the property) + - id: xcoord + name: XCOORD + data_type: integer + description: + X coordinate based on the Geosupport label point for the billing BBL. + Coordinate system is NAD 1983 State Plane New York Long Island FIPS 3104 Feet. + data_source: Department of City Planning + example: "999900" + - id: ycoord + name: YCOORD + data_type: integer + description: + Y coordinate based on the Geosupport label point for the billing BBL. + Coordinate system is NAD 1983 State Plane New York Long Island FIPS 3104 Feet. + data_source: Department of City Planning + example: "228619" + - id: latitude + name: LATITUDE + data_type: decimal + description: + Latitude based on the Geosupport label point for the billing BBL. Coordinate + system is NAD_1983. + data_source: Department of City Planning + example: "40.794169" + - id: longitude + name: LONGITUDE + data_type: decimal + description: + Longitude based on the Geosupport label point for the billing BBL. + Coordinate system is NAD_1983. + data_source: Department of City Planning + example: "-73.943479" + - id: dcpedited + name: DCPEDITED + data_type: text + description: + City Planning modifies some records to correct street names or normalize + parcel names when programmatic cleaning is insufficient. If a field has been manually + modified, the original value can be found on GitHub in the modifications_applied.csv + available in Outputs file series. 
+ data_source: Department of City Planning + example: None + values: + - value: Y + description: "True" + - id: geom + name: Geometry + data_type: geometry + description: Point geometry type + example: None + custom: + readme_data_type: geometry diff --git a/dcpy/test/resources/test_product_metadata_repo/products/colp/metadata.yml b/dcpy/test/resources/test_product_metadata_repo/products/colp/metadata.yml new file mode 100644 index 000000000..8d087d4e9 --- /dev/null +++ b/dcpy/test/resources/test_product_metadata_repo/products/colp/metadata.yml @@ -0,0 +1,4 @@ +id: colp + +datasets: +- colp diff --git a/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml b/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml index bb16ee568..2127cde3d 100644 --- a/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml +++ b/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml @@ -21,3 +21,5 @@ destinations: - id: socrata type: socrata tags: [prod_tag, pseudo_lots_tag] + - id: garlic_sftp + type: sftp
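As a closing illustration (not part of the changeset itself): the GitHub workflow at the top of this diff ultimately reduces to a call like the following; the product, dataset, version, and filter values are illustrative.

from pathlib import Path

from dcpy.models.product import metadata as product_metadata
from dcpy.lifecycle.scripts.package_and_distribute import package_and_distribute

org_md = product_metadata.OrgMetadata.from_path(
    Path("product-metadata"), template_vars={"version": "24c"}
)
package_and_distribute(
    org_md,
    "colp",
    "colp",
    "24c",
    source_destination_id="bytes",  # pull package files from the BYTES destination
    publish=False,  # leave the Socrata revision open
    metadata_only=False,
    destination_filters={"destination_type": "socrata"},
    validation_conf={"skip_validation": False, "ignore_validation_errors": False},
)

Validation or distribution failures raise with a tabulated results table, which is what the workflow relies on to fail the job.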