Skip to content

Commit

Permalink
feat: add grafana tempo topology (#3217)
Browse files Browse the repository at this point in the history
Signed-off-by: 35C4n0r <[email protected]>
Co-authored-by: Tal <[email protected]>
  • Loading branch information
35C4n0r and talboren authored Jan 30, 2025
1 parent b8db4e4 commit ee1a386
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 6 deletions.
12 changes: 10 additions & 2 deletions docs/providers/documentation/grafana-provider.mdx
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
title: "Grafana Provider"
description: "Grafana Provider allows either pull/push alerts from Grafana to Keep."
description: "Grafana Provider allows either pull/push alerts and pull Topology Map from Grafana to Keep."
---
<Tip>Grafana currently supports pulling/pushing alerts. We will add querying and notifying soon.</Tip>
<Tip>Grafana currently supports pulling/pushing alerts & Topology Map. We will add querying and notifying soon.</Tip>

## Legacy vs Unified Alerting

Expand Down Expand Up @@ -114,6 +114,14 @@ If Keep is not accessible externally and the webhook cannot be created, you can
4. **Network and Connectivity Check:**
- Use network monitoring tools to ensure Grafana can reach Keep or any alternative endpoint configured for alerts.

<Note>
**Topology Map** is generated from the traces collect by Tempo.
To get the Datasource UID, go to:
1. Connections > Data Sources.
2. Click the Prometheus instance which is scraping data from Tempo > Your URL is in the format `https://host/connections/datasources/edit/<DATASOURCE_UID>`
3. Copy that DATASOURCE_UID and use it while installing the provider.
</Note>

## Webhook Integration Modifications

The webhook integration adds Keep as a contact point in the Grafana instance. This integration can be located under the "Contact Points" section. Keep also gains access to the following scopes:
Expand Down
2 changes: 1 addition & 1 deletion keep/providers/argocd_provider/argocd_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,4 @@ def pull_topology(self):
node["uid"]
] = "unknown"

return list(service_topology.values())
return list(service_topology.values()), {}
4 changes: 4 additions & 0 deletions keep/providers/grafana_provider/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ curl -X POST -H "Content-Type: application/json" \
# and get
{"id":1,"name":"keep-token","key":"glsa_XXXXXX"}%
```

### For Topology Quickstart
Follow this guide:
https://grafana.com/docs/tempo/latest/getting-started/docker-example/
134 changes: 131 additions & 3 deletions keep/providers/grafana_provider/grafana_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from packaging.version import Version

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.api.models.db.topology import TopologyServiceInDto
from keep.contextmanager.contextmanager import ContextManager
from keep.exceptions.provider_exception import ProviderException
from keep.providers.base.base_provider import BaseProvider
from keep.providers.base.base_provider import BaseProvider, BaseTopologyProvider
from keep.providers.base.provider_exceptions import GetAlertException
from keep.providers.grafana_provider.grafana_alert_format_description import (
GrafanaAlertFormatDescription,
Expand Down Expand Up @@ -47,11 +48,18 @@ class GrafanaProviderAuthConfig:
"validation": "any_http_url",
},
)
datasource_uid: str = dataclasses.field(
metadata={
"required": False,
"description": "Datasource UID",
"hint": "Provide if you want to pull topology data",
},
)


class GrafanaProvider(BaseProvider):
class GrafanaProvider(BaseTopologyProvider):
PROVIDER_DISPLAY_NAME = "Grafana"
"""Pull/Push alerts from Grafana."""
"""Pull/Push alerts & Topology map from Grafana."""

PROVIDER_CATEGORY = ["Monitoring", "Developer Tools"]
KEEP_GRAFANA_WEBHOOK_INTEGRATION_NAME = "keep-grafana-webhook-integration"
Expand Down Expand Up @@ -856,6 +864,126 @@ def simulate_alert(cls, **kwargs) -> dict:
return {"keep_source_type": "grafana", "event": final_payload}
return final_payload

def query_datasource_for_topology(self):
self.logger.info("Attempting to query datasource for topology data.")
headers = {"Authorization": f"Bearer {self.authentication_config.token}", "Content-Type": "application/json",}
json_data = {
"queries": [
{
"format": "table",
"refId": "traces_service_graph_request_total",
"expr": "sum by (client, server) (rate(traces_service_graph_request_total[3600s]))",
"instant": True,
"exemplar": False,
"requestId": "service_map_request",
"utcOffsetSec": 19800,
"interval": "",
"legendFormat": "",
"datasource": {
"uid": self.authentication_config.datasource_uid,
},
"datasourceId": 1,
"intervalMs": 5000,
"maxDataPoints": 954,
},
{
"format": "table",
"refId": "traces_service_graph_request_server_seconds_sum",
"expr": "sum by (client, server) (rate(traces_service_graph_request_server_seconds_sum[3600s]))",
"instant": True,
"exemplar": False,
"requestId": "service_map_request_avg",
"utcOffsetSec": 19800,
"interval": "",
"legendFormat": "",
"datasource": {
"uid": self.authentication_config.datasource_uid,
},
"datasourceId": 1,
"intervalMs": 5000,
"maxDataPoints": 954,
},
],
"to": "now",
}
try:
response = requests.post(
f"{self.authentication_config.host}/api/ds/query",
verify=False,
headers=headers,
json=json_data,
timeout=10,
)
if response.status_code != 200:
raise Exception(response.text)
return response.json()
except Exception as e:
self.logger.error("Error while querying datasource for topology map", extra={"exception": str(e)})

@staticmethod
def __extract_schema_value_pair(results, query: str):
client_server_data = {}
for frames in results.get(query, {}).get("frames", []):
value_index = 0
for fields in frames.get("schema", {}).get("fields", []):
if (
"labels" in fields
and "client" in fields["labels"]
and "server" in fields["labels"]
):
client_server_data[
(fields["labels"]["client"], fields["labels"]["server"])
] = float(frames["data"]["values"][value_index][0])
break
value_index += 1
return client_server_data

def pull_topology(self):
self.logger.info("Pulling Topology data from Grafana...")
try:
service_topology = {}
results = self.query_datasource_for_topology().get("results", {})

self.logger.info("Scraping traces_service_graph_request_total data from the response")
requests_per_second_data = GrafanaProvider.__extract_schema_value_pair(
results=results, query="traces_service_graph_request_total"
)

self.logger.info("Scraping traces_service_graph_request_server_seconds_sum data from the response")
total_response_times_data = GrafanaProvider.__extract_schema_value_pair(
results=results, query="traces_service_graph_request_server_seconds_sum"
)

self.logger.info("Building Topology map.")
for client_server in requests_per_second_data:
client, server = client_server
requests_per_second = requests_per_second_data[client_server]
total_response_time = total_response_times_data.get(client_server, None)

if client not in service_topology:
service_topology[client] = TopologyServiceInDto(
source_provider_id=self.provider_id,
service=client,
display_name=client,
)
if server not in service_topology:
service_topology[server] = TopologyServiceInDto(
source_provider_id=self.provider_id,
service=server,
display_name=server,
)

service_topology[client].dependencies[server] = (
"unknown"
if total_response_time is None
else f"{round(requests_per_second, 2)}r/sec || {round((total_response_time / requests_per_second) * 1000, 2)}ms/r"
)
self.logger.info("Successfully pulled Topology data from Grafana...")
return list(service_topology.values()), {}
except Exception as e:
self.logger.error("Error while pulling topology data from Grafana", extra={"exception": str(e)})
raise e


if __name__ == "__main__":
# Output debug messages
Expand Down

0 comments on commit ee1a386

Please sign in to comment.