Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Connector Registration and Dynamic Dispatch #1375

Merged
merged 8 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions .github/workflows/distribute_socrata_from_bytes.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 📬 Distribute Socrata From Bytes
run-name: "📬 Distribute Socrata From Bytes: ${{ inputs.PRODUCT_NAME }}-${{ inputs.DATASET_VERSION }}-${{ inputs.DATASET }}-${{ inputs.DESTINATION_ID }}"
name: 📬 Distribute From Bytes
run-name: "📬 Distribute From Bytes: ${{ inputs.PRODUCT_NAME }}-${{ inputs.DATASET_VERSION }}-${{ inputs.DATASET }}-${{ inputs.DESTINATION_ID }}"

on:
workflow_dispatch:
Expand All @@ -8,31 +8,40 @@ on:
description: "Name of the product"
type: string
required: true
DATASET_VERSION:
PRODUCT_VERSION:
description: "Version to push"
type: string
required: true
DESTINATION_ID:
description: "Destination ID (e.g. `colp_socrata`)"
type: string
required: true
DATASET:
description: "Dataset to push (defaults to PRODUCT_NAME when omitted)"
description: "Dataset to push"
type: string
required: false
required: true
PUBLISH:
description: "Publish the revision? (if not, will leave the revision open)"
type: boolean
default: false
required: true
IGNORE_VALIDATION_ERRORS:
description: "Ignore Validation Errors? (Will still perform validation, but will just log the outputs to the console)"
required: false
METADATA_ONLY:
description: "Distribute just the metadata?"
type: boolean
default: false
required: false
DESTINATION_ID_FILTER:
description: "Destination ID (e.g. `colp_socrata`)"
type: string
required: false
DESTINATION_TYPE_FILTER:
description: "Destination type (e.g. 'ftp')"
type: string
required: false
SKIP_VALIDATION:
description: "Skip validation altogether?"
type: boolean
default: false
IGNORE_VALIDATION_ERRORS:
description: "Ignore Validation Errors? (Will still perform validation, but will just log the outputs to the console)"
type: boolean
default: false
jobs:
publish:
runs-on: ubuntu-22.04
Expand All @@ -43,9 +52,17 @@ jobs:
image: nycplanning/build-base:latest
env:
PUBLISHING_BUCKET: edm-publishing
RECIPES_BUCKET: edm-recipes
PRODUCT_METADATA_REPO_PATH: product-metadata
_TYPER_STANDARD_TRACEBACK: 1
steps:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this disables the use of rich for error messages. are "pretty" tracebacks in github action logs hard to read when distributing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the context variables from typer are humongous in this case, so it's a real pain to track back in the logs. Honestly, I really dislike having the full context all the time. It might be nice to tie this to the to log-level. (ie enable rich for debug level logs, but not for info)

- uses: actions/checkout@v4

- uses: actions/checkout@v4
with:
repository: NYCPlanning/product-metadata
path: product-metadata

- name: Load Secrets
uses: 1password/load-secrets-action@v1
with:
Expand All @@ -58,3 +75,20 @@ jobs:
SOCRATA_USER: "op://Data Engineering/DCP_OpenData/username"
SOCRATA_PASSWORD: "op://Data Engineering/DCP_OpenData/password"


- name: Finish container setup
run: ./bash/docker_container_setup.sh

- name: Distribute to Destinations
run: |
python3 -m dcpy.cli lifecycle scripts package_and_distribute \
${{ inputs.PRODUCT_NAME }} \
${{ inputs.PRODUCT_VERSION }} \
${{ inputs.DATASET }} \
bytes \
$(if [ ${{ inputs.DESTINATION_ID_FILTER }} != "" ]; then echo "-a ${{ inputs.DESTINATION_ID_FILTER }}"; fi) \
$(if [ ${{ inputs.DESTINATION_TYPE_FILTER }} != "" ]; then echo "-e ${{ inputs.DESTINATION_TYPE_FILTER }}"; fi) \
$(if [ ${{ inputs.SKIP_VALIDATION }} = "true" ]; then echo "-y"; fi) \
$(if [ ${{ inputs.IGNORE_VALIDATION_ERRORS }} = "true" ]; then echo "-i"; fi) \
$(if [ ${{ inputs.PUBLISH }} = "true" ]; then echo "-p"; fi) \
$(if [ ${{ inputs.METADATA_ONLY }} = "true" ]; then echo "-m"; fi)
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Lifecycle artifacts
.library/
.publishing/
.package/
.lifecycle/
output/
.output/
.data
Expand Down
6 changes: 6 additions & 0 deletions dcpy/connectors/ftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class FTPConnector:
def push(self, dest_path: str, ftp_profile: str):
raise Exception("Push not implemented for FTP")

Check warning on line 3 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L3

Added line #L3 was not covered by tests

def pull(self, **kwargs):
raise Exception("Pull not implemented for FTP")

Check warning on line 6 in dcpy/connectors/ftp.py

View check run for this annotation

Codecov / codecov/patch

dcpy/connectors/ftp.py#L6

Added line #L6 was not covered by tests
2 changes: 1 addition & 1 deletion dcpy/connectors/socrata/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ def __init__(self, metadata: md.Metadata, destination_id: str):


def push_dataset(
*,
metadata: md.Metadata,
dataset_destination_id: str,
dataset_package_path: Path,
*,
publish: bool = False,
metadata_only: bool = False,
):
Expand Down
4 changes: 3 additions & 1 deletion dcpy/lifecycle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path

BASE_PATH = Path(".lifecycle")


