Skip to content

Commit

Permalink
Upgrade to python 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
sidmitra committed Sep 10, 2024
1 parent 05c78f0 commit fc41e09
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 58 deletions.
27 changes: 15 additions & 12 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ default_language_version:

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
rev: v4.6.0
hooks:
- id: check-added-large-files
- id: check-case-conflict
Expand All @@ -25,33 +25,36 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/asottile/pyupgrade
rev: v2.34.0
rev: v3.17.0
hooks:
- id: pyupgrade
args: ["--py39-plus", "--keep-runtime-typing"]
args: ["--py312-plus", "--keep-runtime-typing"]

- repo: https://github.com/PyCQA/autoflake
rev: v1.4
rev: v2.3.1
hooks:
- id: autoflake
args: [--in-place, --remove-all-unused-import]

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.13.2
hooks:
- id: isort
name: isort (python)
- id: isort
name: isort (pyi)
types: [pyi]

- repo: https://github.com/ambv/black
rev: 22.3.0
hooks:
- id: black
args: [--line-length=88, --safe]

- repo: https://github.com/pycqa/flake8
rev: 4.0.1
rev: 7.1.1
hooks:
- id: flake8

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.6.4 # Upgrade along with version in pyproject.toml dev dependencies
hooks:
- id: ruff
types_or: [ python, pyi ]
args: [ --fix ]
- id: ruff-format
types_or: [ python, pyi ]
4 changes: 2 additions & 2 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from store import InMemoryStore


def setup_logging():
def setup_logging() -> None:
prod_log_format = (
"%(asctime)s [%(process)d] %(levelname)-8.8s %(name)s: %(message)s"
)
Expand All @@ -31,7 +31,7 @@ def setup_logging():

@click.command()
@click.option("--broker", default="redis://localhost:6379/1", help="celery broker uri")
def run(broker):
def run(broker) -> None:
setup_logging()

# start all the exporters in different threads
Expand Down
3 changes: 1 addition & 2 deletions exporters/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from abc import ABC, abstractmethod
from typing import Union

from celery.events.state import Task, Worker


class Exporter(ABC):
@abstractmethod
def process_event(self, event: Union[Task, Worker]):
def process_event(self, event: Task | Worker):
pass
25 changes: 15 additions & 10 deletions exporters/dd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from enum import Enum
from threading import Thread
from time import sleep
from typing import Union

import daiquiri
from celery.events.state import Task, Worker
Expand All @@ -21,21 +20,22 @@ class DataDogMetrics(Enum):


class DataDogSummary: # maybe this can be generic
def __init__(self, events):
def __init__(self, events) -> None:
self.events = events

@property
def wait_time(self):
"""Returns wait time of a task in seconds.
Note:
-----
----
observation task started from celery-beat don't have pending or task-sent event
Returns
--------
Returns:
-------
int
Wait Time in seconds
"""
try:
client_sent_time = self.events[PENDING]["timestamp"]
Expand All @@ -50,9 +50,10 @@ def run_time(self):
"""Returns execution of a task in seconds.
Returns
--------
-------
int
Run Time in seconds
"""
try:
return self.events[SUCCESS]["runtime"]
Expand All @@ -62,7 +63,7 @@ def run_time(self):


class DataDogExporter(Exporter, Thread):
def __init__(self, config_option=None, store=None):
def __init__(self, config_option=None, store=None) -> None:
Thread.__init__(self)
self.daemon = True
self.store = store
Expand All @@ -84,7 +85,7 @@ def get_tags(events):
except KeyError:
logger.exception(f"Pending Event missing in {events}")

def process_event(self, event: Union[Task, Worker]):
def process_event(self, event: Task | Worker) -> None:
self.store.add_event(event.uuid, event.state, event)
if event.state in READY_STATES:
logger.debug(f"task: {event.uuid} ended with state: {event.state}")
Expand All @@ -103,11 +104,15 @@ def run(self) -> None:
summary = DataDogSummary(events)
if (wait_time := summary.wait_time) is not None:
statsd.histogram(
DataDogMetrics.TASK_WAIT_TIME.value, wait_time, tags=tags
DataDogMetrics.TASK_WAIT_TIME.value,
wait_time,
tags=tags,
)
if (run_time := summary.run_time) is not None:
statsd.histogram(
DataDogMetrics.TASK_RUNTIME_TIME.value, run_time, tags=tags
DataDogMetrics.TASK_RUNTIME_TIME.value,
run_time,
tags=tags,
)
if SUCCESS in events:
statsd.increment(DataDogMetrics.TOTAL_SUCCESS.value, tags=tags)
Expand Down
57 changes: 39 additions & 18 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,51 @@ version = "0.1.0"
description = "Celery metrics exporter for Datadog and Postgres"
authors = ["Airbase Inc <[email protected]>"]

