Commit 771c3e5

Get ESP Sample info from new CONSOLIDATED_MSG, if available
MBARIMike committed Aug 12, 2024
1 parent 6af70b5 commit 771c3e5
Showing 1 changed file with 71 additions and 10 deletions.
81 changes: 71 additions & 10 deletions stoqs/loaders/SampleLoaders.py
@@ -91,6 +91,8 @@
 # E.g.: Cmd::Paused in FILTERING -- during Sample Pump (SP) move after sampling 486.135m
 lsr_volume_re_paused = r'.*[Ss]ampl.+\s+(?P<volume_num>[-+]?\d*\.\d+)(?P<volume_units>[a-z]{2})'
 lsr_esp_error_msg_re = r'(?P<esp_error_message>.+Error in PROCESSING.+)'
+# Consolidated message containing ESP Cartridge number, start, and end times
+CONSOLIDATED_MSG = re.compile(r'ESP Cartridge (?P<cartridge_number>\d+): S_FILTERING: start: (?P<start_dt>\S+) end: (?P<end_dt>\S+)')

 # Beginning of syslog, E.g.: 2019-08-16T19:51:31.135Z,1565985091.135 [ESPComponent](INFO)
 iso_time = r'\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d*Z'
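
For illustration (this snippet is not part of the commit), the new pattern pulls the cartridge number and both timestamps out as named groups; the message text is taken from the docstring added below:

    >>> import re
    >>> CONSOLIDATED_MSG = re.compile(r'ESP Cartridge (?P<cartridge_number>\d+): S_FILTERING: '
    ...                               r'start: (?P<start_dt>\S+) end: (?P<end_dt>\S+)')
    >>> m = CONSOLIDATED_MSG.search('[sample #1] ESP Cartridge 59: S_FILTERING: '
    ...                             'start: 2024-06-08T01:36:36.784Z end: 2024-06-08T02:20:35.432Z')
    >>> m.groupdict()
    {'cartridge_number': '59', 'start_dt': '2024-06-08T01:36:36.784Z', 'end_dt': '2024-06-08T02:20:35.432Z'}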
@@ -407,8 +409,11 @@ def _read_syslog(self, url, levels=('IMPORTANT',), components=('ESPComponent',))
             for line in (lb.decode(errors='ignore') for lb in resp.iter_lines()):

                 # Ugly logic to handle multiple line messages
-                lm = re.match(beg_syslog_re, line)
+                lm = re.match(beg_syslog_re, line, re.MULTILINE)
                 if lm:
+                    # if "ESP log summary report" in lm.groupdict().get('log_message'):
+                    #     self.logger.info(lm.groupdict().get('log_message'))
+
                     if lm.groupdict().get('log_component') in ('ESPComponent',) and lm.groupdict().get('log_level') in levels:
                         # December 2022 while adding Sipper capture: Not sure why there is this prev_line logic...
                         if prev_line_has_component:
@@ -497,6 +502,44 @@ def _get_esp_device(self, url):
                 break

         return esp_device

+    def _esps_from_json_consolidated(self, platform_name, url, db_alias, use_syslog=True):
+        '''In June 2024 consolidated messages like this one started appearing in the syslogs:
+        /mbari/LRAUV/makai/missionlogs/2024/20240607_20240612/20240607T125711/syslog:2024-06-08T02:27:20.778Z,1717813640.778 [ESPComponent](IMPORTANT): [sample #1] ESP Cartridge 59: S_FILTERING: start: 2024-06-08T01:36:36.784Z end: 2024-06-08T02:20:35.432Z
+        Return (esp_s_filtering, esp_s_stopping, esp_log_summaries, esp_log_criticals, esp_device) as in _esps_from_json(), but with much simpler parsing.
+        '''
+        esp_s_filtering = []
+        esp_s_stopping = []
+        esp_log_summaries = []
+        esp_log_criticals = []
+        esp_device = None
+
+        LOGSUMMARY = 'ESP log summary report'
+
+        if use_syslog:
+            log_important, _ = self._read_syslog(url)
+            log_critical, _ = self._read_syslog(url, levels=('CRITICAL',))
+            esp_device = self._get_esp_device(url)
+        else:
+            log_important = self._read_tethysdash(platform_name, url)
+            log_critical = []  # no CRITICAL records when not reading the syslog
+
+        if not log_important and not log_critical:
+            return esp_s_filtering, esp_s_stopping, esp_log_summaries, esp_log_criticals, esp_device
+
+        Log = namedtuple('Log', 'esec text')
+        for rec in log_important:
+            if mcm := re.search(CONSOLIDATED_MSG, rec['text']):
+                # Truncate the fractional seconds and 'Z' from the _dt strings
+                start_esec = datetime.strptime(mcm.groupdict().get('start_dt')[:-5], "%Y-%m-%dT%H:%M:%S").timestamp()
+                end_esec = datetime.strptime(mcm.groupdict().get('end_dt')[:-5], "%Y-%m-%dT%H:%M:%S").timestamp()
+                if end_esec > start_esec:
+                    esp_s_filtering.append(Log(start_esec, rec['text']))
+                    esp_s_stopping.append(Log(end_esec, rec['text']))
+                    esp_log_summaries.append(Log(rec['unixTime']/1000.0, rec['text']))
+                else:
+                    self.logger.warning(f"Not saving Sample {rec['text']} as its end time is not after its start time")
+
+        return esp_s_filtering, esp_s_stopping, esp_log_summaries, esp_log_criticals, esp_device
+
     def _esps_from_json(self, platform_name, url, db_alias, use_syslog=True):
         '''Retrieve Sample information that's available in the syslogurl from the TethysDash REST API
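
Two assumptions are worth noting in the new method's timestamp handling: the [:-5] slice only strips the fractional seconds when the syslog writes exactly three digits plus 'Z', and strptime() yields a naive datetime, so .timestamp() interprets it in the loader process's local timezone (i.e., TZ=UTC is assumed). An illustrative check, not part of the commit:

    >>> from datetime import datetime, timezone
    >>> s = '2024-06-08T01:36:36.784Z'
    >>> s[:-5]
    '2024-06-08T01:36:36'
    >>> dt = datetime.strptime(s[:-5], "%Y-%m-%dT%H:%M:%S")
    >>> dt.replace(tzinfo=timezone.utc).timestamp()  # explicit-UTC equivalent
    1717810596.0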
@@ -594,9 +637,11 @@ def _filter_stop_nums(self, filterings, stoppings, summaries):
         for filtering, stopping, summary in zip(filterings, stoppings, summaries):
             self.logger.debug(f"summary = {summary}")
             ms = (re.match(sampling_start_re, filtering.text) or
-                  re.match(no_num_sampling_start_re, filtering.text))
+                  re.match(no_num_sampling_start_re, filtering.text) or
+                  re.match(sample_prefix, filtering.text))
             me = (re.match(sampling_end_re, stopping.text) or
-                  re.match(no_num_sampling_end_re, stopping.text))
+                  re.match(no_num_sampling_end_re, stopping.text) or
+                  re.match(sample_prefix, stopping.text))
             filter_nums.append(ms.groupdict().get('seq_num'))
             stop_nums.append(me.groupdict().get('seq_num'))

@@ -765,21 +810,26 @@ def _validate_summaries(self, platform_name, filterings, stoppings, summaries, c

         return filterings, stoppings, summaries

-    def _match_seq_to_cartridge(self, filterings, stoppings, summaries, before_seq_num_implemented=False):
+    def _match_seq_to_cartridge(self, filterings, stoppings, summaries, before_seq_num_implemented=False, consolidated_msg=False):
         '''Take lists from parsing TethysDash log and build Sample names list with start and end times
         '''
         # Loop through extractions from syslog to build dictionary
         sample_names = defaultdict(SampleInfo)
         for filtering, stopping, summary in zip(filterings, stoppings, summaries):
             self.logger.debug(f"summary = {summary}")
             ms = (re.match(sampling_start_re, filtering.text) or
-                  re.match(no_num_sampling_start_re, filtering.text))
+                  re.match(no_num_sampling_start_re, filtering.text) or
+                  re.match(sample_prefix, filtering.text))
             me = (re.match(sampling_end_re, stopping.text) or
-                  re.match(no_num_sampling_end_re, stopping.text))
+                  re.match(no_num_sampling_end_re, stopping.text) or
+                  re.match(sample_prefix, stopping.text))

             lsr_seq_num = re.search(lsr_seq_num_re, summary.text, re.MULTILINE)
             lsr_lsr_num_messages = re.search(lsr_num_messages_re, summary.text, re.MULTILINE)
-            lsr_cartridge_number = re.search(lsr_cartridge_number_re, summary.text, re.MULTILINE)
+            if consolidated_msg:
+                lsr_cartridge_number = re.search(CONSOLIDATED_MSG, summary.text)
+            else:
+                lsr_cartridge_number = re.search(lsr_cartridge_number_re, summary.text, re.MULTILINE)
             lsr_volume = re.search(lsr_volume_re, summary.text, re.MULTILINE)
             if not lsr_volume:
                 self.logger.debug(f"Could not parse lsr_volume from '{summary.text}'")
@@ -791,6 +841,8 @@ def _match_seq_to_cartridge(self, filterings, stoppings, summaries, before_seq_n
             try:
                 if before_seq_num_implemented:
                     self.logger.info(f"This log is before seq_num was implemented - not checking for match")
+                elif consolidated_msg:
+                    self.logger.info(f"This log uses the consolidated message - not checking for match")
                 elif not (ms.groupdict().get('seq_num') == me.groupdict().get('seq_num') == lsr_seq_num.groupdict().get('seq_num')):
                     self.logger.warn(f"Sample numbers do not match for '{filtering.text}', '{stopping.text}', and '{summary.text}'")
             except AttributeError:
@@ -965,8 +1017,16 @@ def load_lrauv_samples(self, platform_name, activity_name, url, db_alias):
         url looks like 'http://dods.mbari.org/opendap/data/lrauv/tethys/missionlogs/2018/20180906_20180917/20180908T084424/201809080844_201809112341_2S_scieng.nc'
         '''
         self.logger.info(f"Parsing ESP sample messages from /mbari/LRAUV/{'/'.join(url.split('/')[6:-1])}/syslog")
-        init_filterings, init_stoppings, init_summaries, init_criticals, esp_device = self._esps_from_json(platform_name, url, db_alias)
-        filterings, stoppings, summaries = self._validate_summaries(platform_name, init_filterings, init_stoppings, init_summaries, init_criticals)
+        init_filterings, init_stoppings, init_summaries, init_criticals, esp_device = self._esps_from_json_consolidated(platform_name, url, db_alias)
+        if not init_filterings and not init_stoppings and not init_summaries:
+            # Attempt to match filterings and stoppings from before June 2024 when consolidated messages started
+            consolidated_msg = False
+            self.logger.info(f"No consolidated ESP messages found, using legacy method to match filterings and stoppings")
+            init_filterings, init_stoppings, init_summaries, init_criticals, esp_device = self._esps_from_json(platform_name, url, db_alias)
+            filterings, stoppings, summaries = self._validate_summaries(platform_name, init_filterings, init_stoppings, init_summaries, init_criticals)
+        else:
+            consolidated_msg = True
+            filterings, stoppings, summaries = init_filterings, init_stoppings, init_summaries

         # After 14 August 2018 a 'sample #<num>' is included in the log message:
         # https://bitbucket.org/mbari/lrauv-application/pull-requests/76/add-sample-to-all-logimportant-entries/diff
Expand All @@ -977,7 +1037,8 @@ def load_lrauv_samples(self, platform_name, activity_name, url, db_alias):
esp_names = None
if filterings and stoppings and summaries:
esp_names = self._match_seq_to_cartridge(filterings, stoppings, summaries,
before_seq_num_implemented=before_seq_num_implemented)
before_seq_num_implemented=before_seq_num_implemented,
consolidated_msg=consolidated_msg)

samplings_at, sample_num_errs = self._sippers_from_json(platform_name, url)
sipper_names = self._match_sippers(samplings_at, sample_num_errs)
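
Taken together, the consolidated path reduces each IMPORTANT record to three Log tuples. A minimal, self-contained sketch (the record shape is inferred from the rec['unixTime'] and rec['text'] accesses in the diff; this is not code from the commit):

    import re
    from collections import namedtuple
    from datetime import datetime

    CONSOLIDATED_MSG = re.compile(r'ESP Cartridge (?P<cartridge_number>\d+): S_FILTERING: '
                                  r'start: (?P<start_dt>\S+) end: (?P<end_dt>\S+)')
    Log = namedtuple('Log', 'esec text')

    # One IMPORTANT syslog record, as assumed from the diff
    rec = {'unixTime': 1717813640778,
           'text': '[sample #1] ESP Cartridge 59: S_FILTERING: '
                   'start: 2024-06-08T01:36:36.784Z end: 2024-06-08T02:20:35.432Z'}

    if mcm := CONSOLIDATED_MSG.search(rec['text']):
        def to_esec(dt_str):
            # Strip '.###Z' and convert to epoch seconds (process TZ assumed UTC)
            return datetime.strptime(dt_str[:-5], "%Y-%m-%dT%H:%M:%S").timestamp()
        filtering = Log(to_esec(mcm['start_dt']), rec['text'])  # -> esp_s_filtering
        stopping = Log(to_esec(mcm['end_dt']), rec['text'])     # -> esp_s_stopping
        summary = Log(rec['unixTime'] / 1000.0, rec['text'])    # -> esp_log_summaries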