[RFC] Declarative Automation and the future of Auto-materialize Policies #22811
Replies: 15 comments 40 replies
-
Sounds great. I wonder how this will support dbt and database views out of the box, i.e. materialize the views only when the view definition changes, but refresh downstream assets easily according to their AMP.
-
Quick note, the photos are inaccessible to me. Other than that, it sounds super duper awesome! The composability seems to extend further and be in the spirit of all the greatness Dagster is already known for. I am looking forward to playing around with these changes - and seeing if it solves some of the pains we've had with AutoMaterialization + Freshness (especially regarding partitioned assets). I really love the ability to create multiple sensors for different subsets of assets, minimizing the reach of potential bugs.
-
I would really love it if, in the new system, an automaterialize sensor was created for each asset, or we at least got the ability to turn off auto-materialization for individual assets/asset chains. There are a few feature requests/issues around this that dive deeper into it: #22073, #15504, #18133. Setting those sensors up separately is a bit of a hassle, and I would think it has problems with asset DAGs that cross code locations. If instead each asset can have auto-materialization turned on or off at the asset level, perhaps with a toggle in the automation tab on the definition, then I believe you can avoid those concerns.
-
Are there any plans to revisit support for irregular non-cron schedules + time partitioning (that are known in advance) in light of this? For example, based on a list of business-hour datetimes provided in a file or emitted by some function.
-
Great changes! I might have missed it, but do you have any plans for customizing the list of parents monitored by conditions?
The obvious "schedule" for D is to wait for both B and C to be updated, but not to wait for A, since it won't be updated anytime soon.
Maybe anything in |
-
Also, please do not forget the unit testing story. I have seen that some first steps toward unit testing the old API were sent in #22292, given that:
could be dramatically simplified by this new API, and ideally these aspects can be considered there as well.
-
It would be very neat if this new API became blocking for asset checks, as outlined in #22427.
-
It would be quite neat if there were a default condition that allows updating based on the next business day (with a pluggable calendar).
-
Just because I didn't see it explicitly mentioned: the new declarative automation will still not work between different code locations? Assuming so because I see the behavior of the default daemon is still the same, with one
Overall, definitely liking the design of the new approach to automation! Excited for the arrival of this
-
I would appreciate an automation_condition that allows me to enable/disable automation on a certain stage (think development, integration, production). In the previous versions of Dagster (using schedules + jobs to trigger assets) we set the schedule state depending on the stage.
-
Hi, I understand that the since part only works if the since condition was detected on a previous tick, and I partially understand how to build the automation, but I can't clearly translate my need into code. The main idea is that the asset should still update after, for example, a downtime: even if the cron tick evaluation was missed, if the asset (or any of the daily assets) wasn't updated, a run should be requested to update it.
-
Hello, can you point to the authoritative source of the latest documentation with working code, including coverage of even experimental features? It's really pissing me off trying to piece together sample code from PRs, docs-preview.dagster.io and docs.dagster.io. Similarly, if something has been superseded (like AutomationCondition superseding manually defined sensors & schedules), please mark sensors & schedules with something in the docs page to say that they have been superseded in the current release. I don't understand how all of your users are coping with this situation of multiple sources of partial truth...
-
I'm struggling to fully understand the interaction between
I have an asset with daily partitions that depends on multiple assets that also have daily partitions. I would like to update the asset for all of the partitions that have been updated in any of the upstream assets. I am doing so with the materialization conditions:

    from dagster import AutomationCondition, asset

    # True once per 15-minute cron tick (UTC).
    fifteen_mins_passed = AutomationCondition.cron_tick_passed(
        "*/15 * * * *", cron_timezone="UTC"
    )
    # True if any upstream dependency was newly updated.
    any_deps_updated = AutomationCondition.any_deps_match(
        AutomationCondition.newly_updated(),
    )
    fifteens_min_passed_and_any_deps_updated = fifteen_mins_passed & any_deps_updated

    @asset(
        ...
        automation_condition=fifteens_min_passed_and_any_deps_updated
    )
    ...

This works for many conditions; however, I am finding that it can fail if multiple partitions are updated for a dependency in the same 15 minute window. If I materialize the partition
Can you provide any info on what might be causing this problem and how to achieve the conditions we're looking for?
-
I'm really liking this. A few questions though.
-
I am struggling to get even the simplest example I can think of for declarative automation working. Is there any proper documentation with working examples? This is, I think, the simplest possible example, and it seems not to work: if I materialize "my_first_asset" it runs, but it doesn't cause "my_second_asset" to materialize. Could you help provide examples of how this is supposed to work?

    from dagster import AutomationCondition, DailyPartitionsDefinition, asset

    # Define daily partitions starting from 2025-01-01
    daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")

    @asset(partitions_def=daily_partitions)
    def my_first_asset(context):
        partition_date = context.partition_key
        context.log.info(f"Materializing my_first_asset for partition: {partition_date}")
        return f"Data for {partition_date}"

    @asset(
        partitions_def=daily_partitions,
        deps=["my_first_asset"],
        automation_condition=AutomationCondition.eager(),
    )
    def my_second_asset(context):
        partition_date = context.partition_key
        context.log.info(f"Materializing my_second_asset for partition: {partition_date}")
        return "Processed result"
-
Introduction
In Dagster 1.8, we are releasing the successor to the AutoMaterializePolicy system, which we’re calling “Declarative Automation”.
We have several goals with these changes:
@schedule or @sensor to work around limitations of the asset-focused API.
Docs for Declarative Automation can be found here.
Context
Over the past year and a half, we’ve seen many users adopt the AutoMaterializePolicy abstraction, and find success with an asset-based orchestration model. However, there have been some persistent themes in the feedback regarding the current system.
“It’s challenging to customize”
When the system was first released, there were only two policies available to the user: AutoMaterializePolicy.eager() and AutoMaterializePolicy.lazy(). As more users adopted the system, it was clear that these options were too coarse-grained to capture many common use cases.
The advent of the AutoMaterializeRule system formalized some of the logic internal to the evaluation system, and exposed it to the user. This helped provide some levers for customization, but we’ve learned that this is fundamentally insufficient for achieving our vision for this product area.
AutoMaterializeRule.materialize_on_missing() needs to contain complex logic for handling state transitions to ensure that (e.g.) an asset doesn’t continually get requested if the previously requested run failed. This level of complexity makes it nearly impossible to build an intuition as to how the system is functioning, as it essentially “hides” critical inputs that the system is using to make its decisions.
“It’s challenging to operate at scale”
As another quick history lesson, the original AutoMaterializePolicy.eager() had no rate-limiting behavior at all. This meant that if you were to add a new partitioned asset with an eager policy to your code location, AMP would attempt to launch a run for every single partition of the asset (in essence, a “surprise backfill”). This particular issue was resolved with the addition of the max_materializations_per_minute parameter (which defaults to 1), but illustrates the general category of problem where a seemingly small change can result in a huge impact.
At a high level, users need to be able to be confident that their changes will have a defined and limited scope, and we believe we can do significantly better here.
Introducing: Declarative Automation
Declarative Automation is the term we are using to describe the new suite of interfaces we’ve designed to address the weaknesses of the Auto-materialization system. This term is intentionally general — we plan on expanding this system over time to support automating things such as asset observations and asset checks in addition to materializations.
There will be no breaking changes to AutoMaterializePolicy made in the Dagster 1.8 release and all existing code will continue to function, but the AutoMaterializePolicy and AutoMaterializeRule interfaces will be marked as deprecated. We will continue to support these APIs until at least Q1 2025, and are open to feedback on timeline.
However, we believe Declarative Automation will provide a vastly superior experience to current-day APIs, and will generally provide a superset of the capabilities offered today.
Automation Conditions
The core primitive of Declarative Automation is the AutomationCondition, which encodes a particular state that an asset may be in.
Conditions can be combined together using a variety of operators to build more complex expressions, allowing you to precisely describe the conditions under which an asset ought to be materialized.
Similarly to AMPs, you’ll be able to attach a condition to an asset as follows:
In common cases, you will not need to manually mix and match conditions, and can instead use one of the three “out-of-the-box” policies:
AutomationCondition.eager()
This policy is intended to be a drop-in replacement for the current-day AutoMaterializePolicy.eager(), and replicates its behavior with a couple of key exceptions.
First, it is purely forward-looking. This means that it only reacts to events which happen after the policy is added to the asset, and will (by default) only materialize the latest partition of a time-partitioned asset. This handles the “surprise backfill” problem more elegantly than the max_materializations_per_minute parameter of today’s world.
Secondly, it will drop the skip_on_parent_outdated rule. This is a fairly complex bit of logic which prevents materializations in cases where ancestors are in an “unsynced” state. This requires recursing up the entire asset graph, which means that events happening far upstream can prevent assets from materializing in a timely manner. This was one of the more common rules for users to manually disable. In some sense, the main purpose of this rule was to prevent materializations when we knew a parent was going to get materialized again in the near future (under the assumption that all assets in the graph were on the “eager” policy). This functionality will be replaced by logic that prevents materializing the asset if any of its parents are currently in progress.
The net result of these changes is a policy which is simpler to understand, and aligns more closely with what we’ve observed people to expect the behavior to be.
AutomationCondition.on_cron("@daily")
One of the more common things we’ve observed users doing with the AutoMaterializeRule system is to create a sort of “distributed cron schedule”. Let’s take the following example:
Here, if we took the naive approach, and simply materialized the upstream assets exactly on the hour, and the downstream asset exactly every 3 hours, then the downstream asset would get kicked off before its parents had time to complete, and so would perpetually be executing on old data.
This issue can be solved by attempting to materialize each asset once per cron schedule tick, but only after all of its parent assets have updated since that tick. This lets you independently set cron schedules on individual assets without needing to worry about the specific cadences of its upstreams.
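To make the rule concrete, here is a toy model in plain Python. It is only a sketch of the decision described above, not Dagster's implementation; the function name should_request, its parameters, and the timestamps are all invented for illustration:

```python
from datetime import datetime
from typing import Mapping, Optional


def should_request(
    latest_tick: datetime,
    last_requested: Optional[datetime],
    parent_last_updated: Mapping[str, Optional[datetime]],
) -> bool:
    """Toy model: request once per cron tick, only after every parent updated since it."""
    # At most one request per tick: skip if we already requested since the tick.
    already_requested = last_requested is not None and last_requested >= latest_tick
    # Every parent must have been updated at or after the tick.
    parents_ready = all(
        updated is not None and updated >= latest_tick
        for updated in parent_last_updated.values()
    )
    return parents_ready and not already_requested


tick = datetime(2024, 7, 1, 3, 0)  # the 03:00 tick of an every-3-hours schedule
stale = {"upstream_a": datetime(2024, 7, 1, 3, 4), "upstream_b": datetime(2024, 7, 1, 2, 1)}
fresh = {"upstream_a": datetime(2024, 7, 1, 3, 4), "upstream_b": datetime(2024, 7, 1, 3, 6)}

waits_for_parents = should_request(tick, None, stale)  # upstream_b has not updated since the tick
fires_once_ready = should_request(tick, None, fresh)  # all parents updated since the tick
no_double_fire = should_request(tick, datetime(2024, 7, 1, 3, 7), fresh)  # already requested
```

In this model the downstream asset simply waits past its own tick until the hourly parents have caught up, which is why the cadences of the upstreams do not need to be coordinated by hand.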
In the past, this would look like:
We are now elevating this as a first-class use case with its own dedicated static constructor:
AutomationCondition.any_downstream_conditions()
This is intended to serve as a replacement for and generalization of the existing AutoMaterializePolicy.lazy().
One of the core benefits of the original freshness-based scheduling system was the ability to materialize certain assets only when they are needed to satisfy downstream policies, rather than requiring you to set an explicit policy on each individual asset.
However, the way to achieve this with an AutoMaterializePolicy is highly coupled with the (now-deprecated) FreshnessPolicy system. The idea was that you could simply define a target freshness and the system would automatically do what it needed to do to adhere to that requirement. While nice in theory, the fundamental issue is that there is an infinite spectrum of ways to satisfy any given FreshnessPolicy, with different tradeoffs between reducing the number of executions and reducing the likelihood of missing that freshness guarantee (for example, materializing the asset continuously would do a great job of meeting freshness requirements but is obviously not a desirable solution).
With that in mind, we decided to decouple these systems. Instead, users can express constraints on the required frequency of updates purely through AutomationConditions, and use AutomationCondition.any_downstream_conditions() on upstream assets to automatically “inherit” the condition(s) of downstream assets.
Take the following example:
In this case, we’ve defined that the downstream assets should run on some specific cadences (every three hours / daily), but the upstream assets only exist in order to enable these downstreams.
Rather than needing to explicitly figure out which assets need to update at which frequency to enable those downstreams, you can simply give those assets an any_downstream_conditions() automation condition, allowing those requirements to propagate upwards through the graph.
This helps reduce unnecessary computation for these assets, as they’ll exclusively be executed in cases where some downstream with an explicit policy needs to execute.
Customizing Conditions
While the above policies can handle many use cases, more specialized needs will always crop up. The core of the evaluation engine starts with simple conditions representing basic properties or statuses of an asset, for example:
AutomationCondition.missing(): True if the asset has never been materialized
AutomationCondition.in_progress(): True if there is an in-progress run targeting this asset
AutomationCondition.in_latest_time_window(<timedelta>): True for any time-window-partition of the asset from within the last (e.g.) 12 hours
These conditions may be composed together using the standard boolean operators, e.g.:
AutomationCondition.missing() & ~AutomationCondition.in_progress(): True if this asset has never been materialized and is not part of an in-progress run
A history of these evaluations can be viewed in the UI, giving you detailed information on exactly which sub-conditions were true on any given evaluation:
More complex operators also exist. For example:
AutomationCondition.any_deps_match(<condition>): True if any dependencies of this asset match an arbitrary condition
AutomationCondition.<condition>.since(<condition>): True if the first condition has become true since the second condition became true.
Example:
Let’s bring back the cron-based schedule policy from above:
There’s a lot going on here, but let’s re-implement this from scratch using the new AutomationCondition APIs. At a high level, we just want a condition that materializes an asset when the following things are both true:
(AutoMaterializeRule.materialize_on_cron)
(AutoMaterializeRule.skip_on_not_all_parents_updated_since_cron)
This can be implemented with AutomationConditions as follows:
In the rules-based system, you needed bespoke rules to separately handle each of these two cases, but here you can use simple components (cron_tick_passed, newly_updated) and use generic operators to combine them together.
We see this composability as a massive win in terms of flexibility. The combinatorial possibilities of this expression-based system are massive, and each individual condition added to the suite will automatically benefit from this framework. For example, all_deps_match and any_deps_match support .allow() and .ignore() methods, meaning you can target these conditions at specific parents:
These building blocks are significantly simpler and more powerful than the analogs in the AutoMaterializeRule system.
Custom Python-Based Conditions [experimental]
Some automation use cases require custom business logic that cannot be expressed with off-the-shelf components. In these cases, you can define AutomationConditions which execute arbitrary Python code, and compose them with the built-in conditions.
In order to execute arbitrary Python code, you'll need to make it possible for your sensor evaluations to execute on your user code server. This is the same place that your @sensor methods are evaluated.
To define a custom automation condition, you will then create your own subclass of AutomationCondition, defining the evaluate() method.
See more details here.
Unit Testing
As the flexibility of this feature increases, so does the need for building confidence that the condition you’ve created does what you expect. Dagster will provide unit-testing APIs to allow you to validate that your policy reacts as expected to various events:
While this is somewhat bare-bones at the moment, we plan on continuing to expand this interface to make common testing patterns as ergonomic as possible, and would love your feedback on how you see yourself using this.
AutomationCondition Sensors
The centralized Daemon approach is risky (a failure or slowdown when evaluating any asset in the global asset graph impacts all other assets), and inflexible (it is impossible to target different behaviors at different sets of assets).
In 1.8, we'll be defaulting to a sensor-based approach. Each asset with an AutomationCondition defined will be handled by an AutomationConditionSensorDefinition, rather than a centralized Daemon. By default, a single AutomationConditionSensorDefinition is created per code location, and will target all assets within that code location.
However, in cases where a single code location handles multiple disparate concerns, it can be useful to fully isolate the operation of different sets of assets. To do so, you’ll be able to explicitly define multiple AutomationConditionSensorDefinition objects:
These can be viewed in the "Sensors" tab in the UI like any other sensor:
What's Next...
Mass-applying Conditions to assets
We plan to make it easy to apply arbitrary properties (i.e. not just AutomationConditions) to large sets of assets. More details will be provided at a later date, but at a high level this would look something like the following:
Automating a Group of Assets to materialize on a single automation condition evaluation
We're looking at setting a global_condition value on a named group or list of assets which is evaluated just a single time, and based on the result of that evaluation, either ALL of the selected assets will be launched, or NONE of them will be.
This is a much closer analog to something like a ScheduleDefinition, and is a much closer mental model match to existing concepts.
Better Run Batching
The AMP system often ends up needing to perform “shadow backfills”, which separate out a single semantic intent into a set of independent runs. It might “know” that it needs to update an asset and all of its downstreams (particularly when using an eager() policy), but is unable to launch all of those assets in a single run.
The core reason behind this is that Dagster does not currently support creating runs which target assets with different PartitionsDefinitions in a first-class way. We are working on changing this, which will in turn allow the Declarative Automation system to neatly combine together runs into larger (and more logical) batches. In essence, AMP will be capable of emitting backfills, but these backfills will be more of a first-class object than they are in their current form.
More details will follow in a separate GitHub Discussion!
Improving the observability on Overview UIs
Much like assets managed with AMPs, assets triggered by Declarative Automation are shown as 'ad hoc' materializations on the runs page, which we recognize is an inadequate way to share data about the state of your various assets. This is because this page was originally built under the assumption that assets would be bundled into Jobs, and Jobs could be the organizing feature for displaying information about the state of a process.
We are currently working on designs for how to more effectively represent assets triggered outside of Jobs, and should have designs available to execute within the February - March 2025 time frame.
Call to Action
We’d love to hear your thoughts! We know that this is a large set of changes to a widely-used system, and are committed to making the migration path as smooth as possible. We strongly believe that the end result will be a significantly more stable and flexible system.
If you have concerns, questions, use cases you want addressed, or just generally have feedback regarding these changes, don’t hesitate to comment here, or reach out to us on Slack.