Skip to content

Commit

Permalink
Microbatch: batched execution (#10677)
Browse files Browse the repository at this point in the history
* initial rough-in with CLI flags

* dbt-adapters testing against event-time-ref-filtering

* fix TestList

* Checkpoint

* fix tests

* add event_time_start params to build

* rename configs

* Gate resolve_event_time_filter via micro batch strategy and fix strptime usage

* Add unit test for resolve_event_time_filter

* Additional unit tests for `resolve_event_time_filter` to ensure lookback + batch_size work

* Remove extraneous comments and print statements from resolve_event_time_filter

* Fixup microbatch functional tests to use microbatch strategy

* Gate microbatch functionality behind env_var while in beta

* Add comment about how _is_incremental should be removed

* Improve `event_time_start/end` cli parameters to auto convert to datetime objects

* for testing: dbt-postgres 'microbatch' strategy

* rough in: chunked backfills

* partial failure of microbatch runs

* decouple run result methods

* initial refactor

* rename configs to __dbt_internal

* update compiled_code in context after re-compilation

* finish rename of context vars

* changelog entry

* fix patch_microbatch_end_time

* refactor into MicrobatchBuilder

* fix provider unit tests + add unit tests for MicrobatchBuilder

* add TestMicrobatchJinjaContextVarsAvailable

* unit test offset + truncate timestamp methods

* Remove pairing.md file

* Add typing to microbatch specific functions added in `task/run.py`

* Add doc strings to microbatch.py functions and classes

* Set microbatch node status to `ERROR` if all batches for node failed

* Fire an event for batch exceptions instead of directly printing

* Fix firing of failed microbatch log event

---------

Co-authored-by: Quigley Malcolm <[email protected]>
  • Loading branch information
MichelleArk and QMalcolm authored Sep 18, 2024
1 parent 16f5023 commit 8fe5ea1
Show file tree
Hide file tree
Showing 11 changed files with 1,114 additions and 341 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240913-232111.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Execute microbatch models in batches
time: 2024-09-13T23:21:11.935434-04:00
custom:
Author: michelleark
Issue: "10700"
75 changes: 2 additions & 73 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import os
from copy import deepcopy
from datetime import datetime, timedelta
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -17,7 +16,6 @@
Union,
)

import pytz
from typing_extensions import Protocol

from dbt import selected_resources
Expand All @@ -31,7 +29,6 @@
get_adapter_type_names,
)
from dbt.artifacts.resources import NodeConfig, NodeVersion, RefArgs, SourceConfig
from dbt.artifacts.resources.types import BatchSize
from dbt.clients.jinja import (
MacroGenerator,
MacroStack,
Expand Down Expand Up @@ -234,66 +231,6 @@ def Relation(self):
def resolve_limit(self) -> Optional[int]:
    """Return the row limit to apply when resolving a relation.

    Returns:
        ``0`` when the invocation args carry a truthy ``EMPTY`` attribute,
        otherwise ``None`` (no limit). A missing ``EMPTY`` attribute is
        treated as falsy.
    """
    empty_requested = getattr(self.config.args, "EMPTY", False)
    if empty_requested:
        return 0
    return None

def _build_end_time(self) -> Optional[datetime]:
    """Return the current time as a timezone-aware datetime in UTC."""
    now_utc = datetime.now(tz=pytz.utc)
    return now_utc

def _build_start_time(
    self, checkpoint: Optional[datetime], is_incremental: bool
) -> Optional[datetime]:
    """Compute the start of the event-time window relative to ``checkpoint``.

    The checkpoint is truncated to the model's ``batch_size`` boundary and
    then moved back by ``lookback`` batch periods.

    Args:
        checkpoint: Reference timestamp the window is measured back from.
        is_incremental: Whether the model is running incrementally.

    Returns:
        The UTC start timestamp, or ``None`` when the run is not incremental
        or no checkpoint was supplied.

    Raises:
        DbtRuntimeError: The model has no ``batch_size`` configured.
        DbtInternalError: The ``batch_size`` is not one of the handled values.
    """
    if checkpoint is None or not is_incremental:
        return None

    assert isinstance(self.model.config, NodeConfig)
    batch_size = self.model.config.batch_size
    if batch_size is None:
        raise DbtRuntimeError(f"The model `{self.model.name}` requires a `batch_size`")

    lookback = self.model.config.lookback

    if batch_size == BatchSize.hour:
        top_of_hour = datetime(
            checkpoint.year,
            checkpoint.month,
            checkpoint.day,
            checkpoint.hour,
            0,
            0,
            0,
            pytz.utc,
        )
        return top_of_hour - timedelta(hours=lookback)

    if batch_size == BatchSize.day:
        midnight = datetime(
            checkpoint.year, checkpoint.month, checkpoint.day, 0, 0, 0, 0, pytz.utc
        )
        return midnight - timedelta(days=lookback)

    if batch_size == BatchSize.month:
        first_of_month = datetime(checkpoint.year, checkpoint.month, 1, 0, 0, 0, 0, pytz.utc)
        for _ in range(lookback):
            # Stepping one day back from the 1st lands in the previous month;
            # snap that to the 1st to move back exactly one calendar month.
            last_of_previous = first_of_month - timedelta(days=1)
            first_of_month = datetime(
                last_of_previous.year, last_of_previous.month, 1, 0, 0, 0, 0, pytz.utc
            )
        return first_of_month

    if batch_size == BatchSize.year:
        return datetime(checkpoint.year - lookback, 1, 1, 0, 0, 0, 0, pytz.utc)

    raise DbtInternalError(
        f"Batch size `{batch_size}` is not handled during batch calculation"
    )

def _is_incremental(self) -> bool:
# TODO: Remove. This is a temporary method. We're working with adapters on
# a strategy to ensure we can access the `is_incremental` logic without drift
relation_info = self.Relation.create_from(self.config, self.model)
relation = self.db_wrapper.get_relation(
relation_info.database, relation_info.schema, relation_info.name
)
return (
relation is not None
and relation.type == "table"
and self.model.config.materialized == "incremental"
and not (
getattr(self.config.args, "FULL_REFRESH", False) or self.model.config.full_refresh
)
)

def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
Expand All @@ -303,16 +240,8 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF
and self.model.config.materialized == "incremental"
and self.model.config.incremental_strategy == "microbatch"
):
is_incremental = self._is_incremental()
end: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_END", None)
end = end.replace(tzinfo=pytz.UTC) if end else self._build_end_time()

