Skip to content

Commit

Permalink
pysmurf-monitor main process robustness
Browse files Browse the repository at this point in the history
- protect against OperationalError crashing main loop
- Adds basic type hinting
  • Loading branch information
jlashner committed Aug 23, 2024
1 parent cf89f1f commit 1d981ef
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions socs/agents/pysmurf_monitor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
from ocs import ocs_agent, ocs_feed, site_config
from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol
import txaio # type: ignore

import traceback
from typing import Tuple, Optional, Dict, Any
import sqlalchemy

from socs.db.suprsync import SupRsyncFilesManager, create_file


def create_remote_path(meta, archive_name):
def create_remote_path(meta: Dict[str, Any], archive_name: str) -> str:
"""
Creates "remote path" for file.
Expand Down Expand Up @@ -52,6 +57,8 @@ def create_remote_path(meta, archive_name):
)
elif archive_name == 'timestreams':
return str(os.path.join(*os.path.normpath(meta['path']).split(os.sep)[-3:]))
else:
raise ValueError(f"Unknown archive name: {archive_name}")


class PysmurfMonitor(DatagramProtocol):
Expand Down Expand Up @@ -87,17 +94,17 @@ class PysmurfMonitor(DatagramProtocol):
suprsync db.
"""

def __init__(self, agent, args):
def __init__(self, agent: ocs_agent.OCSAgent, args: argparse.Namespace) -> None:
self.agent: ocs_agent.OCSAgent = agent
self.log = agent.log
self.file_queue = queue.Queue()
self.db_path = args.db_path
self.running = False
self.echo_sql = args.echo_sql
self.log: txaio.interfaces.ILogger = agent.log
self.file_queue: queue.Queue[Dict[str, Any]] = queue.Queue()
self.db_path: str = args.db_path
self.running: bool = False
self.echo_sql: bool = args.echo_sql

self.agent.register_feed('pysmurf_session_data')

def datagramReceived(self, _data, addr):
def datagramReceived(self, _data: bytes, addr: Tuple[str, int]) -> None:
"""
Called whenever UDP data is received.
Expand Down Expand Up @@ -152,7 +159,11 @@ def datagramReceived(self, _data, addr):
self.agent.publish_to_feed(feed_name, feed_data, from_reactor=True)

@ocs_agent.param('test_mode', default=False, type=bool)
def run(self, session, params=None):
def run(
self,
session: ocs_agent.OpSession,
params: Any = None,
) -> Tuple[bool, str]:
"""run(test_mode=False)
**Process** - Main process for the pysmurf monitor agent. Processes
Expand All @@ -163,13 +174,12 @@ def run(self, session, params=None):
test_mode (bool, optional):
Stop the Process loop after processing any file(s).
This is meant only for testing. Default is False.
"""
srfm = SupRsyncFilesManager(self.db_path, create_all=True, echo=self.echo_sql)

self.running = True
files_to_add = []
while self.running:
files = []
while not self.file_queue.empty():
meta = self.file_queue.get()
# Archive name defaults to pysmurf because that is currently
Expand All @@ -194,7 +204,7 @@ def run(self, session, params=None):
if key in local_path:
deletable = False

files.append(
files_to_add.append(
create_file(local_path, remote_path, archive_name,
deletable=deletable)
)
Expand All @@ -205,9 +215,19 @@ def run(self, session, params=None):
meta=meta, e=e
)

if files:
with srfm.Session.begin() as session:
session.add_all(files)
if files_to_add:
try:
with srfm.Session.begin() as db_session:
db_session.add_all(files_to_add)
session.degraded = False
files_to_add = []
except sqlalchemy.exc.OperationalError:
session.degraded = True
self.log.error("Error adding files to database...")
self.log.error("{e}", e=traceback.format_exc())
self.log.info("Sleeping 5 sec and then trying again...")
time.sleep(5)
continue

if params['test_mode']:
break
Expand All @@ -216,13 +236,17 @@ def run(self, session, params=None):

return True, 'Monitor exited cleanly.'

def _stop(self, session, params=None):
def _stop(
self,
session: ocs_agent.OpSession,
params: Any = True,
) -> Tuple[bool, str]:
self.running = False
session.set_status('stopping')
return True, 'Done monitoring.'


def make_parser(parser=None):
def make_parser(parser: Optional[argparse.ArgumentParser]=None) -> argparse.ArgumentParser:
if parser is None:
parser = argparse.ArgumentParser()

Expand All @@ -241,7 +265,7 @@ def make_parser(parser=None):
return parser


def main(args=None):
def main(args=None) -> None:
parser = make_parser()
args = site_config.parse_args(agent_class='PysmurfMonitor',
parser=parser,
Expand Down

0 comments on commit 1d981ef

Please sign in to comment.