Skip to content

Commit

Permalink
HDXDSYS-1301 Simplify operational presence HAPI pipeline to read from…
Browse files Browse the repository at this point in the history
… global HDX dataset (#217)

* Update humanitarian needs to remove error checks as they are now in pipeline
* Update sector and org type to inherit from HDX Python Scraper versions
* Operational presence pipeline reads from global HDX dataset
* Remove unneeded code
* Operational presence test added
  • Loading branch information
mcarans authored Jan 21, 2025
1 parent af46280 commit 90f0abb
Show file tree
Hide file tree
Showing 47 changed files with 56,288 additions and 26,082 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.32] = 2025-01-22

### Changed

- 3W from global dataset
- Remove negative and rounded checks from HNO as they are now in the scraper
- Common logic for 3W and HNO

## [0.10.31] = 2025-01-13

### Changed
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ email-validator==2.2.0
# via hdx-python-api
et-xmlfile==2.0.0
# via openpyxl
filelock==3.16.1
filelock==3.17.0
# via virtualenv
frictionless==5.18.0
# via hdx-python-utilities
Expand Down
16 changes: 1 addition & 15 deletions src/hapi/pipelines/app/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ def parse_args():
action="store_true",
help="Use saved data",
)
parser.add_argument(
"-dbg",
"--debug",
default=False,
action="store_true",
help="Debug",
)
parser.add_argument(
"-ehx",
"--err-to-hdx",
Expand All @@ -104,7 +97,6 @@ def main(
basic_auths: Optional[Dict[str, str]] = None,
save: bool = False,
use_saved: bool = False,
debug: bool = False,
err_to_hdx: bool = False,
**ignore,
) -> None:
Expand All @@ -121,7 +113,6 @@ def main(
basic_auths (Optional[Dict[str, str]]): Basic authorisations
save (bool): Whether to save state for testing. Defaults to False.
use_saved (bool): Whether to use saved state for testing. Defaults to False.
debug (bool): Whether to output debug info. Defaults to False.
err_to_hdx (bool): Whether to write any errors to HDX metadata. Defaults to False.
Returns:
Expand All @@ -142,7 +133,7 @@ def main(
params["prepare_fn"] = prepare_hapi_views
logger.info(f"> Database parameters: {params}")
configuration = Configuration.read()
with HDXErrorHandler(should_exit_on_error=False) as error_handler:
with HDXErrorHandler(write_to_hdx=err_to_hdx) as error_handler:
with temp_dir() as temp_folder:
with Database(**params) as database:
session = database.get_session()
Expand All @@ -169,9 +160,6 @@ def main(
)
pipelines.run()
pipelines.output()
pipelines.output_errors(err_to_hdx)
if debug:
pipelines.debug("debug")
logger.info("HAPI pipelines completed!")


Expand Down Expand Up @@ -216,7 +204,6 @@ def main(
"food_security.yaml",
"idps.yaml",
"national_risk.yaml",
"operational_presence.yaml",
"refugees_and_returnees.yaml",
"wfp.yaml",
]
Expand All @@ -235,6 +222,5 @@ def main(
basic_auths=basic_auths,
save=args.save,
use_saved=args.use_saved,
debug=args.debug,
err_to_hdx=ehx,
)
51 changes: 14 additions & 37 deletions src/hapi/pipelines/app/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ def __init__(
url=AdminLevel.formats_url, retriever=reader
).cache()
self.admins = Admins(
configuration, session, self.locations, libhxl_dataset
configuration,
session,
self.locations,
libhxl_dataset,
error_handler,
)
admin1_config = configuration["admin1"]
self.adminone = AdminLevel(admin_config=admin1_config, admin_level=1)
Expand All @@ -88,21 +92,13 @@ def __init__(
logger.info("Admin two name replacements:")
self.admintwo.output_admin_name_replacements()

self.org = Org(
session=session,
datasetinfo=configuration["org"],
)
self.org_type = OrgType(
session=session,
datasetinfo=configuration["org_type"],
org_type_map=configuration["org_type_map"],
)
self.sector = Sector(
session=session,
datasetinfo=configuration["sector"],
sector_map=configuration["sector_map"],
)
self.currency = Currency(configuration=configuration, session=session)
self.currency = Currency(session=session, configuration=configuration)

Sources.set_default_source_date_format("%Y-%m-%d")
self.runner = Runner(
Expand All @@ -113,6 +109,7 @@ def __init__(
)
self.configurable_scrapers = {}
self.create_configurable_scrapers()

self.metadata = Metadata(
runner=self.runner, session=session, today=today
)
Expand All @@ -135,7 +132,7 @@ def setup_configurable_scrapers(
if countryiso3s:
configuration = {}
# This assumes format prefix_iso_.... eg.
# population_gtm, operational_presence_afg_total
# population_gtm
iso3_index = len(prefix) + 1
for key, value in self.configuration[f"{prefix}{suffix}"].items():
if len(key) < iso3_index + 3:
Expand Down Expand Up @@ -172,13 +169,6 @@ def _create_configurable_scrapers(
current_scrapers + scraper_names
)

_create_configurable_scrapers(
"operational_presence", "admintwo", adminlevel=self.admintwo
)
_create_configurable_scrapers(
"operational_presence", "adminone", adminlevel=self.adminone
)
_create_configurable_scrapers("operational_presence", "national")
_create_configurable_scrapers("national_risk", "national")
_create_configurable_scrapers("refugees_and_returnees", "national")
_create_configurable_scrapers("idps", "national")
Expand Down Expand Up @@ -212,21 +202,17 @@ def output_operational_presence(self):
not self.themes_to_run
or "operational_presence" in self.themes_to_run
):
results = self.runner.get_hapi_results(
self.configurable_scrapers["operational_presence"]
org = Org(
session=self.session,
metadata=self.metadata,
configuration=self.configuration,
)
org.populate()
operational_presence = OperationalPresence(
session=self.session,
metadata=self.metadata,
admins=self.admins,
adminone=self.adminone,
admintwo=self.admintwo,
org=self.org,
org_type=self.org_type,
sector=self.sector,
results=results,
config=self.configuration,
error_handler=self.error_handler,
configuration=self.configuration,
)
operational_presence.populate()

Expand All @@ -253,9 +239,7 @@ def output_humanitarian_needs(self):
session=self.session,
metadata=self.metadata,
admins=self.admins,
sector=self.sector,
configuration=self.configuration,
error_handler=self.error_handler,
)
humanitarian_needs.populate()

Expand Down Expand Up @@ -374,7 +358,6 @@ def output(self):
self.locations.populate()
self.admins.populate()
self.metadata.populate()
self.org.populate()
self.org_type.populate()
self.sector.populate()
self.currency.populate()
Expand All @@ -389,9 +372,3 @@ def output(self):
self.output_poverty_rate()
self.output_conflict_event()
self.output_food_prices()

def debug(self, folder: str) -> None:
self.org.output_org_map(folder)

def output_errors(self) -> None:
self.error_handler.output_errors()
63 changes: 63 additions & 0 deletions src/hapi/pipelines/database/admins.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Populate the admin tables."""

import logging
import re
from abc import ABC
from typing import Dict, List, Literal, Optional

Expand All @@ -25,18 +26,22 @@


class Admins(BaseUploader):
admin_name_regex = re.compile(r"Admin (\d) Name")

def __init__(
self,
configuration: Configuration,
session: Session,
locations: Locations,
libhxl_dataset: hxl.Dataset,
error_handler: HDXErrorHandler,
):
super().__init__(session)
self._limit = configuration["commit_limit"]
self._orphan_admin2s = configuration["orphan_admin2s"]
self._locations = locations
self._libhxl_dataset = libhxl_dataset
self._error_handler = error_handler
self.admin1_data = {}
self.admin2_data = {}

Expand Down Expand Up @@ -196,6 +201,64 @@ def get_admin2_ref(
)
return ref

def get_admin2_ref_from_row(
    self, row: Dict, dataset_name: str, pipeline: str
):
    """Look up the admin 2 reference for one row of a dataset.

    The deepest populated "Admin N Name" column decides the admin level of
    the row. The matching "Admin N PCode" column supplies the code; when it
    is empty the lookup falls back one level at a time, ending at the
    country ISO3 code ("national").

    Args:
        row (Dict): Row keyed by column header. Must contain
            "Country ISO3", plus "Admin 1 PCode" / "Admin 2 PCode" whenever
            the corresponding level is populated.
        dataset_name (str): Passed through to ``get_admin2_ref``.
        pipeline (str): Passed through to ``get_admin2_ref``.

    Returns:
        Whatever ``get_admin2_ref`` returns for the resolved level and code,
        or None when the row is the HXL hashtag row, when its deepest
        populated level is not 0, 1 or 2, or when no reference is found.
    """
    countryiso3 = row["Country ISO3"]
    if countryiso3 == "#country+code":  # ignore HXL row
        return None

    # Collect the level digit of every populated "Admin N Name" column and
    # keep the deepest one, so the result does not depend on column order.
    populated_levels = [
        found.group(1)
        for header, value in row.items()
        if (found := self.admin_name_regex.match(header)) and value
    ]
    deepest = max(populated_levels, default="0")
    if deepest not in ("0", "1", "2"):
        return None

    # Start from the national level and upgrade to the deepest level that
    # actually has a p-code.
    admin_level = "national"
    admin_code = countryiso3
    for level, level_name in (("2", "admintwo"), ("1", "adminone")):
        if level > deepest:
            continue
        pcode = row[f"Admin {level} PCode"]
        if pcode:
            admin_level = level_name
            admin_code = pcode
            break

    admin2_ref = self.get_admin2_ref(
        admin_level,
        admin_code,
        dataset_name,
        pipeline,
        self._error_handler,
    )
    if admin2_ref is not None:
        return admin2_ref

    # The code was not recognised: retry with the connector code that links
    # this level straight to the location. National rows have no fallback.
    if admin_level == "adminone":
        admin_code = get_admin1_to_location_connector_code(countryiso3)
    elif admin_level == "admintwo":
        admin_code = get_admin2_to_location_connector_code(countryiso3)
    else:
        return None
    return self.get_admin2_ref(
        admin_level,
        admin_code,
        dataset_name,
        pipeline,
        self._error_handler,
    )


def get_admin2_to_admin1_connector_code(admin1_code: str) -> str:
"""Get the code for an unspecified admin2, based on the admin1 code."""
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/currency.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
class Currency(BaseUploader):
def __init__(
self,
configuration: Configuration,
session: Session,
configuration: Configuration,
):
super().__init__(session)
self._configuration = configuration
Expand Down
Loading

0 comments on commit 90f0abb

Please sign in to comment.