Skip to content

Commit

Permalink
feat: automatically create schedules to materialise partitioned a…
Browse files Browse the repository at this point in the history
…ssets dynamically (#2136)

* add: `find_by_job_name` member method to `AssetFactoryResponse` class

* add: automatic asset `job` creation functionality for `partitioned` assets to `dlt_factory`

* add: `generated` job schedules for `partitioned` assets

* feat: automatically create `schedules` to materialize `partitioned` assets dynamically
  • Loading branch information
Jabolol authored Sep 12, 2024
1 parent 4202708 commit a3ccda5
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 11 deletions.
6 changes: 4 additions & 2 deletions warehouse/oso_dagster/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dagster_embedded_elt.dlt import DagsterDltResource
from dotenv import load_dotenv
from . import constants
from .schedules import schedules
from .schedules import schedules, get_partitioned_schedules
from .cbt import CBTResource
from .factories import load_all_assets_from_package
from .utils import (
Expand Down Expand Up @@ -91,6 +91,8 @@ def load_definitions():

asset_factories = asset_factories + alerts

all_schedules = schedules + get_partitioned_schedules(asset_factories)

# Each of the dbt environments needs to be setup as a resource to be used in
# the dbt assets
resources = {
Expand All @@ -116,7 +118,7 @@ def load_definitions():

return Definitions(
assets=asset_factories.assets,
schedules=schedules,
schedules=all_schedules,
jobs=asset_factories.jobs,
asset_checks=asset_factories.checks,
sensors=asset_factories.sensors,
Expand Down
5 changes: 5 additions & 0 deletions warehouse/oso_dagster/factories/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def filter_assets_by_name(self, name: str):
filtered = self.filter_assets(lambda a: a.key.path[-1] == name)
return filtered

def find_job_by_name(
self, name: str
) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
return next((job for job in self.jobs if job.name == name), None)


type EarlyResourcesAssetDecoratedFunction[**P] = Callable[
P, AssetFactoryResponse | AssetsDefinition
Expand Down
21 changes: 19 additions & 2 deletions warehouse/oso_dagster/factories/dlt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Dict,
Any,
Mapping,
MutableMapping,
Optional,
Callable,
Iterator,
Expand All @@ -14,12 +15,14 @@

from uuid import uuid4
from dagster import (
PartitionsDefinition,
asset,
AssetIn,
AssetExecutionContext,
MaterializeResult,
Config,
AssetMaterialization,
define_asset_job,
)
import dlt as dltlib
from dlt.sources import DltResource
Expand Down Expand Up @@ -63,7 +66,7 @@ def dlt_factory(
key_prefix: Optional[AssetKeyPrefixParam] = None,
deps: Optional[AssetDeps] = None,
ins: Optional[Mapping[str, AssetIn]] = None,
tags: Optional[Mapping[str, str]] = None,
tags: Optional[MutableMapping[str, str]] = None,
*args: P.args,
**kwargs: P.kwargs,
):
Expand All @@ -77,6 +80,9 @@ def dlt_factory(
"""
tags = tags or {}

if "partitions_def" in kwargs:
tags["opensource.observer/extra"] = "partitioned-assets"

key_prefix_str = ""
if key_prefix:
if isinstance(key_prefix, str):
Expand Down Expand Up @@ -186,7 +192,18 @@ def _dlt_asset(
for result in results:
yield cast(R, result)

return AssetFactoryResponse([_dlt_asset])
asset_partitions = cast(
Optional[PartitionsDefinition[str]],
kwargs["partitions_def"] if "partitions_def" in kwargs else None,
)

_asset_job = define_asset_job(
name=f"{key_prefix_str}_{asset_name}_job",
selection=[_dlt_asset],
partitions_def=asset_partitions,
)

return AssetFactoryResponse([_dlt_asset], [], [_asset_job])

return _factory

Expand Down
75 changes: 68 additions & 7 deletions warehouse/oso_dagster/schedules.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,72 @@
"""
To add a daily schedule that materializes your dbt assets, uncomment the following lines.
"""
from typing import Generator, Iterable, List, cast

from dagster import (
AssetKey,
AssetSelection,
AssetsDefinition,
RunRequest,
ScheduleDefinition,
ScheduleEvaluationContext,
define_asset_job,
)
from oso_dagster.factories.common import AssetFactoryResponse

partitioned_assets = AssetSelection.tag(
"opensource.observer/extra", "partitioned-assets"
)


def get_partitioned_schedules(
factory: AssetFactoryResponse,
) -> List[ScheduleDefinition]:
resolved_assets = partitioned_assets.resolve(
cast(Iterable[AssetsDefinition], factory.assets)
)

def create_schedule(asset_key: AssetKey):
asset_path = "_".join(asset_key.path)
job_name = f"{asset_path}_job"
factory_job = factory.find_job_by_name(job_name)

if not factory_job:
raise ValueError(f"Job {job_name} not found in factory response")

def execution_fn(
context: ScheduleEvaluationContext,
) -> Generator[RunRequest, None, None]:
if not factory_job.partitions_def:
raise ValueError(
f"Job {job_name} does not have a partitions definition, but is being used "
"in a partitioned schedule"
)

materialized_partitions = set(
context.instance.get_materialized_partitions(asset_key)
)

yield from (
RunRequest(
run_key=f"{asset_path}_{partition_key}",
partition_key=partition_key,
)
for partition_key in factory_job.partitions_def.get_partition_keys()
if partition_key not in materialized_partitions
)

# Run unmaterilized partitions every sunday at midnight
return ScheduleDefinition(
job=factory_job,
cron_schedule="0 0 * * 0",
name=f"materialize_{asset_path}_schedule",
execution_fn=execution_fn,
)

return [create_schedule(asset_key) for asset_key in resolved_assets]

from dagster import define_asset_job, ScheduleDefinition, AssetSelection

materialize_all_assets = define_asset_job(
"materialize_all_assets_job", AssetSelection.all()
"materialize_all_assets_job",
AssetSelection.all() - partitioned_assets,
)

materialize_source_assets = define_asset_job(
Expand All @@ -14,8 +75,8 @@
| AssetSelection.tag("opensource.observer/type", "source-qa"),
)

schedules = [
# Run everything once a week on sunday at midnight
schedules: list[ScheduleDefinition] = [
# Run everything except partitioned assets once a week on sunday at midnight
ScheduleDefinition(
job=materialize_all_assets,
cron_schedule="0 0 * * 0",
Expand Down

0 comments on commit a3ccda5

Please sign in to comment.