Skip to content

Commit

Permalink
Add required 'begin' config support for microbatch models (#10756)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Sep 24, 2024
1 parent a1e4753 commit 730e40a
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 30 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240923-155903.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support required 'begin' config for microbatch models
time: 2024-09-23T15:59:03.924079+01:00
custom:
Author: michelleark
Issue: "10701"
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class NodeConfig(NodeAndTestConfig):
incremental_strategy: Optional[str] = None
batch_size: Any = None
lookback: Any = 0
begin: Any = None
persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
default_factory=list,
Expand Down
29 changes: 14 additions & 15 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,45 +42,44 @@ def build_end_time(self):
def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
If the checkpoint is `None`, then `None` will be returned as a checkpoint is necessary
If the checkpoint is `None`, or this is the first run of a microbatch model, then the
model's configured `begin` value will be returned as a checkpoint is necessary
to build a start time. This is because we build the start time relative to the checkpoint
via the batchsize and offset, and we cannot offset a checkpoint if there is no checkpoint.
"""
assert isinstance(self.model.config, NodeConfig)
batch_size = self.model.config.batch_size

# Use event_time_start if it is provided.
if self.event_time_start:
return MicrobatchBuilder.truncate_timestamp(
self.event_time_start, self.model.config.batch_size
)
return MicrobatchBuilder.truncate_timestamp(self.event_time_start, batch_size)

# First run, use model's configured 'begin' as start.
if not self.is_incremental or checkpoint is None:
# TODO: return new model-level configuration or raise error
return None
if not self.model.config.begin:
raise DbtRuntimeError(
f"Microbatch model '{self.model.name}' requires a 'begin' configuration."
)

assert isinstance(self.model.config, NodeConfig)
batch_size = self.model.config.batch_size
return MicrobatchBuilder.truncate_timestamp(self.model.config.begin, batch_size)

lookback = self.model.config.lookback
start = MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)

return start

def build_batches(
self, start: Optional[datetime], end: datetime
) -> List[Tuple[Optional[datetime], datetime]]:
def build_batches(self, start: datetime, end: datetime) -> List[Tuple[datetime, datetime]]:
"""
Given a start and end datetime, builds a list of batches where each batch is
the size of the model's batch_size.
"""
if start is None:
return [(start, end)]

batch_size = self.model.config.batch_size
curr_batch_start: datetime = start
curr_batch_end: datetime = MicrobatchBuilder.offset_timestamp(
curr_batch_start, batch_size, 1
)

batches: List[Tuple[Optional[datetime], datetime]] = [(curr_batch_start, curr_batch_end)]
batches: List[Tuple[datetime, datetime]] = [(curr_batch_start, curr_batch_end)]
while curr_batch_end <= end:
curr_batch_start = curr_batch_end
curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
Expand Down
3 changes: 3 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_rendered_model_config(**updates):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
}
result.update(updates)
return result
Expand Down Expand Up @@ -80,6 +81,7 @@ def get_rendered_seed_config(**updates):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
}
result.update(updates)
return result
Expand Down Expand Up @@ -129,6 +131,7 @@ def get_rendered_snapshot_config(**updates):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
}
result.update(updates)
return result
Expand Down
9 changes: 9 additions & 0 deletions tests/functional/list/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def expect_snapshot_output(self, happy_path_project): # noqa: F811
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"unique_id": "snapshot.test.my_snapshot",
"original_file_path": normalize("snapshots/snapshot.sql"),
Expand Down Expand Up @@ -133,6 +134,7 @@ def expect_analyses_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"unique_id": "analysis.test.a",
"original_file_path": normalize("analyses/a.sql"),
Expand Down Expand Up @@ -197,6 +199,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/ephemeral.sql"),
"unique_id": "model.test.ephemeral",
Expand Down Expand Up @@ -238,6 +241,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/incremental.sql"),
"unique_id": "model.test.incremental",
Expand Down Expand Up @@ -279,6 +283,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/sub/inner.sql"),
"unique_id": "model.test.inner",
Expand Down Expand Up @@ -320,6 +325,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/metricflow_time_spine.sql"),
"unique_id": "model.test.metricflow_time_spine",
Expand Down Expand Up @@ -361,6 +367,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/metricflow_time_spine_second.sql"),
"unique_id": "model.test.metricflow_time_spine_second",
Expand Down Expand Up @@ -402,6 +409,7 @@ def expect_model_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"original_file_path": normalize("models/outer.sql"),
"unique_id": "model.test.outer",
Expand Down Expand Up @@ -524,6 +532,7 @@ def expect_seed_output(self):
"event_time": None,
"lookback": 0,
"batch_size": None,
"begin": None,
},
"depends_on": {"macros": []},
"unique_id": "seed.test.seed",
Expand Down
16 changes: 10 additions & 6 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"""

microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

Expand Down Expand Up @@ -68,7 +68,7 @@
"""

microbatch_model_calling_source_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ source('seed_sources', 'raw_source') }}
"""

Expand Down Expand Up @@ -123,10 +123,12 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg
type(project.adapter), "valid_incremental_strategies", lambda _: ["microbatch"]
):
# Initial run
run_dbt(["run"])
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])

