diff --git a/dcpy/connectors/ftp.py b/dcpy/connectors/ftp.py new file mode 100644 index 000000000..4c01943be --- /dev/null +++ b/dcpy/connectors/ftp.py @@ -0,0 +1,6 @@ +class FTPConnector: + def push(self, dest_path: str, ftp_profile: str): + raise Exception("Push not implemented for FTP") + + def pull(self, **kwargs): + raise Exception("Pull not implemented for FTP") diff --git a/dcpy/connectors/socrata/publish.py b/dcpy/connectors/socrata/publish.py index cbf768464..c3cadedfa 100644 --- a/dcpy/connectors/socrata/publish.py +++ b/dcpy/connectors/socrata/publish.py @@ -19,9 +19,11 @@ from socrata.revisions import Revision as SocrataPyRevision import time from typing import TypedDict, Literal, NotRequired, Any +from typing import Unpack from dcpy.utils.logging import logger +from dcpy.models.lifecycle.distribution import PublisherPushKwargs import dcpy.models.product.dataset.metadata as md from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request @@ -573,10 +575,10 @@ def __init__(self, metadata: md.Metadata, destination_id: str): def push_dataset( + *, metadata: md.Metadata, dataset_destination_id: str, dataset_package_path: Path, - *, publish: bool = False, metadata_only: bool = False, ): @@ -680,3 +682,16 @@ def push_dataset( dataset.discard_open_revisions() return f"Published {metadata.attributes.display_name} - destination: {dataset_destination_id}" + + +class SocrataPublishConnector: + conn_type = "socrata" + + def push( + self, + **kwargs: Unpack[PublisherPushKwargs], + ) -> Any: + return push_dataset(**kwargs) + + def pull(self, **kwargs): + raise Exception("Pull not implemented for Socrata Connector") diff --git a/dcpy/lifecycle/distribute/__init__.py b/dcpy/lifecycle/distribute/__init__.py new file mode 100644 index 000000000..36a0d193a --- /dev/null +++ b/dcpy/lifecycle/distribute/__init__.py @@ -0,0 +1,10 @@ +from dcpy.lifecycle.distribute import connectors as _conn + +from dcpy.connectors.socrata.publish import SocrataPublishConnector + + +# Register all default connectors for `lifecycle.distribute`. +# Third parties can similarly register their own connectors, +# so long as the connector implements the distribute Connector protocol. +_conn.register(SocrataPublishConnector()) +_conn.register(_conn.DistributionFTPConnector()) diff --git a/dcpy/lifecycle/distribute/connectors.py b/dcpy/lifecycle/distribute/connectors.py new file mode 100644 index 000000000..0c8c89b9d --- /dev/null +++ b/dcpy/lifecycle/distribute/connectors.py @@ -0,0 +1,52 @@ +from typing import Unpack, Protocol, Any + +from dcpy.connectors.ftp import FTPConnector +from dcpy.models.lifecycle.distribution import PublisherPushKwargs + + +_connectors = {} + + +class Connector(Protocol): + conn_type: str + + def push(self, **kwargs: Unpack[PublisherPushKwargs]) -> Any: + """push""" + + def pull(self, **kwargs) -> Any: + """pull""" + + +def register(connector: Connector): + _connectors[connector.conn_type] = connector + + +def push(dest_type: str, **kwargs: Unpack[PublisherPushKwargs]) -> str: + connector: Connector = _connectors[dest_type] + return connector.push(**kwargs) + + +def pull(conn_type: str, **kwargs) -> str: + connector: Connector = _connectors[conn_type] + return connector.pull(**kwargs) + + +# Wrap the FTP Connector to bind it to the `PublisherPushKwargs` +# so that we can register and delegate FTP calls. +# This is the recommended way for third parties to add custom Distribution Connectors. +class DistributionFTPConnector: + conn_type: str + + def __init__(self): + self.conn_type = "ftp" + self._base_connector = FTPConnector() + + def push(self, **kwargs: Unpack[PublisherPushKwargs]) -> Any: + md = kwargs["metadata"] + dest = md.get_destination(kwargs["dataset_destination_id"]) + dest_path = dest.custom["destination_path"] + user_id = dest.custom["user_id"] + self._base_connector.push(dest_path=dest_path, user_id=user_id) + + def pull(self, **kwargs) -> Any: + raise Exception("Pull is not defined for any Distribution Connectors.") diff --git a/dcpy/models/lifecycle/distribution.py b/dcpy/models/lifecycle/distribution.py new file mode 100644 index 000000000..69bc9133e --- /dev/null +++ b/dcpy/models/lifecycle/distribution.py @@ -0,0 +1,12 @@ +from typing import TypedDict, NotRequired, Required +from pathlib import Path + +import dcpy.models.product.dataset.metadata as ds_md + + +class PublisherPushKwargs(TypedDict): + metadata: Required[ds_md.Metadata] + dataset_destination_id: Required[str] + publish: NotRequired[bool] + dataset_package_path: NotRequired[Path] + metadata_only: NotRequired[bool] diff --git a/dcpy/test/lifecycle/distribute/test_connector_dispatch.py b/dcpy/test/lifecycle/distribute/test_connector_dispatch.py new file mode 100644 index 000000000..a49995cdf --- /dev/null +++ b/dcpy/test/lifecycle/distribute/test_connector_dispatch.py @@ -0,0 +1,58 @@ +from pathlib import Path +import pytest +from typing import Unpack, Any + +from dcpy.models.lifecycle.distribution import PublisherPushKwargs +from dcpy.models.product import metadata as md +from dcpy.lifecycle.distribute import connectors + + +@pytest.fixture +def org_metadata(resources_path: Path): + # TODO: refactor away, into conftest maybe + template_vars = { + "version": "24c", + "lion_prod_level_pub_freq": "monthly", + "pseudo_lots_pub_freq": "monthly", + "agency": "fake_agency", + } + return md.OrgMetadata.from_path( + resources_path / "test_product_metadata_repo", template_vars=template_vars + ) + + +SNOWFLAKE_CONNECTOR_TYPE = "snowflake" + + +class MockSnowflakeConnector: + conn_type: str + + def __init__(self): + self.conn_type = SNOWFLAKE_CONNECTOR_TYPE + self.push_counter = 0 + + def push( + self, + **kwargs: Unpack[PublisherPushKwargs], + ) -> Any: + self.push_counter += 1 + + def pull(self, **kwargs): + raise Exception("Pull not implemented for Socrata Connector") + + +def test_dynamic_dispatch(org_metadata: md.OrgMetadata): + snowflake_connector = MockSnowflakeConnector() + connectors.register(snowflake_connector) + + assert snowflake_connector.push_counter == 0 + + connectors.push( + dest_type=SNOWFLAKE_CONNECTOR_TYPE, + metadata=org_metadata.product("lion").dataset("pseudo_lots"), + dataset_destination_id="garlic_sftp", + ) + + assert ( + snowflake_connector.push_counter == 1 + ), "The mock snowflake connector should have been called." diff --git a/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml b/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml index bb16ee568..2127cde3d 100644 --- a/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml +++ b/dcpy/test/resources/test_product_metadata_repo/products/lion/pseudo_lots/metadata.yml @@ -21,3 +21,5 @@ destinations: - id: socrata type: socrata tags: [prod_tag, pseudo_lots_tag] + - id: garlic_sftp + type: sftp