class WORKING_DIRECTORIES:
packaging = Path(".package")
packaging = BASE_PATH / Path("package")
44 changes: 44 additions & 0 deletions dcpy/lifecycle/distribute/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Unpack

from dcpy.lifecycle.distribute.connectors import (
DistributionSFTPConnector,
SocrataPublishConnector,
)
from dcpy.models.lifecycle.distribute import (
DatasetDestinationPushArgs,
DistributeResult,
)
from dcpy.models.connectors import ConnectorDispatcher


# Register all default connectors for `lifecycle.distribute`.
# Third parties can similarly register their own connectors,
# so long as the connector implements a ConnectorDispatcher protocol.
dispatcher = ConnectorDispatcher[DatasetDestinationPushArgs, dict]()

dispatcher.register(conn_type="socrata", connector=SocrataPublishConnector())
dispatcher.register(conn_type="sftp", connector=DistributionSFTPConnector())


def to_dataset_destination(
**push_kwargs: Unpack[DatasetDestinationPushArgs],
) -> DistributeResult:
"""Distribute a dataset and specific dataset_destination_id.

Requires fully rendered template, ie there should be no template variables in the metadata
"""
ds_md = push_kwargs["metadata"]
dest = ds_md.get_destination(push_kwargs["dataset_destination_id"])
dest_type = dest.type

Check warning on line 32 in dcpy/lifecycle/distribute/__init__.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/__init__.py#L30-L32

Added lines #L30 - L32 were not covered by tests

try:
result = dispatcher.push(dest_type, push_kwargs)
return DistributeResult.from_push_kwargs(

Check warning on line 36 in dcpy/lifecycle/distribute/__init__.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/__init__.py#L34-L36

Added lines #L34 - L36 were not covered by tests
result=result, success=True, push_args=push_kwargs
)
except Exception as e:
return DistributeResult.from_push_kwargs(

Check warning on line 40 in dcpy/lifecycle/distribute/__init__.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/__init__.py#L39-L40

Added lines #L39 - L40 were not covered by tests
result=f"Error pushing {push_kwargs['metadata'].attributes.display_name} to dest_type: {dest_type}, destination: {dest.id}: {str(e)}",
success=False,
push_args=push_kwargs,
)
38 changes: 36 additions & 2 deletions dcpy/lifecycle/distribute/_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,41 @@
import typer
from pathlib import Path

Check warning on line 2 in dcpy/lifecycle/distribute/_cli.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/_cli.py#L2

Added line #L2 was not covered by tests

from dcpy.lifecycle.distribute import socrata
from dcpy.lifecycle import distribute
import dcpy.models.product.dataset.metadata as m

Check warning on line 5 in dcpy/lifecycle/distribute/_cli.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/_cli.py#L4-L5

Added lines #L4 - L5 were not covered by tests

app = typer.Typer()

app.add_typer(socrata.socrata_app, name="socrata")

@app.command("from_local")
def _dist_from_local(
package_path: Path = typer.Argument(),
dataset_destination_id: str = typer.Argument(),
metadata_path: Path = typer.Option(
None,
"-m",
"--metadata-path",
help="(Optional) Metadata Path Override",
),
publish: bool = typer.Option(
False,
"-p",
"--publish",
help="Publish the Revision? Or leave it open.",
),
metadata_only: bool = typer.Option(
False,
"-z",
"--metadata-only",
help="Only push metadata (including attachments).",
),
):
md = m.Metadata.from_path(metadata_path or (package_path / "metadata.yml"))
result = distribute.to_dataset_destination(
metadata=md,
dataset_destination_id=dataset_destination_id,
publish=publish,
dataset_package_path=package_path,
metadata_only=metadata_only,
)
print(result)
42 changes: 42 additions & 0 deletions dcpy/lifecycle/distribute/connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any

from dcpy.connectors.ftp import FTPConnector
from dcpy.connectors.socrata import publish as socrata_pub
from dcpy.models.lifecycle.distribute import DatasetDestinationPushArgs

# Sadly, can't use Unpack on kwarg generics yet.
# https://github.com/python/typing/issues/1399


# Wrap the connectors to bind them to the `PublisherPushKwargs`
# so that we can register and delegate calls.
# This is the recommended way for third parties to add custom Distribution Connectors.
class DistributionSFTPConnector:
conn_type: str

def __init__(self):
self.conn_type = "sftp"
self._base_connector = FTPConnector()

def push(self, arg: DatasetDestinationPushArgs) -> Any:
md = arg["metadata"]
dest = md.get_destination(arg["dataset_destination_id"])
dest_path = dest.custom["destination_path"]
user_id = dest.custom["user_id"]
self._base_connector.push(dest_path=dest_path, ftp_profile=user_id)

Check warning on line 26 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L22-L26

Added lines #L22 - L26 were not covered by tests

def pull(self, _: dict) -> Any:
raise Exception("Pull is not defined for any Distribution Connectors.")

Check warning on line 29 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L29

Added line #L29 was not covered by tests


class SocrataPublishConnector:
conn_type = "socrata"

def push(
self,
arg: DatasetDestinationPushArgs,
) -> Any:
return socrata_pub.push_dataset(**arg)

Check warning on line 39 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L39

Added line #L39 was not covered by tests

def pull(self, _: dict):
raise Exception("Pull not implemented for Socrata Connector")

Check warning on line 42 in dcpy/lifecycle/distribute/connectors.py

View check run for this annotation

Codecov / codecov/patch

dcpy/lifecycle/distribute/connectors.py#L42

Added line #L42 was not covered by tests
Loading