Merge pull request #1 from Airbase/add_datadog_exporter
Add datadog exporter
HassankSalim authored Jul 19, 2022
2 parents 8c73c0c + 293be91 commit 86647d5
Showing 15 changed files with 1,453 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .flake8
@@ -0,0 +1,5 @@
[flake8]
format = pylint
max-line-length = 88
ignore =
    E501
43 changes: 43 additions & 0 deletions .gitignore
@@ -0,0 +1,43 @@
# Python
*.egg-info
*.pyc
.python-version
build

# Notebooks
*.ipynb
*.ipynb_checkpoints

# Celery
celerybeat.pid
celerybeat-schedule*

# Django
staticfiles

# Environment
.env*
.envs/
.venv
venv

# IDEs
*.swp
.idea
.vscode

# Testing and lints
.coverage*
.hypothesis
.mypy_cache/
.pytest_cache
coverage.xml
htmlcov

# DB dumps
*.dump
*.sqlite3

# Misc
*.DS_Store
*.log
57 changes: 57 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,57 @@
default_language_version:
  python: python3.9

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.3.0
    hooks:
      - id: check-added-large-files
      - id: check-case-conflict
      - id: check-executables-have-shebangs
      - id: check-json
      - id: check-merge-conflict
      - id: check-symlinks
      - id: check-toml
      - id: check-xml
      - id: check-yaml
      - id: debug-statements
      - id: detect-private-key
      - id: end-of-file-fixer
      - id: forbid-new-submodules
      - id: mixed-line-ending
      - id: no-commit-to-branch
      - id: pretty-format-json
        args: ["--autofix"]
      - id: trailing-whitespace

  - repo: https://github.com/asottile/pyupgrade
    rev: v2.34.0
    hooks:
      - id: pyupgrade
        args: ["--py39-plus", "--keep-runtime-typing"]

  - repo: https://github.com/PyCQA/autoflake
    rev: v1.4
    hooks:
      - id: autoflake
        args: [--in-place, --remove-all-unused-imports]

  - repo: https://github.com/pycqa/isort
    rev: 5.10.1
    hooks:
      - id: isort
        name: isort (python)
      - id: isort
        name: isort (pyi)
        types: [pyi]

  - repo: https://github.com/ambv/black
    rev: 22.3.0
    hooks:
      - id: black
        args: [--line-length=88, --safe]

  - repo: https://github.com/pycqa/flake8
    rev: 4.0.1
    hooks:
      - id: flake8
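
For local development the standard pre-commit workflow applies (these are stock pre-commit commands, not part of this diff):

pre-commit install          # install the configured hooks into .git/hooks
pre-commit run --all-files  # run every hook across the whole tree once
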
1 change: 1 addition & 0 deletions Procfile
@@ -0,0 +1 @@
worker: python cli.py --broker=$REDIS_BROKER
49 changes: 49 additions & 0 deletions cli.py
@@ -0,0 +1,49 @@
import logging
import os
import sys

import click
import daiquiri
from pythonjsonlogger.jsonlogger import JsonFormatter

from exporters import DataDogExporter
from receiver import CeleryEventReceiver
from store import InMemoryStore


def setup_logging():
    prod_log_format = (
        "%(asctime)s [%(process)d] %(levelname)-8.8s %(name)s: %(message)s"
    )
    is_production_env = os.environ.get("PROD", "False").lower() == "true"
    output_formatter = (
        JsonFormatter(prod_log_format)
        if is_production_env
        else daiquiri.formatter.ColorFormatter()
    )
    daiquiri.setup(
        level=os.environ.get("LOG_LEVEL", "INFO"),
        outputs=[
            daiquiri.output.Stream(sys.stdout, formatter=output_formatter),
        ],
    )


@click.command()
@click.option("--broker", default="redis://localhost:6379/1", help="celery broker uri")
def run(broker):
    setup_logging()

    # start all the exporters in different threads
    logging.info("Initialize datadog exporter")
    dd_exporter = DataDogExporter(store=InMemoryStore(max_size=100000))
    dd_exporter.start()

    logging.info("Initialize receiver")
    event_receiver = CeleryEventReceiver(broker=broker)
    event_receiver.attach(dd_exporter)
    event_receiver.run()


if __name__ == "__main__":
    run()
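
One operational note (an assumption about the monitored application, not something this diff configures): the receiver only sees task lifecycle events if the Celery app emits them. The settings below are standard Celery options; the app name and broker are illustrative.

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/1")
# Publishers emit task-sent events (the PENDING timestamps dd.py relies on).
app.conf.task_send_sent_event = True
# Workers emit task-started/task-succeeded/task-failed (same as running with -E).
app.conf.worker_send_task_events = True
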
4 changes: 4 additions & 0 deletions exporters/__init__.py
@@ -0,0 +1,4 @@
from exporters.base import Exporter
from exporters.dd import DataDogExporter

__all__ = ["Exporter", "DataDogExporter"]
10 changes: 10 additions & 0 deletions exporters/base.py
@@ -0,0 +1,10 @@
from abc import ABC, abstractmethod
from typing import Union

from celery.events.state import Task, Worker


class Exporter(ABC):
    @abstractmethod
    def process_event(self, event: Union[Task, Worker]):
        pass
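
A minimal sketch of the extension point this ABC defines. LoggingExporter is hypothetical and assumes the same attach/process_event contract that cli.py uses for DataDogExporter:

import logging
from typing import Union

from celery.events.state import Task, Worker

from exporters.base import Exporter


class LoggingExporter(Exporter):
    def process_event(self, event: Union[Task, Worker]):
        # Log each event instead of exporting metrics; Worker events carry
        # no uuid or task state, hence the getattr fallbacks.
        logging.info(
            "event %s -> %s",
            getattr(event, "uuid", "worker"),
            getattr(event, "state", "n/a"),
        )
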
119 changes: 119 additions & 0 deletions exporters/dd.py
@@ -0,0 +1,119 @@
from enum import Enum
from threading import Thread
from time import sleep
from typing import Union

import daiquiri
from celery.events.state import Task, Worker
from celery.states import FAILURE, PENDING, READY_STATES, STARTED, SUCCESS
from datadog import initialize, statsd

from exporters import Exporter

logger = daiquiri.getLogger(__name__)


class DataDogMetrics(Enum):
    TASK_WAIT_TIME = "celery.task.wait_time"
    TASK_RUNTIME_TIME = "celery.task.run_time"
    TOTAL_SUCCESS = "celery.task.success_count"
    TOTAL_FAILED = "celery.task.fail_count"


class DataDogSummary:  # maybe this can be generic
    def __init__(self, events):
        self.events = events

    @property
    def wait_time(self):
        """Returns the wait time of a task in seconds.

        Note
        ----
        Tasks started by celery-beat have no PENDING (task-sent) event,
        so their wait time cannot be computed.

        Returns
        -------
        int
            Wait time in seconds
        """
        try:
            client_sent_time = self.events[PENDING]["timestamp"]
            start_time = self.events[STARTED]["timestamp"]
            return start_time - client_sent_time
        except KeyError:
            logger.exception(f"KeyError for {self.events}")
            return None

    @property
    def run_time(self):
        """Returns the execution time of a task in seconds.

        Returns
        -------
        int
            Run time in seconds
        """
        try:
            return self.events[SUCCESS]["runtime"]
        except KeyError:
            logger.exception(f"KeyError for {self.events}")
            return None


class DataDogExporter(Exporter, Thread):
    def __init__(self, config_option=None, store=None):
        Thread.__init__(self)
        self.daemon = True
        self.store = store
        self.config_option = config_option or {
            "statsd_host": "127.0.0.1",
            "statsd_port": 8125,
        }
        initialize(**self.config_option)

    @staticmethod
    def get_tags(events):
        try:
            pending_event = events[PENDING]
            tags_dict = {
                "queue": pending_event.get("queue", ""),
                "task_name": pending_event.get("name", ""),
            }
            return [f"{key}:{value}" for key, value in tags_dict.items() if value]
        except KeyError:
            logger.exception(f"Pending Event missing in {events}")

    def process_event(self, event: Union[Task, Worker]):
        self.store.add_event(event.uuid, event.state, event)
        if event.state in READY_STATES:
            logger.debug(f"task: {event.uuid} ended with state: {event.state}")
            self.store.add_processable_task(event.uuid)

    def run(self) -> None:
        logger.info("Starting Datadog exporter")
        while True:
            try:
                if self.store.is_empty():
                    sleep(10)
                    continue
                task_id = self.store.get_processable_task()
                events = self.store.get_events(task_id)
                tags = self.get_tags(events)
                summary = DataDogSummary(events)
                if (wait_time := summary.wait_time) is not None:
                    statsd.histogram(
                        DataDogMetrics.TASK_WAIT_TIME.value, wait_time, tags=tags
                    )
                if (run_time := summary.run_time) is not None:
                    statsd.histogram(
                        DataDogMetrics.TASK_RUNTIME_TIME.value, run_time, tags=tags
                    )
                if SUCCESS in events:
                    statsd.increment(DataDogMetrics.TOTAL_SUCCESS.value, tags=tags)
                if FAILURE in events:
                    statsd.increment(DataDogMetrics.TOTAL_FAILED.value, tags=tags)
                self.store.pop_task(task_id)
            except Exception:
                logger.exception("datadog exporter exception")
                sleep(10)
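
store.py and receiver.py belong to this PR's 15 files but are not shown in this excerpt, so here is a minimal sketch of the store interface the exporter relies on, inferred purely from the call sites above; the class name, FIFO ordering, and eviction semantics are assumptions.

from collections import OrderedDict, deque


class InMemoryStoreSketch:
    """Hypothetical stand-in for store.InMemoryStore, built from the calls in dd.py."""

    def __init__(self, max_size=100000):
        self.max_size = max_size
        self._events = OrderedDict()  # task_id -> {state: event}
        self._ready = deque()         # task_ids that reached a READY state

    def add_event(self, task_id, state, event):
        self._events.setdefault(task_id, {})[state] = event
        if len(self._events) > self.max_size:
            self._events.popitem(last=False)  # evict the oldest task

    def add_processable_task(self, task_id):
        self._ready.append(task_id)

    def is_empty(self):
        return not self._ready

    def get_processable_task(self):
        return self._ready[0]  # peek only; pop_task removes it after export

    def get_events(self, task_id):
        return self._events.get(task_id, {})

    def pop_task(self, task_id):
        # Assumes FIFO processing, matching the peek in get_processable_task.
        self._ready.popleft()
        self._events.pop(task_id, None)
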