Skip to content

Commit

Permalink
Update DB record during fill (#10)
Browse files Browse the repository at this point in the history
* Refactor code to update DB record during fill

* Rename done -> complete
  • Loading branch information
albireox authored Nov 11, 2024
1 parent 7d4882c commit 29d8b81
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 72 deletions.
43 changes: 12 additions & 31 deletions src/lvmcryo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typer.core import TyperGroup

from lvmcryo.config import Actions, InteractiveMode, NotificationLevel
from lvmcryo.tools import DBHandler


LOCKFILE = pathlib.Path("/data/lvmcryo.lock")
Expand Down Expand Up @@ -393,7 +394,6 @@ async def ln2(
"""

import json
from logging import FileHandler
from tempfile import NamedTemporaryFile

Expand All @@ -410,7 +410,6 @@ async def ln2(
LockExistsError,
add_json_handler,
ensure_lock,
write_fill_to_db,
)
from lvmcryo.validate import validate_fill

Expand All @@ -426,7 +425,6 @@ async def ln2(

json_path: pathlib.Path | None = None
json_handler: FileHandler | None = None
log_data: list[dict] | None = None
images: dict[str, pathlib.Path | None] = {}

if action == Actions.abort:
Expand Down Expand Up @@ -543,10 +541,15 @@ async def ln2(
log.warning("Lock file exists. Removing it because --clear-lock.")
LOCKFILE.unlink()

db_handler = DBHandler(action, handler, config, json_handler=json_handler)
record_pk = await db_handler.write(complete=False)
if record_pk:
log.debug(f"Record {record_pk} created in the database.")

try:
with ensure_lock(LOCKFILE):
# Run worker.
await ln2_runner(handler, config, notifier)
await ln2_runner(handler, config, notifier, db_handler=db_handler)

# Check handler status.
if handler.failed:
Expand Down Expand Up @@ -605,10 +608,10 @@ async def ln2(
log.info(f"Event times:\n{handler.event_times.model_dump_json(indent=2)}")

if not skip_finally:
configuration_json = config.model_dump() | {
valve: valve_model.model_dump()
for valve, valve_model in config.valve_info.items()
}
# Do a quick update of the DB record since post_fill_tasks() may
# block for a long time.
await db_handler.write(error=error)

plot_paths = await post_fill_tasks(
handler,
notifier=notifier,
Expand Down Expand Up @@ -638,29 +641,7 @@ async def ln2(
error = validate_error

log.info("Writing fill metadata to database.")
if json_handler and json_path:
json_handler.flush()
with json_path.open("r") as ff:
log_data = [json.loads(line) for line in ff.readlines()]

record_pk = await write_fill_to_db(
handler,
api_db_route=config.internal_config["api_routes"]["register_fill"],
plot_paths=plot_paths,
db_extra_payload={
"error": str(error) if error is not None else None,
"action": action.value,
"log_file": str(config.log_path) if config.log_path else None,
"json_file": str(json_path)
if json_path and config.write_json
else None,
"log_data": log_data,
"configuration": configuration_json,
"valve_times": handler.get_valve_times(as_string=True),
},
)
if record_pk:
log.debug(f"Record {record_pk} created in the database.")
await db_handler.write(complete=True, plot_paths=plot_paths, error=error)

if config.notify:
images = {
Expand Down
20 changes: 19 additions & 1 deletion src/lvmcryo/handlers/ln2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import logging
from dataclasses import dataclass, field

from typing import Coroutine, Literal, NoReturn, overload
from typing import Any, Callable, Coroutine, Literal, NoReturn, overload

import sshkeyboard
from pydantic import BaseModel, field_serializer
Expand Down Expand Up @@ -281,6 +281,7 @@ async def purge(
min_purge_time: float | None = None,
max_purge_time: float | None = None,
prompt: bool | None = None,
preopen_cb: Callable[[], Coroutine | Any] | None = None,
):
"""Purges the system.
Expand All @@ -301,6 +302,8 @@ async def purge(
prompt
Whether to show a prompt to stop or cancel the purge. If ``None``,
determined from the instance ``interactive`` attribute.
preopen_cb
A callback to run before opening the purge valve.
"""

Expand All @@ -322,6 +325,12 @@ async def purge(
self._kb_monitor(action="purge")

try:
if preopen_cb:
if asyncio.iscoroutinefunction(preopen_cb):
await preopen_cb()
else:
preopen_cb()

await valve_handler.start_fill(
min_open_time=min_purge_time or 0.0,
max_open_time=max_purge_time,
Expand Down Expand Up @@ -352,6 +361,7 @@ async def fill(
min_fill_time: float | None = None,
max_fill_time: float | None = None,
prompt: bool | None = None,
preopen_cb: Callable[[], Coroutine | Any] | None = None,
):
"""Fills the selected cameras.
Expand All @@ -376,6 +386,8 @@ async def fill(
prompt
Whether to show a prompt to stop or cancel the fill. If ``None``,
determined from the instance ``interactive`` attribute.
preopen_cb
A callback to run before opening the fill valves.
"""

Expand Down Expand Up @@ -413,6 +425,12 @@ async def fill(
self._kb_monitor(action="fill")

try:
if preopen_cb:
if asyncio.iscoroutinefunction(preopen_cb):
await preopen_cb()
else:
preopen_cb()

await asyncio.gather(*fill_tasks)

if not self.aborted:
Expand Down
4 changes: 4 additions & 0 deletions src/lvmcryo/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

if TYPE_CHECKING:
from lvmcryo.config import Config
from lvmcryo.tools import DBHandler


__all__ = ["ln2_runner"]
Expand Down Expand Up @@ -58,6 +59,7 @@ async def ln2_runner(
handler: LN2Handler,
config: Config,
notifier: Notifier | None = None,
db_handler: DBHandler | None = None,
):
"""Runs the purge/fill process.
Expand Down Expand Up @@ -128,6 +130,7 @@ async def ln2_runner(
min_purge_time=config.min_purge_time,
max_purge_time=max_purge_time,
prompt=not config.no_prompt,
preopen_cb=db_handler.write if db_handler is not None else None,
)

if handler.failed or handler.aborted:
Expand All @@ -143,6 +146,7 @@ async def ln2_runner(
min_fill_time=config.min_fill_time,
max_fill_time=max_fill_time,
prompt=not config.no_prompt,
preopen_cb=db_handler.write if db_handler is not None else None,
)

if handler.failed or handler.aborted:
Expand Down
154 changes: 114 additions & 40 deletions src/lvmcryo/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

import asyncio
import contextlib
import json
import logging
import os
import pathlib
import time
import warnings
from contextlib import suppress
from functools import partial
from logging import getLogger
from logging import FileHandler, getLogger

from typing import TYPE_CHECKING, Any

Expand All @@ -32,6 +34,7 @@
if TYPE_CHECKING:
from datetime import datetime

from lvmcryo.config import Config
from lvmcryo.handlers.ln2 import LN2Handler


Expand Down Expand Up @@ -276,54 +279,125 @@ def date_json(date: datetime | None) -> str | None:
return date.isoformat() if date else None


async def write_fill_to_db(
handler: LN2Handler,
api_db_route: str = "http://lvm-hub.lco.cl:8090/api/spectrographs/fills/register",
plot_paths: dict[str, pathlib.Path] = {},
db_extra_payload: dict[str, Any] = {},
):
"""Records the fill to the database.
class DBHandler:
"""Handles writing the fill to the database.
Parameters
----------
action
The action being performed.
handler
The `.LN2Handler` instance.
config
The configuration object.
api_db_route
The API route to write the data to the database.
plot_paths
A dictionary with the paths to the plots.
db_extra_payload
Extra payload to send to the database registration endpoint.
Returns
-------
pk
The primary key of the new record in the database.
json_handler
The logging handler used to write JSON data.
"""

event_times = handler.event_times

async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.post(
api_db_route,
json={
"start_time": date_json(event_times.start_time),
"end_time": date_json(event_times.end_time),
"purge_start": date_json(event_times.purge_start),
"purge_complete": date_json(event_times.purge_complete),
"fill_start": date_json(event_times.fill_start),
"fill_complete": date_json(event_times.fill_complete),
"fail_time": date_json(event_times.fail_time),
"abort_time": date_json(event_times.abort_time),
"failed": handler.failed,
"aborted": handler.aborted,
"plot_paths": {k: str(v) for k, v in plot_paths.items()},
**db_extra_payload,
},
)
response.raise_for_status()
def __init__(
self,
action: str,
handler: LN2Handler,
config: Config,
api_route: str | None = None,
json_handler: FileHandler | None = None,
) -> None:
self.pk: int | None = None

self.action = action
self.handler = handler
self.config = config
self.api_route = api_route or config.internal_config["api_routes.register_fill"]

self.json_handler = json_handler

def get_log_data(self):
"""Returns the log data for the fill."""

if self.json_handler:
self.json_handler.flush()
json_path = pathlib.Path(self.json_handler.baseFilename)
with json_path.open("r") as ff:
return [json.loads(line) for line in ff.readlines()]

record_id = response.json()
return None

async def write(
self,
complete: bool = False,
plot_paths: dict[str, pathlib.Path] = {},
error: Exception | str | None = None,
raise_on_error: bool = False,
):
"""Records the fill to the database.
return record_id
Parameters
----------
complete
Whether the action is complete.
plot_paths
A dictionary with the paths to the plots.
error
The error message or ``None`` if no error.
raise_on_error
Whether to raise an exception if the write fails.
Returns
-------
pk
The primary key of the new record in the database.
"""

event_times = self.handler.event_times
log_path = self.config.log_path

json_path = self.json_handler.baseFilename if self.json_handler else None
json_file = str(json_path) if json_path and self.config.write_json else None

configuration_json = self.config.model_dump() | {
valve: valve_model.model_dump()
for valve, valve_model in self.config.valve_info.items()
}

async with httpx.AsyncClient(follow_redirects=True) as client:
response = await client.post(
self.api_route,
json={
"action": self.action,
"complete": complete,
"pk": self.pk,
"start_time": date_json(event_times.start_time),
"end_time": date_json(event_times.end_time),
"purge_start": date_json(event_times.purge_start),
"purge_complete": date_json(event_times.purge_complete),
"fill_start": date_json(event_times.fill_start),
"fill_complete": date_json(event_times.fill_complete),
"fail_time": date_json(event_times.fail_time),
"abort_time": date_json(event_times.abort_time),
"failed": self.handler.failed,
"aborted": self.handler.aborted,
"plot_paths": {k: str(v) for k, v in plot_paths.items()},
"log_file": str(log_path) if log_path else None,
"valve_times": self.handler.get_valve_times(as_string=True),
"json_file": json_file,
"log_data": self.get_log_data(),
"configuration": configuration_json,
"error": str(error) if error is not None else None,
},
)

if response.status_code != 200:
if raise_on_error:
raise RuntimeError(f"Error writing to the DB: {response.text}")
else:
warnings.warn(f"Error writing to the DB: {response.text}")

return self.pk

self.pk = response.json()

return self.pk

0 comments on commit 29d8b81

Please sign in to comment.