# Incremental run uses custom strategy
_, logs = run_dbt_and_capture(["run"])
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run"])
assert "custom microbatch strategy" in logs


Expand All @@ -141,10 +143,12 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg
type(project.adapter), "valid_incremental_strategies", lambda _: []
):
# Initial run
run_dbt(["run"])
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])

# Incremental run fails
_, logs = run_dbt_and_capture(["run"], expect_pass=False)
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run"], expect_pass=False)
assert "'microbatch' is not valid" in logs


Expand Down
22 changes: 13 additions & 9 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dbt.artifacts.resources.types import BatchSize
from dbt.materializations.incremental.microbatch import MicrobatchBuilder

MODEL_CONFIG_BEGIN = datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC)


class TestMicrobatchBuilder:
@pytest.fixture(scope="class")
Expand All @@ -17,6 +19,7 @@ def microbatch_model(self):
model.config = mock.MagicMock(NodeConfig)
model.config.materialized = "incremental"
model.config.incremental_strategy = "microbatch"
model.config.begin = MODEL_CONFIG_BEGIN

return model

Expand Down Expand Up @@ -67,7 +70,8 @@ def test_build_end_time(
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
BatchSize.day,
0,
None,
# is_incremental: False => model.config.begin
MODEL_CONFIG_BEGIN,
),
# BatchSize.year
(
Expand All @@ -93,8 +97,8 @@ def test_build_end_time(
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
BatchSize.year,
0,
# is_incremental=False + no start_time -> None
None,
# is_incremental=False + no start_time -> model.config.begin
MODEL_CONFIG_BEGIN,
),
(
True,
Expand Down Expand Up @@ -136,8 +140,8 @@ def test_build_end_time(
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
BatchSize.month,
0,
# is_incremental=False + no start_time -> None
None,
# is_incremental=False + no start_time -> model.config.begin
MODEL_CONFIG_BEGIN,
),
(
True,
Expand Down Expand Up @@ -179,8 +183,8 @@ def test_build_end_time(
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
BatchSize.day,
0,
# is_incremental=False + no start_time -> None
None,
# is_incremental=False + no start_time -> model.config.begin
MODEL_CONFIG_BEGIN,
),
(
True,
Expand Down Expand Up @@ -222,8 +226,8 @@ def test_build_end_time(
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
BatchSize.hour,
0,
# is_incremental=False + no start_time -> None
None,
# is_incremental=False + no start_time -> model.config.begin
MODEL_CONFIG_BEGIN,
),
(
True,
Expand Down

0 comments on commit 730e40a

Please sign in to comment.