Skip to content

Commit

Permalink
Add ability to set timeout health check in workflows (#58)
Browse files Browse the repository at this point in the history
* Add ability to set timeout health check in workflows

---------

Co-authored-by: Brent Johnson <[email protected]>
  • Loading branch information
brent-johnson and Brent Johnson authored Nov 10, 2023
1 parent b2af63d commit ae84716
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 4 deletions.
1 change: 1 addition & 0 deletions brickflow/codegen/databricks_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
tasks=tasks,
git_source=git_conf,
tags=workflow.tags,
health=workflow.health,
job_clusters=[JobsJobClusters(**c) for c in workflow_clusters],
schedule=self.workflow_obj_to_schedule(workflow),
max_concurrent_runs=workflow.max_concurrent_runs,
Expand Down
7 changes: 6 additions & 1 deletion brickflow/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
BrickflowProjectDeploymentSettings,
get_brickflow_version,
)
from brickflow.bundles.model import JobsTasksNotebookTask, JobsTasksNotificationSettings
from brickflow.bundles.model import (
JobsTasksNotebookTask,
JobsTasksNotificationSettings,
JobsTasksHealthRules,
)
from brickflow.cli.projects import DEFAULT_BRICKFLOW_VERSION_MODE
from brickflow.context import (
BrickflowBuiltInTaskVariables,
Expand Down Expand Up @@ -479,6 +483,7 @@ class Task:
task_settings: Optional[TaskSettings] = None
custom_execute_callback: Optional[Callable] = None
ensure_brickflow_plugins: bool = False
health: Optional[List[JobsTasksHealthRules]] = None

def __post_init__(self) -> None:
self.is_valid_task_signature()
Expand Down
3 changes: 3 additions & 0 deletions brickflow/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
JobsWebhookNotifications,
JobsNotificationSettings,
JobsTrigger,
JobsHealthRules,
)
from brickflow.context import BrickflowInternalVariables
from brickflow.engine import ROOT_NODE
Expand Down Expand Up @@ -116,6 +117,8 @@ class Workflow:
schedule_pause_status: str = "UNPAUSED"
default_cluster: Optional[Cluster] = None
clusters: List[Cluster] = field(default_factory=lambda: [])

health: Optional[List[JobsHealthRules]] = None
default_task_settings: TaskSettings = TaskSettings()
email_notifications: Optional[WorkflowEmailNotifications] = None
webhook_notifications: Optional[WorkflowWebhookNotifications] = None
Expand Down
15 changes: 12 additions & 3 deletions docs/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ wf = Workflow( # (1)!
email_notifications=EmailNotifications(
on_start=["[email protected]"],
on_success=["[email protected]"],
on_failure=["[email protected]"]
on_failure=["[email protected]"],
on_duration_warning_threshold_exceeded=["[email protected]"]
),
timeout_seconds=timedelta(hours=2).seconds
),
Expand All @@ -44,6 +45,11 @@ wf = Workflow( # (1)!
"catalog": "development",
"database": "your_database"
},
health = { # (16)!
"metric": "RUN_DURATION_SECONDS",
"op": "GREATER_THAN",
"value": 7200
}
)


Expand All @@ -67,6 +73,7 @@ def task_function(*, test="var"):
13. Define the common task parameters that can be used in all the tasks
14. Define a workflow task and associate it to the workflow
15. Define the schedule pause status. It is defaulted to "UNPAUSED"
16. Define health check condition that triggers duration warning threshold exceeded notifications

### Clusters

Expand Down Expand Up @@ -207,7 +214,8 @@ default_task_settings=TaskSettings(
email_notifications=EmailNotifications(
on_start=["[email protected]"],
on_success=["[email protected]"],
on_failure=["[email protected]"]
on_failure=["[email protected]"],
on_duration_warning_threshold_exceeded=["[email protected]"]
),
timeout_seconds=timedelta(hours=2).seconds,
max_retries=2,
Expand Down Expand Up @@ -249,4 +257,5 @@ common_task_parameters={
"catalog": "development",
"database": "your_database"
}
```
```

5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/dev_bundle_polyrepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ environments:
jobs:
some_wf:
email_notifications: null
health: null
git_source:
git_commit: a
git_provider: github
Expand Down Expand Up @@ -64,6 +65,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source:
git_commit: a
git_provider: github
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/local_bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source: null
job_clusters: []
max_concurrent_runs: 1.0
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/expected_bundles/local_bundle_prefix_suffix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ environments:
email_notifications: null
notification_settings: null
webhook_notifications: null
health:
rules:
- metric: "RUN_DURATION_SECONDS"
op: "GREATER_THAN"
value: 7200.0
git_source: null
job_clusters: []
max_concurrent_runs: 1.0
Expand Down
5 changes: 5 additions & 0 deletions tests/codegen/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
run_as_user="[email protected]",
tags={"test": "test2"},
common_task_parameters={"all_tasks1": "test", "all_tasks3": "123"}, # type: ignore
health={
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200.0}
]
},
)


Expand Down
5 changes: 5 additions & 0 deletions tests/engine/sample_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
),
tags={"test": "test2"},
common_task_parameters={"all_tasks1": "test", "all_tasks3": "123"}, # type: ignore
health={
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}
]
},
)


Expand Down
7 changes: 7 additions & 0 deletions tests/engine/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ def test_tags(self):
def test_default_task_settings(self):
assert wf.default_task_settings is not None

def test_health_settings(self):
assert wf.health == {
"rules": [
{"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}
]
}

def test_user(self):
principal = "[email protected]"
u = User(principal)
Expand Down

0 comments on commit ae84716

Please sign in to comment.