diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 475c38970..016110a95 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -34,8 +34,6 @@ from pudl.settings import EtlSettings from . import ( - check_foreign_keys, - cli, eia_bulk_elec_assets, epacems_assets, glue_assets, diff --git a/src/pudl/etl/cli.py b/src/pudl/etl/cli.py index 016c12ff5..1e3e9c2ff 100644 --- a/src/pudl/etl/cli.py +++ b/src/pudl/etl/cli.py @@ -8,14 +8,13 @@ import fsspec from dagster import ( DagsterInstance, - Definitions, JobDefinition, build_reconstructable_job, - define_asset_job, execute_job, ) import pudl +from pudl.etl import defs from pudl.helpers import get_dagster_execution_config from pudl.settings import EpaCemsSettings, EtlSettings from pudl.workspace.setup import PudlPaths @@ -37,24 +36,12 @@ def pudl_etl_job_factory( The job definition to be executed. """ - def get_pudl_etl_job(): + def get_pudl_etl_job(job_name: str | None = None): """Create an pudl_etl_job wrapped by to be wrapped by reconstructable.""" pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel) - jobs = [define_asset_job("etl_job")] - if not process_epacems: - jobs = [ - define_asset_job( - "etl_job", - selection=pudl.etl.create_non_cems_selection( - pudl.etl.default_assets - ), - ) - ] - return Definitions( - assets=pudl.etl.default_assets, - resources=pudl.etl.default_resources, - jobs=jobs, - ).get_job_def("etl_job") + if job_name is None: + job_name = "etl_full_no_cems" if not process_epacems else "etl_full" + return defs.get_job_def(job_name) return get_pudl_etl_job diff --git a/src/pudl/transform/ferc714.py b/src/pudl/transform/ferc714.py index e188d18f1..8c2cc278e 100644 --- a/src/pudl/transform/ferc714.py +++ b/src/pudl/transform/ferc714.py @@ -1276,25 +1276,28 @@ class Ferc714CheckSpec: ] -def make_check(spec: Ferc714CheckSpec) -> AssetChecksDefinition: +def make_row_num_check(spec: Ferc714CheckSpec) -> AssetChecksDefinition: """Turn the Ferc714CheckSpec into an actual Dagster asset check.""" - @asset_check(asset=spec.asset, blocking=True) - def _check(df): + @asset_check( + asset=spec.asset, required_resource_keys={"dataset_settings"}, blocking=True + ) + def _row_num_check(context, df): errors = [] - for year, expected_rows in spec.num_rows_by_report_year.items(): + for year in context.resources.dataset_settings.ferc714.years: + expected_rows = spec.num_rows_by_report_year[year] if (num_rows := len(df.loc[df.report_year == year])) != expected_rows: errors.append( f"Expected {expected_rows} for report year {year}, found {num_rows}" ) - logger.info(errors) + logger.warning(errors) if errors: return AssetCheckResult(passed=False, metadata={"errors": errors}) return AssetCheckResult(passed=True) - return _check + return _row_num_check -_checks = [make_check(spec) for spec in check_specs] +_checks = [make_row_num_check(spec) for spec in check_specs] diff --git a/test/conftest.py b/test/conftest.py index 55994116d..20837864e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -363,7 +363,7 @@ def pudl_io_manager( md = PUDL_PACKAGE.to_sql() md.create_all(engine) # Run the ETL and generate a new PUDL SQLite DB for testing: - execute_result = pudl_etl_job_factory()().execute_in_process( + execute_result = pudl_etl_job_factory()("etl_fast").execute_in_process( run_config={ "resources": { "dataset_settings": {