Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add optional --hours-back for edr monitor #1549

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
10 changes: 5 additions & 5 deletions elementary/monitor/api/alerts/alerts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, List
from typing import Dict, List, Optional

from elementary.clients.api.api_client import APIClient
from elementary.clients.dbt.dbt_runner import DbtRunner
Expand All @@ -24,13 +24,13 @@ def __init__(
config=self.config,
)

def get_new_alerts(self, days_back: int) -> List[PendingAlertSchema]:
pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back)
def get_new_alerts(self, days_back: int, hours_back: Optional[int] = None) -> List[PendingAlertSchema]:
pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back, hours_back=hours_back)
return pending_alerts

def get_alerts_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
def get_alerts_last_sent_times(self, days_back: int, hours_back: Optional[int] = None) -> Dict[str, datetime]:
alerts_last_sent_times = self.alerts_fetcher.query_last_alert_times(
days_back=days_back
days_back=days_back, hours_back=hours_back
)
last_sent_times = dict()
for alert_class_id, last_sent_time_as_string in alerts_last_sent_times.items():
Expand Down
13 changes: 12 additions & 1 deletion elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def decorator(func):
default=1 if cmd == Command.MONITOR else 7,
help="Set a limit to how far back should edr collect data.",
)(func)
if cmd in (Command.MONITOR):
func = click.option(
"--hours-back",
"-h",
type=int,
default=None,
help="Optionally set an hourly limit to how far back should edr collect data. If provided, it overrides --days-back.",
)(func)
func = click.option(
"--env",
type=str,
Expand Down Expand Up @@ -166,6 +174,7 @@ def get_cli_properties() -> dict:
full_refresh_dbt_package = params.get("full_refresh_dbt_package")
select = params.get("select")
days_back = params.get("days_back")
hours_back = params.get("hours_back")
timezone = params.get("timezone")
group_by = params.get("group_by")
suppression_interval = params.get("suppression_interval")
Expand All @@ -177,6 +186,7 @@ def get_cli_properties() -> dict:
"full_refresh_dbt_package": full_refresh_dbt_package,
"select": select,
"days_back": days_back,
"hours_back": hours_back,
"timezone": timezone,
"group_by": group_by,
"suppression_interval": suppression_interval,
Expand Down Expand Up @@ -272,6 +282,7 @@ def get_cli_properties() -> dict:
def monitor(
ctx,
days_back,
hours_back,
slack_webhook,
deprecated_slack_webhook,
slack_token,
Expand Down Expand Up @@ -362,7 +373,7 @@ def monitor(
Command.MONITOR, get_cli_properties(), ctx.command.name
)
success = data_monitoring.run_alerts(
days_back, full_refresh_dbt_package, dbt_vars=vars
days_back, hours_back, full_refresh_dbt_package, dbt_vars=vars
)
anonymous_tracking.track_cli_end(
Command.MONITOR, data_monitoring.properties(), ctx.command.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,16 @@ def _get_integration_client(self) -> BaseIntegration:
def _populate_data(
self,
days_back: Optional[int] = None,
hours_back: Optional[int] = None,
dbt_full_refresh: bool = False,
dbt_vars: Optional[dict] = None,
) -> bool:
logger.info("Running internal dbt run to populate alerts")
vars = dbt_vars or dict()
if days_back:
vars.update(days_back=days_back)
if hours_back:
vars.update(hours_back=hours_back)
success = self.internal_dbt_runner.run(
models="elementary_cli.alerts.alerts_v2",
full_refresh=dbt_full_refresh,
Expand All @@ -86,17 +89,17 @@ def _populate_data(

return success

def _fetch_data(self, days_back: int) -> List[PendingAlertSchema]:
def _fetch_data(self, days_back: int, hours_back: Optional[int] = None) -> List[PendingAlertSchema]:
return self.alerts_api.get_new_alerts(
days_back=days_back,
days_back=days_back, hours_back=hours_back,
)

def _filter_data(self, data: List[PendingAlertSchema]) -> List[PendingAlertSchema]:
return filter_alerts(data, alerts_filter=self.selector_filter)

def _fetch_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
def _fetch_last_sent_times(self, days_back: int, hours_back: Optional[int] = None) -> Dict[str, datetime]:
return self.alerts_api.get_alerts_last_sent_times(
days_back=days_back,
days_back=days_back, hours_back=hours_back,
)

def _sort_alerts(
Expand Down Expand Up @@ -286,13 +289,15 @@ def _skip_alerts(self, alerts: List[PendingAlertSchema]):
def run_alerts(
self,
days_back: int,
hours_back: int,
dbt_full_refresh: bool = False,
dbt_vars: Optional[dict] = None,
) -> bool:
# Populate data
if self.should_populate_data:
popopulated_data_successfully = self._populate_data(
days_back=days_back,
hours_back=hours_back,
dbt_full_refresh=dbt_full_refresh,
dbt_vars=dbt_vars,
)
Expand All @@ -302,9 +307,9 @@ def run_alerts(
return self.success

# Fetch and filter data
alerts = self._fetch_data(days_back)
alerts = self._fetch_data(days_back=days_back, hours_back=hours_back)
alerts = self._filter_data(alerts)
alerts_last_sent_times = self._fetch_last_sent_times(days_back)
alerts_last_sent_times = self._fetch_last_sent_times(days_back=days_back, hours_back=hours_back)
sorted_alerts = self._sort_alerts(
alerts=alerts, alerts_last_sent_times=alerts_last_sent_times
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
{% macro get_pending_alerts(days_back, type=none) %}
{% macro get_pending_alerts(days_back, type=none, hours_back=none) %}
-- depends_on: {{ ref('alerts_v2') }}
{% if hours_back %}
{% set alerts_time_limit = elementary_cli.get_alerts_time_limit_hour(hours_back) %}
{% else %}
{% set alerts_time_limit = elementary_cli.get_alerts_time_limit(days_back) %}
{% endif %}
{% set select_pending_alerts_query %}
with alerts_in_time_limit as (
select *
from {{ ref('elementary_cli', 'alerts_v2') }}
where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }}
where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ alerts_time_limit }}
{% if type %}
and lower(type) = {{ elementary.edr_quote(type | lower) }}
{% endif %}
Expand All @@ -30,16 +35,21 @@
{% endmacro %}


{% macro get_last_alert_sent_times(days_back, type=none) %}
{% macro get_last_alert_sent_times(days_back, type=none, hours_back=none) %}
-- depends_on: {{ ref('alerts_v2') }}
{% if hours_back %}
{% set alerts_time_limit = elementary_cli.get_alerts_time_limit_hour(hours_back) %}
{% else %}
{% set alerts_time_limit = elementary_cli.get_alerts_time_limit(days_back) %}
{% endif %}
{% set select_last_alert_sent_times_query %}
with alerts_in_time_limit as (
select
alert_class_id,
status,
sent_at
from {{ ref('alerts_v2') }}
where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }}
where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ alerts_time_limit }}
{% if type %}
and lower(type) = {{ elementary.edr_quote(type | lower) }}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% macro get_alerts_time_limit_hour(hours_back=24) %}
{% set nowtime = elementary.edr_current_timestamp() %}
{% set datetime_limit = elementary.edr_timeadd('hour', hours_back * -1, nowtime) %}
{{ return(datetime_limit) }}
{% endmacro %}
8 changes: 4 additions & 4 deletions elementary/monitor/fetchers/alerts/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ def skip_alerts(
)

def query_pending_alerts(
self, days_back: int, type: Optional[AlertTypes] = None
self, days_back: int, type: Optional[AlertTypes] = None, hours_back: Optional[int] = None
) -> List[PendingAlertSchema]:
pending_alerts_results = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_pending_alerts",
macro_args={"days_back": days_back, "type": type.value if type else None},
macro_args={"days_back": days_back, "type": type.value if type else None, "hours_back": hours_back},
)
return [
PendingAlertSchema(**result)
for result in json.loads(pending_alerts_results[0])
]

def query_last_alert_times(
self, days_back: int, type: Optional[AlertTypes] = None
self, days_back: int, type: Optional[AlertTypes] = None, hours_back: Optional[int] = None
) -> Dict[str, str]:
response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_last_alert_sent_times",
macro_args={"days_back": days_back, "type": type.value if type else None},
macro_args={"days_back": days_back, "type": type.value if type else None, "hours_back": hours_back},
)
return json.loads(response[0])

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "elementary-data"
version = "0.15.1"
version = "0.15.1+rv10"
description = "Data monitoring and lineage"
authors = ["Elementary"]
keywords = ["data", "lineage", "data lineage", "data warehouse", "DWH", "observability", "data monitoring", "data observability", "Snowflake", "BigQuery", "Redshift", "data reliability", "analytics engineering"]
Expand Down
Loading