Skip to content

Commit

Permalink
Event history refactor (#71)
Browse files Browse the repository at this point in the history
* store status changes with timestamps, show run time based on those timestamps

* fix some bugs

* refactor StatusHistory into it's own dataclass

* create new file specific to status

* add tests for StatusHistory

* add pytest to pre-commit

* attempt to fix github runner

* remove demo_test, accidentally added

* update .gitignore

* clean up mere commit
eriktaubeneck authored Jul 12, 2024
1 parent 8b6b38d commit 771b88e
Showing 14 changed files with 265 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yaml
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ jobs:

- name: Install dependencies
run: |
pip install .
pip install -e .
- name: Setup node.js
uses: actions/setup-node@v4
with:
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ tmp/
IGNORE-ME*
.pyre/*
.draft
.coverage*

# local env files
.env*.local
14 changes: 14 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -19,6 +19,20 @@ repos:
[
"-rn", # Only display messages
]
- id: pytest
name: pytest
language: python
entry: pytest
types: [python]
pass_filenames: false
args: [--cov=sidecar]
- id : pytest-coverage
name: coverage
language: python
entry: coverage report
types: [python]
pass_filenames: false
args: [--fail-under=9] # increase this over time
- id: pyre-check
name: pyre-check
entry: pyre check
3 changes: 2 additions & 1 deletion .pyre_configuration
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"site_package_search_strategy": "pep561",
"source_directories": [
"sidecar"
{"import_root": ".", "source": "sidecar"}
]

}
11 changes: 11 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@ dependencies=[
"pre-commit",
"cryptography",
"httpx",
"pytest",
"pytest-cov",
]

[project.scripts]
@@ -50,6 +52,15 @@ disable = [
# "R0913",
]

[tool.pylint.main]
source-roots = ["sidecar"]

[tool.black]
target-version = ["py311", ]
include = '\.pyi?$'

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]
pythonpath = "sidecar"
115 changes: 32 additions & 83 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# pylint: disable=R0801
from __future__ import annotations

import time
from collections import namedtuple
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
@@ -13,7 +10,8 @@
from ..helpers import Role
from ..logger import logger
from ..settings import settings
from .step import Status, Step
from .status import Status, StatusHistory
from .step import Step

# Dictionary to store queries
queries: dict[str, "Query"] = {}
@@ -23,30 +21,23 @@ class QueryExistsError(Exception):
pass


StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"])


@dataclass
class Query:
# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
start_time: Optional[float] = field(init=False, default=None)
end_time: Optional[float] = field(init=False, default=None)
stopped: bool = field(init=False, default=False)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
_status_history: StatusHistory = field(init=False, repr=True)
step_classes: ClassVar[list[type[Step]]] = []
_log_dir: Path = settings.root_path / Path("logs")
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)
_status_dir: Path = settings.root_path / Path("status_semaphore")

def __post_init__(self):
self.logger = logger.bind(task=self.query_id)
status_dir = settings.root_path / Path("status")
status_dir.mkdir(exist_ok=True)
status_file_path = status_dir / Path(f"{self.query_id}")
self._status_history = StatusHistory(file_path=status_file_path, logger=logger)

self._log_dir.mkdir(exist_ok=True)
self._status_dir.mkdir(exist_ok=True)
self._logger_id = logger.add(
self.log_file_path,
serialize=True,
@@ -58,17 +49,21 @@ def __post_init__(self):
raise QueryExistsError(f"{self.query_id} already exists")
queries[self.query_id] = self

@property
def _log_dir(self) -> Path:
return settings.root_path / Path("logs")

@property
def role(self) -> Role:
return settings.role

@property
def started(self) -> bool:
return self.start_time is not None
return self.status >= Status.STARTING

@property
def finished(self) -> bool:
return self.end_time is not None
return self.status >= Status.COMPLETE

@classmethod
def get_from_query_id(cls, query_id) -> Optional["Query"]:
@@ -83,55 +78,22 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]:
if query:
return query
raise e
query.load_history_from_file()
if query.status == Status.UNKNOWN:
return None
return query

def load_history_from_file(self):
if self.status_file_path.exists():
self.logger.debug(
f"Loading query {self.query_id} status history "
f"from file {self.status_file_path}"
)
with self.status_file_path.open("r") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def _last_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def status_event_json(self):
status_event = {
"status": self._last_status_event.status.name,
"start_time": self._last_status_event.timestamp,
}
if self.status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self._last_status_event.timestamp
return status_event

@property
def status(self) -> Status:
return self._last_status_event.status
return self._status_history.current_status

@status.setter
def status(self, status: Status):
if self.status <= Status.COMPLETE:
now = time.time()
self._status_history.append(StatusChangeEvent(status=status, timestamp=now))
with self.status_file_path.open("a") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{now}\n")
if self.status != status and self.status <= Status.COMPLETE:
self._status_history.add(status)

@property
def status_event_json(self):
return self._status_history.status_event_json

@property
def running(self):
@@ -141,18 +103,12 @@ def running(self):
def log_file_path(self) -> Path:
return self._log_dir / Path(f"{self.query_id}.log")

@property
def status_file_path(self) -> Path:
return self._status_dir / Path(f"{self.query_id}")

@property
def steps(self) -> Iterable[Step]:
for step_class in self.step_classes:
if not self.stopped:
yield step_class.build_from_query(self)
yield step_class.build_from_query(self)

def start(self):
self.start_time = time.time()
try:
for step in self.steps:
if self.finished:
@@ -180,37 +136,30 @@ def finish(self):
self._cleanup()

def kill(self):
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
if self.running:
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
self._cleanup()

def crash(self):
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
if self.running:
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
self._cleanup()

def _cleanup(self):
self.current_step = None
self.end_time = time.time()
try:
logger.remove(self._logger_id)
except ValueError:
pass
if queries.get(self.query_id) is not None:
del queries[self.query_id]

@property
def run_time(self):
if not self.start_time:
return 0
if not self.end_time:
return time.time() - self.start_time
return self.end_time - self.start_time

@property
def cpu_usage_percent(self) -> float:
if self.current_step:
83 changes: 83 additions & 0 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import NamedTuple

import loguru


class Status(IntEnum):
UNKNOWN = auto()
NOT_FOUND = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
CRASHED = auto()


StatusChangeEvent = NamedTuple(
"StatusChangeEvent", [("status", Status), ("timestamp", float)]
)


@dataclass
class StatusHistory:
file_path: Path = field(init=True, repr=False)
logger: loguru.Logger = field(init=True, repr=False, compare=False)
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)

def __post_init__(self):
if self.file_path.exists():
self.logger.debug(f"Loading status history from file {self.file_path}")
with self.file_path.open("r", encoding="utf8") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def locking_status(self):
"""Cannot add to history after this or higher status is reached"""
return Status.COMPLETE

def add(self, status: Status, timestamp: float = time.time()):
assert status > self.current_status
assert self.current_status < self.locking_status
self._status_history.append(
StatusChangeEvent(status=status, timestamp=timestamp)
)
with self.file_path.open("a", encoding="utf8") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{timestamp}\n")

@property
def current_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def current_status(self):
return self.current_status_event.status

@property
def status_event_json(self):
status_event = {
"status": self.current_status_event.status.name,
"start_time": self.current_status_event.timestamp,
}
if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self.current_status_event.timestamp
return status_event
14 changes: 1 addition & 13 deletions sidecar/app/query/step.py
Original file line number Diff line number Diff line change
@@ -3,29 +3,17 @@
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import IntEnum, auto
from typing import TYPE_CHECKING, ClassVar, Optional

import loguru

from .command import Command
from .status import Status

if TYPE_CHECKING:
from .base import QueryTypeT


class Status(IntEnum):
UNKNOWN = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
NOT_FOUND = auto()
CRASHED = auto()


@dataclass(kw_only=True)
class Step(ABC):
skip: bool = field(init=False, default=False)
2 changes: 1 addition & 1 deletion sidecar/app/routes/start.py
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
from ..query.base import Query
from ..query.demo_logger import DemoLoggerQuery
from ..query.ipa import GateType, IPACoordinatorQuery, IPAHelperQuery
from ..query.step import Status
from ..query.status import Status
from ..settings import settings

router = APIRouter(
2 changes: 1 addition & 1 deletion sidecar/app/routes/stop.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@

from ..logger import logger
from ..query.base import Query
from ..query.step import Status
from ..query.status import Status

router = APIRouter(
prefix="/stop",
2 changes: 1 addition & 1 deletion sidecar/app/routes/websockets.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@

from ..logger import logger
from ..query.base import Query
from ..query.step import Status
from ..query.status import Status

router = APIRouter(
prefix="/ws",
4 changes: 2 additions & 2 deletions sidecar/cli/cli.py
Original file line number Diff line number Diff line change
@@ -9,8 +9,8 @@
import click
import click_pathlib

from ..app.command import Command, start_commands_parallel
from ..app.helpers import Role
from sidecar.app.command import Command, start_commands_parallel
from sidecar.app.helpers import Role


@click.group()
Empty file added sidecar/tests/__init__.py
Empty file.
115 changes: 115 additions & 0 deletions sidecar/tests/app/query/test_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import time
from pathlib import Path

import loguru
import pytest

from sidecar.app.query.status import Status, StatusChangeEvent, StatusHistory


@pytest.fixture(name="status_history_fixture")
def _status_history_fixture(tmp_path):
status_history = StatusHistory(
file_path=tmp_path / Path("status"),
logger=loguru.logger,
)

return status_history


@pytest.fixture(name="full_status_history_fixture")
def _full_status_history_fixture(status_history_fixture):
status_events = [
(Status.STARTING, 1.0),
(Status.COMPILING, 2.0),
(Status.WAITING_TO_START, 3.0),
(Status.IN_PROGRESS, 4.0),
(Status.COMPLETE, 5.0),
]

for status, timestamp in status_events:
status_history_fixture.add(status, timestamp)

return status_history_fixture


def test_status_history_add(status_history_fixture):
now = time.time()
status_history_fixture.add(Status.COMPILING, now)
assert status_history_fixture.current_status_event == StatusChangeEvent(
Status.COMPILING, now
)
now = time.time()
status_history_fixture.add(Status.IN_PROGRESS, now)
assert status_history_fixture.current_status_event == StatusChangeEvent(
Status.IN_PROGRESS, now
)


def test_status_history_add_write_to_file(status_history_fixture):
status_history_fixture.add(Status.COMPILING, 1.0)
status_history_fixture.add(Status.IN_PROGRESS, 2.0)
with status_history_fixture.file_path.open("r", encoding="utf-8") as f:
assert f.readline() == "COMPILING,1.0\n"
assert f.readline() == "IN_PROGRESS,2.0\n"


def test_status_history_add_load_from_file(tmp_path, full_status_history_fixture):
status_history = StatusHistory(
file_path=tmp_path / Path("status"),
logger=loguru.logger,
)
assert status_history == full_status_history_fixture


def test_status_history_cannot_add_when_locked(full_status_history_fixture):
with pytest.raises(AssertionError):
now = time.time()
full_status_history_fixture.add(Status.KILLED, now)


def test_status_history_cannot_add_lower_status(status_history_fixture):
now = time.time()
status_history_fixture.add(Status.IN_PROGRESS, now)
assert status_history_fixture.current_status_event == StatusChangeEvent(
Status.IN_PROGRESS, now
)
with pytest.raises(AssertionError):
now = time.time()
status_history_fixture.add(Status.COMPILING, now)


def test_status_history_current_status_event(full_status_history_fixture):
assert full_status_history_fixture.current_status_event == StatusChangeEvent(
Status.COMPLETE, 5.0
)


def test_status_history_current_status(full_status_history_fixture):
assert full_status_history_fixture.current_status == Status.COMPLETE


def test_status_history_status_event_json(
status_history_fixture,
):
now = time.time()
status_history_fixture.add(Status.COMPILING, now)
assert status_history_fixture.status_event_json == {
"status": Status.COMPILING.name,
"start_time": now,
}

now = time.time()
status_history_fixture.add(Status.IN_PROGRESS, now)
assert status_history_fixture.status_event_json == {
"status": Status.IN_PROGRESS.name,
"start_time": now,
}

now2 = time.time()
status_history_fixture.add(Status.COMPLETE, now2)
assert status_history_fixture.status_event_json == {
"status": Status.COMPLETE.name,
"start_time": now,
"end_time": now2,
}

0 comments on commit 771b88e

Please sign in to comment.