start: Optional[datetime] = getattr(self.config.args, "EVENT_TIME_START", None)
start = (
start.replace(tzinfo=pytz.UTC)
if start
else self._build_start_time(checkpoint=end, is_incremental=is_incremental)
)
start = self.model.config.get("__dbt_internal_microbatch_event_time_start")
end = self.model.config.get("__dbt_internal_microbatch_event_time_end")

if start is not None or end is not None:
event_time_filter = EventTimeFilter(
Expand Down
Empty file.
Empty file.
164 changes: 164 additions & 0 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import pytz

from dbt.artifacts.resources.types import BatchSize
from dbt.contracts.graph.nodes import ModelNode, NodeConfig
from dbt.exceptions import DbtInternalError, DbtRuntimeError


class MicrobatchBuilder:
    """A utility class for building microbatch definitions associated with a specific model.

    Given a model configured with the ``microbatch`` incremental strategy, this
    class resolves the overall event-time window (start/end) and splits that
    window into consecutive batches, each ``batch_size`` wide.
    """

    def __init__(
        self,
        model: ModelNode,
        is_incremental: bool,
        event_time_start: Optional[datetime],
        event_time_end: Optional[datetime],
    ):
        """Validate the model's microbatch configuration and store the window inputs.

        Args:
            model: The model to build batches for.
            is_incremental: Whether the model is running incrementally.
            event_time_start: Optional explicit lower bound of the window.
            event_time_end: Optional explicit upper bound of the window.

        Raises:
            DbtInternalError: The model does not use the ``microbatch`` strategy.
            DbtRuntimeError: The model has no ``batch_size`` configured.
        """
        if model.config.incremental_strategy != "microbatch":
            raise DbtInternalError(
                f"Model '{model.name}' does not use 'microbatch' incremental_strategy."
            )
        self.model = model

        if self.model.config.batch_size is None:
            raise DbtRuntimeError(
                f"Microbatch model '{self.model.name}' does not have a 'batch_size' config (one of {[batch_size.value for batch_size in BatchSize]}) specified."
            )

        self.is_incremental = is_incremental
        # NOTE: `replace` stamps the value as UTC without shifting the clock
        # time; the provided bounds are taken to already be expressed in UTC.
        self.event_time_start = (
            event_time_start.replace(tzinfo=pytz.UTC) if event_time_start else None
        )
        self.event_time_end = event_time_end.replace(tzinfo=pytz.UTC) if event_time_end else None

    def build_end_time(self) -> datetime:
        """Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided"""
        return self.event_time_end or datetime.now(tz=pytz.utc)

    def build_start_time(self, checkpoint: Optional[datetime]) -> Optional[datetime]:
        """Create a start time based off the passed in checkpoint.

        An explicitly provided ``event_time_start`` always wins and is returned
        truncated to the model's batch size.

        Otherwise, if the checkpoint is `None` (or the run is not incremental),
        `None` is returned: the start time is built relative to the checkpoint
        via the batch size and lookback, and there is nothing to offset when
        there is no checkpoint.
        """
        if self.event_time_start is not None:
            return MicrobatchBuilder.truncate_timestamp(
                self.event_time_start, self.model.config.batch_size
            )

        if not self.is_incremental or checkpoint is None:
            # TODO: return new model-level configuration or raise error
            return None

        assert isinstance(self.model.config, NodeConfig)
        batch_size = self.model.config.batch_size
        lookback = self.model.config.lookback

        # Move back `lookback` whole batches from the truncated checkpoint.
        return MicrobatchBuilder.offset_timestamp(checkpoint, batch_size, -1 * lookback)

    def build_batches(
        self, start: Optional[datetime], end: datetime
    ) -> List[Tuple[Optional[datetime], datetime]]:
        """
        Given a start and end datetime, builds a list of batches where each batch is
        the size of the model's batch_size.

        When ``start`` is ``None`` a single unbounded-start batch ``(None, end)``
        is returned. Otherwise batches are contiguous ``(batch_start, batch_end)``
        pairs, and the final batch is clipped so it stops exactly at ``end``.
        """
        if start is None:
            return [(start, end)]

        batch_size = self.model.config.batch_size
        curr_batch_start: datetime = start
        curr_batch_end: datetime = MicrobatchBuilder.offset_timestamp(
            curr_batch_start, batch_size, 1
        )

        batches: List[Tuple[Optional[datetime], datetime]] = [(curr_batch_start, curr_batch_end)]
        # Strict `<`: when `end` falls exactly on a batch boundary the window is
        # already fully covered, and `<=` would append a zero-width (end, end) batch.
        while curr_batch_end < end:
            curr_batch_start = curr_batch_end
            curr_batch_end = MicrobatchBuilder.offset_timestamp(curr_batch_start, batch_size, 1)
            batches.append((curr_batch_start, curr_batch_end))

        # use exact end value as stop
        batches[-1] = (batches[-1][0], end)

        return batches

    @staticmethod
    def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime:
        """Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size.

        Note: It's important to understand that the offset applies to the truncated timestamp, not
        the origin timestamp. Thus being offset by a day isn't relative to the any given hour that day,
        but relative to the start of the day. So if the timestamp is the very end of a day, 2024-09-17 23:59:59,
        you have a batch size of a day, and an offset of +1, then the returned value ends up being only one
        second later, 2024-09-18 00:00:00.

        2024-09-17 16:06:00 + Batchsize.hour -1 -> 2024-09-17 15:00:00
        2024-09-17 16:06:00 + Batchsize.hour +1 -> 2024-09-17 17:00:00
        2024-09-17 16:06:00 + Batchsize.day -1 -> 2024-09-16 00:00:00
        2024-09-17 16:06:00 + Batchsize.day +1 -> 2024-09-18 00:00:00
        2024-09-17 16:06:00 + Batchsize.month -1 -> 2024-08-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.month +1 -> 2024-10-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year -1 -> 2023-01-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year +1 -> 2025-01-01 00:00:00

        Raises:
            DbtInternalError: ``batch_size`` is not a handled value.
        """
        truncated = MicrobatchBuilder.truncate_timestamp(timestamp, batch_size)

        offset_timestamp: datetime
        if batch_size == BatchSize.hour:
            offset_timestamp = truncated + timedelta(hours=offset)
        elif batch_size == BatchSize.day:
            offset_timestamp = truncated + timedelta(days=offset)
        elif batch_size == BatchSize.month:
            # Months vary in length, so shift a zero-based month index instead of
            # adding days. `truncated` is always the 1st, so `replace` cannot
            # produce an invalid day-of-month.
            year, month_index = divmod(truncated.year * 12 + (truncated.month - 1) + offset, 12)
            offset_timestamp = truncated.replace(year=year, month=month_index + 1)
        elif batch_size == BatchSize.year:
            # `truncated` is January 1st, so changing the year is always valid.
            offset_timestamp = truncated.replace(year=truncated.year + offset)
        else:
            raise DbtInternalError(
                f"Batch size `{batch_size}` is not handled during batch calculation"
            )

        return offset_timestamp

    @staticmethod
    def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
        """Truncates the passed in timestamp based on the batch_size.

        The result is always stamped as UTC; the date/time fields are read from
        ``timestamp`` as-is (no timezone conversion is performed).

        2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17 16:00:00
        2024-09-17 16:06:00 + Batchsize.day -> 2024-09-17 00:00:00
        2024-09-17 16:06:00 + Batchsize.month -> 2024-09-01 00:00:00
        2024-09-17 16:06:00 + Batchsize.year -> 2024-01-01 00:00:00

        Raises:
            DbtInternalError: ``batch_size`` is not a handled value.
        """
        if batch_size == BatchSize.hour:
            truncated = datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
                timestamp.hour,
                0,
                0,
                0,
                pytz.utc,
            )
        elif batch_size == BatchSize.day:
            truncated = datetime(
                timestamp.year, timestamp.month, timestamp.day, 0, 0, 0, 0, pytz.utc
            )
        elif batch_size == BatchSize.month:
            truncated = datetime(timestamp.year, timestamp.month, 1, 0, 0, 0, 0, pytz.utc)
        elif batch_size == BatchSize.year:
            truncated = datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc)
        else:
            raise DbtInternalError(
                f"Batch size `{batch_size}` is not handled during batch calculation"
            )

        return truncated
253 changes: 214 additions & 39 deletions core/dbt/task/docs/index.html

Large diffs are not rendered by default.

Loading

0 comments on commit 8fe5ea1

Please sign in to comment.