Skip to content

Commit

Permalink
Added type hints, use asdict to serialize dataclasses
Browse files Browse the repository at this point in the history
  • Loading branch information
amywieliczka committed Oct 30, 2024
1 parent c7555dd commit 04453f9
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
6 changes: 4 additions & 2 deletions dags/shared_tasks/mapping_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def map_endpoint_task(endpoint, fetched_versions, params=None, **context):
)
mapped_versions[collection_id] = mapped_collection_status.version

send_event_to_sns(context, mapped_collections)
send_event_to_sns(context,
{k: v.asdict() for k, v in mapped_collections.items()})
return mapped_versions


Expand Down Expand Up @@ -269,7 +270,8 @@ def validate_endpoint_task(url, mapped_versions, params=None, **context):
print("-", file=sys.stderr)
raise ValueError("No collections successfully validated, exiting.")

send_event_to_sns(context, validations)
send_event_to_sns(context,
{k: v.asdict() for k, v in validations.items()})
return [
validation_status.filepath
for validation_status in validations.values()
Expand Down
6 changes: 3 additions & 3 deletions metadata_mapper/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ def run_enrichments(records, collection, enrichment_set, page_filename):
class MappedPageStatus:
status: str
num_mapped_records: int
exceptions: dict
exceptions: dict[str, list[str]]
mapped_page_path: Union[str, None]
# ex: 3433/vernacular_metadata_v1/mapped_metadata_v1/data/1.jsonl


def group_page_exceptions(mapped_records: list[Record]):
def group_page_exceptions(mapped_records: list[Record]) -> dict[str, list[str]]:
group_page_exceptions = {}
page_exceptions = {
page_exceptions: dict[str, list[str]] = {
rec.legacy_couch_db_id: rec.enrichment_report
for rec in mapped_records if rec.enrichment_report
}
Expand Down
17 changes: 13 additions & 4 deletions metadata_mapper/lambda_shepherd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import requests

from dataclasses import dataclass
from dataclasses import dataclass, asdict
from urllib.parse import urlparse
from typing import Optional

from . import validate_mapping
from .lambda_function import map_page, MappedPageStatus
Expand Down Expand Up @@ -41,7 +42,7 @@ def check_for_missing_enrichments(collection):
return not_yet_implemented


@dataclass
@dataclass(frozen=True)
class MappedCollectionStatus:
status: str
num_mapped_records: int
Expand All @@ -53,6 +54,10 @@ class MappedCollectionStatus:
data_link: str


def asdict(self):
return asdict(self)


def get_mapping_status(
collection,
mapped_page_statuses: list[MappedPageStatus]) -> MappedCollectionStatus:
Expand Down Expand Up @@ -155,7 +160,10 @@ def print_map_status(collection, map_result: MappedCollectionStatus):
print(map_report_row)


def map_collection(collection_id, vernacular_version=None, validate=False):
def map_collection(
collection_id,
vernacular_version: Optional[str]=None,
validate: bool=False) -> MappedCollectionStatus:
# This is a functional duplicate of rikolti.d*gs.mapper_d*g.mapper_d*g

# Within an airflow runtime context, we take advantage of airflow's dynamic
Expand Down Expand Up @@ -217,7 +225,8 @@ def map_collection(collection_id, vernacular_version=None, validate=False):
const=True, nargs='?')
parser.add_argument('vernacular_version', help='relative path describing a vernacular version, ex: 3433/vernacular_data_version_1/')
args = parser.parse_args(sys.argv[1:])
mapped_collection = map_collection(args.collection_id, args.vernacular_version, args.validate)
mapped_collection = map_collection(
args.collection_id, args.vernacular_version, args.validate)
missing_enrichments = mapped_collection.missing_enrichments
if len(missing_enrichments) > 0:
print(
Expand Down
22 changes: 17 additions & 5 deletions metadata_mapper/map_registry_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import logging
import sys

from dataclasses import dataclass
from typing import Optional
from dataclasses import dataclass, asdict
from typing import Optional, Union

import requests

Expand All @@ -16,10 +16,16 @@
logger = logging.getLogger(__name__)


def map_endpoint(url, fetched_versions, limit=None) -> dict[int, MappedCollectionStatus]:
def map_endpoint(
url: str,
fetched_versions: dict[str, str],
limit: Optional[Union[int, str]]=None
) -> dict[int, MappedCollectionStatus]:

response = requests.get(url=url)
response.raise_for_status()
total = response.json().get('meta', {}).get('total_count', 1)
total: Union[int,str] = (
response.json().get('meta', {}).get('total_count', 1))
progress = 0
# map_report_headers = (
# "Collection ID, Status, Extent, Solr Count, Diff Count, Message"
Expand Down Expand Up @@ -65,14 +71,20 @@ def map_endpoint(url, fetched_versions, limit=None) -> dict[int, MappedCollectio
return map_report


@dataclass
@dataclass(frozen=True)
class ValidationReportStatus:
filepath: Optional[str] = None
num_validation_errors: int = 0
mapped_version: Optional[str] = None
error: bool = False
exception: Optional[Exception] = None

def asdict(self):
d = asdict(self)
if self.exception:
d['exception'] = str(self.exception)
return d


def validate_endpoint(
url, mapped_versions, limit=None) -> dict[int, ValidationReportStatus]:
Expand Down

0 comments on commit 04453f9

Please sign in to comment.