[build-system]
requires = ["poetry-core>=1.2.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.dependencies]
python = "3.10.*"
celery = "^5.2"
datadog = "^0.44"
kombu = "^5.2"
redis = "^4.3"
amqp = "^5.1"
python = "3.12.*"
amqp = "^5.2"
celery = "^5.4"
click = "^8.1"
requests = "^2.28"
daiquiri = "^3.1.0"
datadog = "^0.50"
kombu = "^5.2"
redis = "^5.0"

[tool.poetry.dev-dependencies]
black = "22.3.0"
flake8 = "4.0.1"
taskipy = "^1.10.2"
pre-commit = "^2.19.0"
[tool.poetry.group.dev.dependencies]
flake8 = "^7.1"
pre-commit = "^3.8"
ruff = "^0.6.4" # Upgrade along with version in .pre-commit-config.yaml
taskipy = "^1.13"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.ruff]
line-length = 88
target-version = "py312"
exclude = []

[tool.poetry.scripts]
eventbusk = "cli:cli"
[tool.ruff.lint]
select = ["ALL"]
ignore = [
"ANN001",
"ANN201",
"ANN205",
"D100",
"D101",
"D102",
"D103",
"D104",
"D107",
"D211",
"D213",
"E501",
"G004",
"RUF012",
]

[tool.taskipy.tasks]
pre_freeze = "echo '# Do not edit directly, but instead via poetry' > requirements.txt && echo '# Do not edit directly, but instead via poetry. Also contains pinned dev requirements.' > requirements_all.txt"
freeze = "poetry export --without-hashes --format=requirements.txt >> requirements.txt && poetry export --dev --without-hashes --format=requirements.txt >> requirements_all.txt"
freeze = "poetry export --without-hashes --format=requirements.txt >> requirements.txt && poetry export --with dev --without-hashes --format=requirements.txt >> requirements_all.txt"
ruff = "ruff check --unsafe-fixes"
19 changes: 11 additions & 8 deletions receiver.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
from time import sleep
from typing import Union
from typing import TYPE_CHECKING

import daiquiri
from celery import Celery
from celery.events.state import Task, Worker

from exporters import Exporter
from utils import is_event_type_task

if TYPE_CHECKING:
from exporters import Exporter

logger = daiquiri.getLogger(__name__)


class CeleryEventReceiver:
def __init__(self, broker: str):
def __init__(self, broker: str) -> None:
self.broker = broker
self.celery_app = Celery(
broker=self.broker,
)
self.exporters: list[Exporter] = []
self.state = self.celery_app.events.State()

def attach(self, exporter):
def attach(self, exporter) -> None:
self.exporters.append(exporter)

def _notify_exporters(self, event: Union[Task, Worker]):
def _notify_exporters(self, event: Task | Worker) -> None:
for exporter in self.exporters:
exporter.process_event(event)

def notify(self, event):
def notify(self, event) -> None:
event_type: str = event.get("type")
if not is_event_type_task(event_type):
# Since we are only exporting metrics related to celery task
Expand All @@ -37,7 +39,7 @@ def notify(self, event):
event_details, event_type = self.state.event(event)
event, is_create_event = event_details
logger.debug(
f"Receiver event: {event.uuid} {event.name}, {event.state} {event.timestamp} with data: {event}"
f"Receiver event: {event.uuid} {event.name}, {event.state} {event.timestamp} with data: {event}",
)
self._notify_exporters(event)

Expand All @@ -46,7 +48,8 @@ def run(self) -> None:
while True:
try:
receiver = self.celery_app.events.Receiver(
connection, handlers={"*": self.notify}
connection,
handlers={"*": self.notify},
)
receiver.capture(limit=None, timeout=120)
except Exception:
Expand Down
1 change: 1 addition & 0 deletions runtime.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
python-3.12.6
8 changes: 4 additions & 4 deletions store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def serialize(self, event):


class InMemoryStore(TaskStore):
def __init__(self, max_size):
def __init__(self, max_size) -> None:
self.event_store = LRUCache(max_size)
self.processing_queue = Queue()

Expand All @@ -56,15 +56,15 @@ def get_processable_task(self):
def get_events(self, task_id):
return self.event_store.get(task_id)

def pop_task(self, task_id):
def pop_task(self, task_id) -> None:
self.event_store.data.pop(task_id)

def add_event(self, task_id, state, event):
def add_event(self, task_id, state, event) -> None:
event_dict = self.serialize(event)
try:
self.event_store[task_id][state] = event_dict
except KeyError:
self.event_store[event.uuid] = {event.state: event_dict}

def add_processable_task(self, task_id):
def add_processable_task(self, task_id) -> None:
self.processing_queue.put(task_id)
5 changes: 3 additions & 2 deletions utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
def is_event_type_task(event_type: str):
"""Return true when event type is of type event
"""Return true when event type is of type event.
Returns
--------
-------
bool
returns true when event type is of type event
"""
return event_type and event_type.startswith("task-")

0 comments on commit fc41e09

Please sign in to comment.