diff --git a/warehouse/oso_dagster/definitions.py b/warehouse/oso_dagster/definitions.py index 9a9d7868..3c2c6f6c 100644 --- a/warehouse/oso_dagster/definitions.py +++ b/warehouse/oso_dagster/definitions.py @@ -7,7 +7,7 @@ from dagster_embedded_elt.dlt import DagsterDltResource from dotenv import load_dotenv from . import constants -from .schedules import schedules +from .schedules import schedules, get_partitioned_schedules from .cbt import CBTResource from .factories import load_all_assets_from_package from .utils import ( @@ -91,6 +91,8 @@ def load_definitions(): asset_factories = asset_factories + alerts + all_schedules = schedules + get_partitioned_schedules(asset_factories) + # Each of the dbt environments needs to be setup as a resource to be used in # the dbt assets resources = { @@ -116,7 +118,7 @@ def load_definitions(): return Definitions( assets=asset_factories.assets, - schedules=schedules, + schedules=all_schedules, jobs=asset_factories.jobs, asset_checks=asset_factories.checks, sensors=asset_factories.sensors, diff --git a/warehouse/oso_dagster/factories/common.py b/warehouse/oso_dagster/factories/common.py index 49270efd..e3cafe88 100644 --- a/warehouse/oso_dagster/factories/common.py +++ b/warehouse/oso_dagster/factories/common.py @@ -73,6 +73,11 @@ def filter_assets_by_name(self, name: str): filtered = self.filter_assets(lambda a: a.key.path[-1] == name) return filtered + def find_job_by_name( + self, name: str + ) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]: + return next((job for job in self.jobs if job.name == name), None) + type EarlyResourcesAssetDecoratedFunction[**P] = Callable[ P, AssetFactoryResponse | AssetsDefinition diff --git a/warehouse/oso_dagster/factories/dlt.py b/warehouse/oso_dagster/factories/dlt.py index ac2c1ef6..8a536019 100644 --- a/warehouse/oso_dagster/factories/dlt.py +++ b/warehouse/oso_dagster/factories/dlt.py @@ -3,6 +3,7 @@ Dict, Any, Mapping, + MutableMapping, Optional, Callable, Iterator, @@ 
-14,12 +15,14 @@ from uuid import uuid4 from dagster import ( + PartitionsDefinition, asset, AssetIn, AssetExecutionContext, MaterializeResult, Config, AssetMaterialization, + define_asset_job, ) import dlt as dltlib from dlt.sources import DltResource @@ -63,7 +66,7 @@ def dlt_factory( key_prefix: Optional[AssetKeyPrefixParam] = None, deps: Optional[AssetDeps] = None, ins: Optional[Mapping[str, AssetIn]] = None, - tags: Optional[Mapping[str, str]] = None, + tags: Optional[MutableMapping[str, str]] = None, *args: P.args, **kwargs: P.kwargs, ): @@ -77,6 +80,9 @@ def dlt_factory( """ tags = tags or {} + if "partitions_def" in kwargs: + tags["opensource.observer/extra"] = "partitioned-assets" + key_prefix_str = "" if key_prefix: if isinstance(key_prefix, str): @@ -186,7 +192,18 @@ def _dlt_asset( for result in results: yield cast(R, result) - return AssetFactoryResponse([_dlt_asset]) + asset_partitions = cast( + Optional[PartitionsDefinition[str]], + kwargs["partitions_def"] if "partitions_def" in kwargs else None, + ) + + _asset_job = define_asset_job( + name=f"{key_prefix_str}_{asset_name}_job", + selection=[_dlt_asset], + partitions_def=asset_partitions, + ) + + return AssetFactoryResponse([_dlt_asset], [], [_asset_job]) return _factory diff --git a/warehouse/oso_dagster/schedules.py b/warehouse/oso_dagster/schedules.py index 0e28f127..11be92ae 100644 --- a/warehouse/oso_dagster/schedules.py +++ b/warehouse/oso_dagster/schedules.py @@ -1,11 +1,72 @@ -""" -To add a daily schedule that materializes your dbt assets, uncomment the following lines. 
-""" +from typing import Generator, Iterable, List, cast + +from dagster import ( + AssetKey, + AssetSelection, + AssetsDefinition, + RunRequest, + ScheduleDefinition, + ScheduleEvaluationContext, + define_asset_job, +) +from oso_dagster.factories.common import AssetFactoryResponse + +partitioned_assets = AssetSelection.tag( + "opensource.observer/extra", "partitioned-assets" +) + + +def get_partitioned_schedules( + factory: AssetFactoryResponse, +) -> List[ScheduleDefinition]: + resolved_assets = partitioned_assets.resolve( + cast(Iterable[AssetsDefinition], factory.assets) + ) + + def create_schedule(asset_key: AssetKey): + asset_path = "_".join(asset_key.path) + job_name = f"{asset_path}_job" + factory_job = factory.find_job_by_name(job_name) + + if not factory_job: + raise ValueError(f"Job {job_name} not found in factory response") + + def execution_fn( + context: ScheduleEvaluationContext, + ) -> Generator[RunRequest, None, None]: + if not factory_job.partitions_def: + raise ValueError( + f"Job {job_name} does not have a partitions definition, but is being used " + "in a partitioned schedule" + ) + + materialized_partitions = set( + context.instance.get_materialized_partitions(asset_key) + ) + + yield from ( + RunRequest( + run_key=f"{asset_path}_{partition_key}", + partition_key=partition_key, + ) + for partition_key in factory_job.partitions_def.get_partition_keys() + if partition_key not in materialized_partitions + ) + + # Run unmaterialized partitions every sunday at midnight + return ScheduleDefinition( + job=factory_job, + cron_schedule="0 0 * * 0", + name=f"materialize_{asset_path}_schedule", + execution_fn=execution_fn, + ) + + return [create_schedule(asset_key) for asset_key in resolved_assets] -from dagster import define_asset_job, ScheduleDefinition, AssetSelection materialize_all_assets = define_asset_job( - "materialize_all_assets_job", AssetSelection.all() + "materialize_all_assets_job", + AssetSelection.all() - partitioned_assets, ) 
materialize_source_assets = define_asset_job( @@ -14,8 +75,8 @@ | AssetSelection.tag("opensource.observer/type", "source-qa"), ) -schedules = [ - # Run everything once a week on sunday at midnight +schedules: list[ScheduleDefinition] = [ + # Run everything except partitioned assets once a week on sunday at midnight ScheduleDefinition( job=materialize_all_assets, cron_schedule="0 0 * * 0",