From 779c5db49547238f31c71264b642386037df8923 Mon Sep 17 00:00:00 2001 From: Ivelin Ivanov Date: Sun, 27 Oct 2019 17:44:36 -0500 Subject: [PATCH] feat(pipeline): timeline of pipeline events --- ambianic-debug.sh | 25 +++ config.yaml | 15 +- src/ambianic/pipeline/__init__.py | 67 ++++++- src/ambianic/pipeline/ai/face_detect.py | 19 +- src/ambianic/pipeline/ai/image_detection.py | 15 +- src/ambianic/pipeline/ai/object_detect.py | 7 +- src/ambianic/pipeline/avsource/av_element.py | 2 +- src/ambianic/pipeline/interpreter.py | 31 +++- src/ambianic/pipeline/store.py | 79 +++++--- src/ambianic/pipeline/timeline.py | 185 +++++++++++++++++++ src/ambianic/server.py | 53 ++++-- tests/pipeline/ai/test_face_detect.py | 48 ++--- tests/pipeline/ai/test_object_detect.py | 10 +- tests/pipeline/avsource/test_avsource.py | 16 +- tests/pipeline/test_interpreter.py | 6 +- tests/pipeline/test_store.py | 47 +++-- tests/test_config.py | 8 +- 17 files changed, 500 insertions(+), 133 deletions(-) create mode 100755 ambianic-debug.sh create mode 100644 src/ambianic/pipeline/timeline.py diff --git a/ambianic-debug.sh b/ambianic-debug.sh new file mode 100755 index 00000000..a2901ea7 --- /dev/null +++ b/ambianic-debug.sh @@ -0,0 +1,25 @@ +#!/bin/bash +set -ex + +MY_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}") +MY_DIR=$(dirname "${MY_PATH}") + +# test if coral usb stick is available +USB_DIR=/dev/bus/usb + +if [ -d "$USB_DIR" ]; then + USB_ARG="--device $USB_DIR" +else + USB_ARG="" +fi + +# check if there is an image update +docker pull ambianic/ambianic:dev +# run dev image +docker run -it --rm \ + --name ambianic-dev \ + --mount type=bind,source="$MY_DIR",target=/workspace \ + --publish 1234:1234 \ + --publish 8778:8778 \ + $USB_ARG \ + ambianic/ambianic:dev diff --git a/config.yaml b/config.yaml index 03d81413..1957fb5c 100755 --- a/config.yaml +++ b/config.yaml @@ -9,16 +9,11 @@ data_dir: &data_dir ./data # Set logging level to one of DEBUG, INFO, WARNING, ERROR logging: file: ./data/ambianic-log.txt - level: INFO - -# path to saved object detections from front door camera stream -front_door_object_detect_dir: &fd_object_detect_dir ./data/detections/front-door/objects -# path to saved face detections from front door camera stream -front_door_face_detect_dir: &fd_face_detect_dir ./data/detections/front-door/faces - -entry_area_object_detect_dir: &ea_object_detect_dir ./data/detections/entry-area/objects -entry_area_face_detect_dir: &ea_face_detect_dir ./data/detections/entry-area/faces + level: DEBUG +# Pipeline event timeline configuration +timeline: + event_log: ./data/timeline-event-log.yaml # Cameras and other input data sources # Using Home Assistant conventions to ease upcoming integration @@ -68,14 +63,12 @@ pipelines: <<: *tfm_image_detection confidence_threshold: 0.8 - save_detections: # save samples from the inference results - output_directory: *fd_object_detect_dir positive_interval: 2 # how often (in seconds) to save samples with ANY results above the confidence threshold idle_interval: 6000 # how often (in seconds) to save samples with NO results above the confidence threshold - detect_faces: # run ai inference on the samples from the previous element output <<: *tfm_face_detection confidence_threshold: 0.8 - save_detections: # save samples from the inference results - output_directory: *fd_face_detect_dir positive_interval: 2 idle_interval: 600 diff --git a/src/ambianic/pipeline/__init__.py b/src/ambianic/pipeline/__init__.py index 3de78e2c..0055f23c 100755 --- 
a/src/ambianic/pipeline/__init__.py +++ b/src/ambianic/pipeline/__init__.py @@ -4,6 +4,7 @@ import time from typing import Iterable from ambianic.util import ManagedService +from ambianic.pipeline.timeline import PipelineContext log = logging.getLogger(__name__) @@ -16,11 +17,71 @@ class PipeElement(ManagedService): """The basic building block of an Ambianic pipeline.""" - def __init__(self): + def __init__(self, + element_name=None, + context: PipelineContext = None, + event_log: logging.Logger = None): + """Create a PipeElement instance.""" super().__init__() + self._name = element_name self._state = PIPE_STATE_STOPPED self._next_element = None self._latest_heartbeat = time.monotonic() + self._context = context + self._timeline_event_log = event_log + + @property + def name(self) -> str: + """Return this element's reference name in pipeline definitions.""" + return self._name + + @property + def context(self) -> PipelineContext: + """Pipeline execution context. + + :Returns: + ------- + type: PipelineContext + pipeline execution context + + """ + return self._context + + def push_context(self, element_context: dict = None): + """Push this element information to the context stack. + + Invoke before the element yields its first sample output + for a given input sample. + + :Parameters: + ---------- + element_context : dict + Contextual info about this element. + + """ + if element_context is None: + element_context = {} + element_context['class'] = self.__class__.__name__ + self._context.push_element_context(element_context) + + def pop_context(self) -> dict: + """Pop element information from the context stack. + + Invoke after the element yields its last sample output + for a given input sample. + + :Returns: + ------- + type: dict + Element context info. + + """ + return self._context.pop_element_context() + + @property + def event_log(self) -> logging.Logger: + """Get timeline event log for the current pipe execution context.""" + return self._timeline_event_log @property def state(self): @@ -161,7 +222,7 @@ class HealthChecker(PipeElement): based on received output samples and their frequency. """ - def __init__(self, health_status_callback=None): + def __init__(self, health_status_callback=None, **kwargs): """Create instance given health status callback. The health status call back will be invoked each time @@ -173,7 +234,7 @@ def __init__(self, health_status_callback=None): Method that is expected to measure the overall pipeline throughput health. 
""" - super().__init__() + super().__init__(**kwargs) assert health_status_callback self._health_status_callback = health_status_callback diff --git a/src/ambianic/pipeline/ai/face_detect.py b/src/ambianic/pipeline/ai/face_detect.py index fcd5a037..be4d865b 100755 --- a/src/ambianic/pipeline/ai/face_detect.py +++ b/src/ambianic/pipeline/ai/face_detect.py @@ -12,6 +12,7 @@ class FaceDetector(TFImageDetection): @staticmethod def crop_image(image, box): + """Crop image to given box.""" # Size of the image in pixels (size of orginal image) # (This is not mandatory) width, height = image.size @@ -28,6 +29,7 @@ def crop_image(image, box): return im1 def process_sample(self, **sample): + """Detect faces in the given image sample.""" log.debug("Pipe element %s received new sample with keys %s.", self.__class__.__name__, str([*sample])) @@ -46,8 +48,8 @@ def process_sample(self, **sample): else: # - apply face detection to cropped person areas # - pass face detections on to next pipe element - for category, confidence, box in prev_inference_result: - if category == 'person' and \ + for label, confidence, box in prev_inference_result: + if label == 'person' and \ confidence >= self._tfengine.confidence_threshold: person_regions.append(box) log.debug('Received %d person boxes for face detection', @@ -55,11 +57,18 @@ def process_sample(self, **sample): for box in person_regions: person_image = self.crop_image(image, box) inference_result = self.detect(image=person_image) - log.warning('Face detection inference_result: %r', - inference_result) + log.debug('Face detection inference_result: %r', + inference_result) + inf_meta = { + 'display': 'Face Detection' + # id + # version + # etc + } processed_sample = { 'image': person_image, - 'inference_result': inference_result + 'inference_result': inference_result, + 'inference_meta': inf_meta } yield processed_sample except Exception as e: diff --git a/src/ambianic/pipeline/ai/image_detection.py b/src/ambianic/pipeline/ai/image_detection.py index 7db97627..babc7c3d 100644 --- a/src/ambianic/pipeline/ai/image_detection.py +++ b/src/ambianic/pipeline/ai/image_detection.py @@ -33,13 +33,12 @@ def __init__(self, """ # log.warning('TFImageDetection __init__ invoked') - super().__init__() + super().__init__(**kwargs) self._tfengine = TFInferenceEngine( model=model, labels=labels, confidence_threshold=confidence_threshold, - top_k=top_k, - **kwargs) + top_k=top_k) self._labels = self.load_labels(self._tfengine.labels_path) self.last_time = time.monotonic() @@ -123,7 +122,7 @@ def detect(self, image=None): list of tuples List of top_k detections above confidence_threshold. 
Each detection is a tuple of: - (category, confidence, (x0, y0, x1, y1)) + (label, confidence, (x0, y0, x1, y1)) """ assert image @@ -204,14 +203,14 @@ def detect(self, image=None): # protect against models that return arbitrary labels # when the confidence is low if (li < len(self._labels)): - category = self._labels[li] + label = self._labels[li] box = boxes[0, i, :] x0 = box[1] y0 = box[0] x1 = box[3] y1 = box[2] inference_result.append(( - category, + label, confidence, (x0, y0, x1, y1))) return inference_result @@ -225,6 +224,6 @@ def detect(self, image=None): # for obj in objs: # x0, y0, x1, y1 = obj.bounding_box.flatten().tolist() # confidence = obj.score -# category = self.labels[obj.label_id] -# inference_result.append((category, confidence, (x0, y0, x1, y1))) +# label = self.labels[obj.label_id] +# inference_result.append((label, confidence, (x0, y0, x1, y1))) # return inference_result diff --git a/src/ambianic/pipeline/ai/object_detect.py b/src/ambianic/pipeline/ai/object_detect.py index be905256..cc706802 100755 --- a/src/ambianic/pipeline/ai/object_detect.py +++ b/src/ambianic/pipeline/ai/object_detect.py @@ -11,6 +11,7 @@ class ObjectDetector(TFImageDetection): """Detects objects in an image.""" def process_sample(self, **sample): + """Detect objects in sample image.""" log.debug("%s received new sample", self.__class__.__name__) if not sample: # pass through empty samples to next element @@ -19,10 +20,14 @@ def process_sample(self, **sample): try: image = sample['image'] inference_result = self.detect(image=image) + inf_meta = { + 'display': 'Object Detection' + } # pass on the results to the next connected pipe element processed_sample = { 'image': image, - 'inference_result': inference_result + 'inference_result': inference_result, + 'inference_meta': inf_meta } yield processed_sample except Exception as e: diff --git a/src/ambianic/pipeline/avsource/av_element.py b/src/ambianic/pipeline/avsource/av_element.py index d59c2a94..f1b66603 100755 --- a/src/ambianic/pipeline/avsource/av_element.py +++ b/src/ambianic/pipeline/avsource/av_element.py @@ -39,7 +39,7 @@ def __init__(self, uri=None, type=None, live=False, **kwargs): in case there is disruption of the source stream until explicit stop() is requested of the element. """ - super().__init__() + super().__init__(**kwargs) assert uri element_conf = dict(kwargs) element_conf['uri'] = uri diff --git a/src/ambianic/pipeline/interpreter.py b/src/ambianic/pipeline/interpreter.py index 894417f5..638ce453 100755 --- a/src/ambianic/pipeline/interpreter.py +++ b/src/ambianic/pipeline/interpreter.py @@ -8,12 +8,13 @@ from ambianic.pipeline.ai.face_detect import FaceDetector from ambianic.pipeline.store import SaveDetectionSamples from ambianic.pipeline import PipeElement, HealthChecker +from ambianic.pipeline import timeline from ambianic.util import ThreadedJob, ManagedService, stacktrace log = logging.getLogger(__name__) -def get_pipelines(pipelines_config): +def get_pipelines(pipelines_config, data_dir=None): """Initialize and return pipelines given config parameters. 
:Parameters: ---------- @@ -36,7 +37,7 @@ if pipelines_config: for pname, pdef in pipelines_config.items(): log.info("loading %s pipeline configuration", pname) - p = Pipeline(pname=pname, pconfig=pdef) + p = Pipeline(pname=pname, pconfig=pdef, data_dir=data_dir) pipelines.append(p) else: log.warning('No pipelines configured.') @@ -80,7 +81,13 @@ def __init__(self, config=None): pipelines_config = config.get('pipelines', None) print('pipelines config: %r' % pipelines_config) if pipelines_config: - self._pipelines = get_pipelines(pipelines_config) + # get main data dir config and pass + # on to pipelines to use + data_dir = config.get('data_dir', None) + if not data_dir: + data_dir = './data' + self._pipelines = get_pipelines(pipelines_config, + data_dir=data_dir) for pp in self._pipelines: pj = ThreadedJob(pp) self._threaded_jobs.append(pj) @@ -213,7 +220,7 @@ def _on_unknown_pipe_element(self, name=None): ' Ignoring element and moving forward.', name) - def __init__(self, pname=None, pconfig=None): + def __init__(self, pname=None, pconfig=None, data_dir=None): """Init and load pipeline config.""" assert pname, "Pipeline name required" self.name = pname @@ -229,17 +236,28 @@ def __init__(self, pname=None, pconfig=None): # in the future status may represent a spectrum of health issues self._latest_health_status = True self._healing_thread = None + self._context = timeline.PipelineContext( + unique_pipeline_name=self.name) + self._context.data_dir = data_dir + self._event_log = timeline.get_event_log( + pipeline_context=self._context) for element_def in self.config: log.info('Pipeline %s loading next element: %s', pname, element_def) element_name = [*element_def][0] + assert element_name element_config = element_def[element_name] element_class = self.PIPELINE_OPS.get(element_name, None) if element_class: log.info('Pipeline %s adding element name %s ' 'with class %s and config %s', pname, element_name, element_class, element_config) - element = element_class(**element_config) + element = element_class( + **element_config, + element_name=element_name, + context=self._context, + event_log=self._event_log + ) self._pipe_elements.append(element) else: self._on_unknown_pipe_element(name=element_name) @@ -272,7 +290,8 @@ def start(self): e_next = self._pipe_elements[i] e.connect_to_next_element(e_next) last_element = self._pipe_elements[len(self._pipe_elements)-1] - hc = HealthChecker(health_status_callback=self._heartbeat) + hc = HealthChecker(health_status_callback=self._heartbeat, + element_name='health_check') last_element.connect_to_next_element(hc) self._pipe_elements[0].start() log.info("Stopped %s", self.__class__.__name__) diff --git a/src/ambianic/pipeline/store.py b/src/ambianic/pipeline/store.py index 33b53bf7..3213c408 100755 --- a/src/ambianic/pipeline/store.py +++ b/src/ambianic/pipeline/store.py @@ -1,9 +1,12 @@ +"""Pipeline sample storage elements.""" import logging import datetime import os import pathlib import json import uuid +from ambianic.pipeline import timeline +from ambianic.pipeline.timeline import PipelineEvent from ambianic.pipeline import PipeElement @@ -14,7 +17,6 @@ class SaveDetectionSamples(PipeElement): """Saves AI detection samples to an external storage location.""" def __init__(self, - output_directory='./', positive_interval=2, idle_interval=600, **kwargs): @@ -31,17 +33,28 @@ def __init__(self, Default is 10 minutes (600 seconds). 
""" - super().__init__() + super().__init__(**kwargs) log.info('Loading pipe element %r ', self.__class__.__name__) - self._output_directory = output_directory + if self.context: + self._sys_data_dir = self.context.data_dir + else: + self._sys_data_dir = './data' + self._output_directory = pathlib.Path(self._sys_data_dir) assert self._output_directory, \ 'Pipe element %s: requires argument output_directory:' \ % self.__class__.__name__ - log.debug('output_directory: %r', output_directory) - self._output_directory = pathlib.Path(self._output_directory) + # mkdir succeeds even if directory exists. + self._output_directory.mkdir(parents=True, exist_ok=True) + # add unique suffix to output dir to avvoid collisions + now = datetime.datetime.now() + dir_prefix = 'detections/' + dir_time = now.strftime("%Y%m%d-%H%M%S.%f%z") + self._rel_data_dir = dir_prefix + dir_time + self._output_directory = self._output_directory / self._rel_data_dir self._output_directory.mkdir(parents=True, exist_ok=True) - # succeeds even if directory exists. - os.makedirs(self._output_directory, exist_ok=True) + self._output_directory = self._output_directory.resolve() + log.debug('output_directory: %r', self._output_directory) + # os.makedirs(self._output_directory, exist_ok=True) # by default save samples with detections every 2 seconds di = positive_interval self._positive_interval = datetime.timedelta(seconds=di) @@ -54,20 +67,20 @@ def __init__(self, self._idle_interval = datetime.timedelta(seconds=ii) self._time_latest_saved_idle = self._time_latest_saved_detection - def _save_sample(self, now, image, inference_result): - time_prefix = now.strftime("%Y%m%d-%H%M%S-{ftype}.{fext}") - image_file = time_prefix.format(ftype='image', fext='jpg') + def _save_sample(self, now, image, inference_result, inference_meta): + time_prefix = now.strftime("%Y%m%d-%H%M%S.%f%z.{fext}") + image_file = time_prefix.format(fext='jpg') image_path = self._output_directory / image_file - json_file = time_prefix.format(ftype='json', fext='txt') + json_file = time_prefix.format(fext='json') json_path = self._output_directory / json_file inf_json = [] - for category, confidence, box in inference_result: - log.info('category: %s , confidence: %.0f, box: %s', - category, + for label, confidence, box in inference_result: + log.info('label: %s , confidence: %.0f, box: %s', + label, confidence, box) one_inf = { - 'category': category, + 'label': label, 'confidence': float(confidence), 'box': { 'xmin': float(box[0]), @@ -77,20 +90,32 @@ def _save_sample(self, now, image, inference_result): } } inf_json.append(one_inf) - - ai_json = { + save_json = { 'id': uuid.uuid4().hex, 'datetime': now.isoformat(), - 'image': image_file, + 'image_file_name': image_file, + 'json_file_name': json_file, + # rel_dir is relative to system data dir + # this will be important when resloving REST API data + # file serving + 'rel_dir': self._rel_data_dir, 'inference_result': inf_json, + 'inference_meta': inference_meta } image.save(image_path) # save samples to local disk with open(json_path, 'w', encoding='utf-8') as f: - json.dump(ai_json, f, ensure_ascii=False, indent=4) + json.dump(save_json, f, ensure_ascii=False, indent=4) + # e = PipelineEvent('Detected Objects', type='ObjectDetection') + self.event_log.info('Detection Event', save_json) return image_path, json_path - def process_sample(self, image=None, inference_result=None, **sample): + def process_sample(self, + image=None, + inference_result=None, + inference_meta=None, + **sample): + """Process next detection 
sample.""" log.debug("Pipe element %s received new sample with keys %s.", self.__class__.__name__, str([*sample])) @@ -107,7 +132,10 @@ def process_sample(self, image=None, inference_result=None, **sample): # the user specified positive_interval if now - self._time_latest_saved_detection >= \ self._positive_interval: - self._save_sample(now, image, inference_result) + self._save_sample(now, + image, + inference_result, + inference_meta) self._time_latest_saved_detection = now else: # non-empty result, there is a detection @@ -115,11 +143,14 @@ def process_sample(self, image=None, inference_result=None, **sample): # the user specified positive_interval if now - self._time_latest_saved_idle >= \ self._idle_interval: - self._save_sample(now, image, inference_result) + self._save_sample(now, + image, + inference_result, + inference_meta) self._time_latest_saved_idle = now except Exception as e: - log.warning('Error %r while saving sample %r', - e, sample) + log.exception('Error %r while saving sample %r', + e, sample) finally: # pass on the sample to the next pipe element if there is one processed_sample = { diff --git a/src/ambianic/pipeline/timeline.py b/src/ambianic/pipeline/timeline.py new file mode 100644 index 00000000..a23d9c76 --- /dev/null +++ b/src/ambianic/pipeline/timeline.py @@ -0,0 +1,185 @@ +"""Pipeline event timeline read/write/search functions.""" + +import logging +import yaml +import uuid +import os +import pathlib + +log = logging.getLogger(__name__) +TIMELINE_EVENT_LOGGER_NAME = __name__ + '__timeline__event__logger__' +PIPELINE_CONTEXT_KEY = 'pipeline_context' + + +class PipelineEvent: + """Encapsulates information for a pipeline timeline event.""" + + def __init__(self, message: str = None, **kwargs): + """Create a new event instance. + + :Parameters: + ---------- + message : String + Human readable display message for the event. + **kwargs : type + Additional event arguments. + + """ + self.message = message + self.kwargs = kwargs + self.args = {} + self.args['message'] = self.message + self.args['args'] = self.kwargs + + def __str__(self): + """Format event as yaml string.""" + s = yaml.dump(self.kwargs) + return s + + +class PipelineContext: + """Runtime dynamic context for a pipeline. + + Carries information + such as pipeline name and pipe element stack + up to and including the element firing the event. + + """ + + def __init__(self, unique_pipeline_name: str = None): + """Instantiate timeline context for a pipeline. + + :Parameters: + ---------- + unique_pipeline_name : str + The unique runtime name of a pipeline. 
+ + """ + self._unique_pipeline_name = unique_pipeline_name + self._element_stack = [] + self._data_dir = None + + @property + def unique_pipeline_name(self): + """Return pipeline unique name.""" + return self.unique_pipeline_name + + @property + def data_dir(self): + """Return system wide configured data dir.""" + return self._data_dir + + @data_dir.setter + def data_dir(self, dd=None): + """Set system wide configured data dir.""" + self._data_dir = dd + + def push_element_context(self, element_context: dict = None): + """Push new element information to the context stack.""" + self._element_stack.append(element_context) + + def pop_element_context(self) -> dict: + """Pop element information from the context stack.""" + return self._element_stack.pop() + + pass + + +class PipelineEventFormatter(logging.Formatter): + """Custom logging formatter for pipeline events.""" + + def format(self, record: logging.LogRecord = None) -> str: + """Populate event information and return as yaml formatted string.""" + # s = super().format(record) + s = None + e = {} + e['id'] = uuid.uuid4().hex + e['message'] = record.getMessage() + # log.warning('record.message: %r', record.getMessage()) + # log.warning('record.args: %r', record.args) + e['created'] = record.created + e['priority'] = record.levelname + e['args'] = record.args + e['source_code'] = {} + e['source_code']['pathname'] = record.pathname + e['source_code']['funcName'] = record.funcName + e['source_code']['lineno'] = record.lineno + ctx = record.args.get(PIPELINE_CONTEXT_KEY, None) + if ctx: + e[PIPELINE_CONTEXT_KEY] = ctx.toDict() + # use array enclosure a[] to mainain the log file + # yaml compliant as new events are appended + # - event1: + # - event2: + # - ... + a = [e] + s = yaml.dump(a) + return s + + +def configure_timeline(config: dict = None): + """Initialize timeline event logger. + + Sets up pipeline event logger once to be reused by pipelines + in the current runtime. + Should be called before any pipeline starts. + + A good place to initialize it is around the time when the root logger + is initialized. + + :Parameters: + ------------ + + config : dict + A dictionary of configuration parameters. + + """ + if config is None: + config = {} + log_filename = config.get('event_log', None) + if not log_filename: + log_filename = 'timeline-event-log.yaml' + log_directory = os.path.dirname(log_filename) + with pathlib.Path(log_directory) as log_dir: + log_dir.mkdir(parents=True, exist_ok=True) + log.debug("Timeline event log messages directed to {}". + format(log_filename)) + event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME) + event_log.setLevel(logging.INFO) + # Use rotating files as log message handler + handler = logging.handlers.RotatingFileHandler( + log_filename, + # each event file will keep up to 100K data + maxBytes=100*1024, + # 100 backup files will be kept. Older will be erased. + backupCount=100) + fmt = PipelineEventFormatter() + handler.setFormatter(fmt) + # remove any other handlers that may be assigned previously + # and could cause unexpected log collisions + event_log.handlers = [] + # add custom event handler + event_log.addHandler(handler) + + +def get_event_log(pipeline_context: PipelineContext = None) \ + -> logging.Logger: + """Get an instance of pipeline event logger. 
+ + :Parameters: + ---------- + pipeline_context : PipelineContext + + :Returns: + ------- + type + Implementation of logging.Logger that handles pipeline events + + """ + pipeline_event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME) + # wrap logger in an adapter that carries pipeline context + # such as pipeline name and current pipe element. + pipeline_event_log = logging.LoggerAdapter( + pipeline_event_log, + {PIPELINE_CONTEXT_KEY: pipeline_context}) + return pipeline_event_log diff --git a/src/ambianic/server.py b/src/ambianic/server.py index b687c8d6..9edae56e 100644 --- a/src/ambianic/server.py +++ b/src/ambianic/server.py @@ -1,12 +1,14 @@ """Main Ambianic server module.""" import time import logging +import logging.handlers import os import pathlib import yaml from ambianic.webapp.flaskr import FlaskServer from ambianic.pipeline.interpreter import PipelineServer from ambianic.util import ServiceExit, stacktrace +from ambianic.pipeline import timeline log = logging.getLogger(__name__) @@ -36,34 +38,48 @@ def _configure_logging(config=None): except AttributeError as e: log.warning("Invalid log level: %s . Error: %s", log_level, e) log.warning('Defaulting log level to %s', default_log_level) + fmt = None if numeric_level <= logging.INFO: format_cfg = '%(asctime)s %(levelname)-4s ' \ '%(pathname)s.%(funcName)s(%(lineno)d): %(message)s' datefmt_cfg = '%Y-%m-%d %H:%M:%S' + fmt = logging.Formatter(fmt=format_cfg, + datefmt=datefmt_cfg, style='%') else: - format_cfg = None - datefmt_cfg = None + fmt = logging.Formatter() + root_logger = logging.getLogger() + # remove any other handlers that may be assigned previously + # and could cause unexpected log collisions + root_logger.handlers = [] + # add a console handler that only shows errors and warnings + ch = logging.StreamHandler() + ch.setLevel(logging.WARNING) + # add formatter to ch + ch.setFormatter(fmt) + # add ch to logger + root_logger.addHandler(ch) + # add a file handler if configured log_filename = config.get('file', None) if log_filename: log_directory = os.path.dirname(log_filename) with pathlib.Path(log_directory) as log_dir: log_dir.mkdir(parents=True, exist_ok=True) - print("Log messages directed to {}".format(log_filename)) - root_logger = logging.getLogger() - # remove any outside handlers - while root_logger.hasHandlers(): - root_logger.removeHandler(root_logger.handlers[0]) - logging.basicConfig( - format=format_cfg, - level=numeric_level, - datefmt=datefmt_cfg, - filename=log_filename) + print("Log messages directed to {}".format(log_filename)) + handler = logging.handlers.RotatingFileHandler( + log_filename, + # each log file will be up to 100MB in size + maxBytes=100*1024*1024, + # 20 backup files will be kept. Older will be erased. 
+ backupCount=20) + handler.setFormatter(fmt) + root_logger.addHandler(handler) + root_logger.setLevel(numeric_level) effective_level = log.getEffectiveLevel() assert numeric_level == effective_level log.info('Logging configured with level %s', logging.getLevelName(effective_level)) if effective_level <= logging.DEBUG: - log.debug('Configuration dump:') + log.debug('Configuration yaml dump:') log.debug(yaml.dump(config)) @@ -92,15 +108,20 @@ def _configure(env_work_dir=None): base_config = cf.read() all_config = secrets_config + "\n" + base_config config = yaml.safe_load(all_config) + log.debug('loaded config from %r: %r', CONFIG_FILE, config) # configure logging logging_config = None if config: logging_config = config.get('logging', None) _configure_logging(logging_config) + # configure pipeline timeline event log + timeline_config = None + if config: + timeline_config = config.get('timeline', None) + timeline.configure_timeline(timeline_config) return config except Exception as e: - log.error("Failed to load configuration: %s", str(e)) - stacktrace() + log.exception('Failed to load configuration: %s', e) return None @@ -110,7 +131,7 @@ class AmbianicServer: def __init__(self, work_dir=None): """Initialize server from working directory files. - Parameters + :Parameters: ---------- work_dir : string The working directory where config and data reside. diff --git a/tests/pipeline/ai/test_face_detect.py b/tests/pipeline/ai/test_face_detect.py index b076480f..577e22a5 100644 --- a/tests/pipeline/ai/test_face_detect.py +++ b/tests/pipeline/ai/test_face_detect.py @@ -109,7 +109,7 @@ def test_no_sample(): config = _object_detect_config() result = 'Something' - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = image is None and inference_result is None face_detector = FaceDetector(**config) @@ -124,7 +124,7 @@ def test_bad_sample_good_sample(): config = _face_detect_config() result = 'Something' - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result face_detector = FaceDetector(**config) @@ -143,8 +143,8 @@ def sample_callback(image=None, inference_result=None): ) assert result assert len(result) == 1 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[0] + assert label == 'person' assert confidence > 0.8 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -155,7 +155,7 @@ def test_background_image_no_person(): config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = not image and not inference_result face_detector = FaceDetector(**config) @@ -172,7 +172,7 @@ def test_one_person_high_confidence_face_low_confidence_two_stage_pipe(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result # test stage one, object detection -> out @@ -183,8 +183,8 @@ def sample_callback(image=None, inference_result=None): object_detector.receive_next_sample(image=img) assert result assert len(result) == 1 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, 
y1) = result[0] + assert label == 'person' assert confidence > 0.9 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -203,7 +203,7 @@ def test2_one_person_high_confidence_face_low_confidence_two_stage_pipe(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result @@ -215,8 +215,8 @@ def sample_callback(image=None, inference_result=None): object_detector.receive_next_sample(image=img) assert result assert len(result) == 1 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[0] + assert label == 'person' assert confidence > 0.9 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -235,7 +235,7 @@ def test_one_person_two_stage_pipe_low_person_confidence(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result @@ -255,7 +255,7 @@ def test_two_person_high_confidence_one_face_high_confidence_two_stage_pipe(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result @@ -267,13 +267,13 @@ def sample_callback(image=None, inference_result=None): object_detector.receive_next_sample(image=img) assert result assert len(result) == 2 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[0] + assert label == 'person' assert confidence > 0.9 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 - category, confidence, (x0, y0, x1, y1) = result[1] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[1] + assert label == 'person' assert confidence > 0.9 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -285,8 +285,8 @@ def sample_callback(image=None, inference_result=None): object_detector.receive_next_sample(image=img) assert result assert len(result) == 1 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[0] + assert label == 'person' assert confidence > 0.9 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -307,7 +307,7 @@ def test_two_person_with_faces_no_confidence_one_stage_pipe(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result face_detector = FaceDetector(**face_config) @@ -326,7 +326,7 @@ def test_one_person_face_high_confidence_one_stage_pipe(): face_config = _face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result @@ -340,8 +340,8 @@ def sample_callback(image=None, inference_result=None): ) assert result assert len(result) == 1 - category, confidence, (x0, y0, x1, y1) = result[0] - assert category == 'person' + label, confidence, (x0, y0, x1, y1) = result[0] + assert label == 'person' assert confidence > 0.8 assert x0 > 0 and x0 < x1 assert y0 > 0 and y0 < y1 @@ -353,7 +353,7 @@ def test_one_person_no_face_two_stage(): face_config = 
_face_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result diff --git a/tests/pipeline/ai/test_object_detect.py b/tests/pipeline/ai/test_object_detect.py index 9387b0f6..98839614 100644 --- a/tests/pipeline/ai/test_object_detect.py +++ b/tests/pipeline/ai/test_object_detect.py @@ -85,7 +85,7 @@ def test_background_image(): config = _object_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result object_detector = ObjectDetector(**config) @@ -101,7 +101,7 @@ def test_one_person(): config = _object_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result @@ -124,7 +124,7 @@ def test_no_sample(): config = _object_detect_config() result = 'Something' - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = image is None and inference_result is None object_detector = ObjectDetector(**config) @@ -139,7 +139,7 @@ def test_bad_sample_good_sample(): config = _object_detect_config() result = 'nothing passed to me' - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result object_detector = ObjectDetector(**config) @@ -165,7 +165,7 @@ def test_one_person_no_face(): config = _object_detect_config() result = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal result result = inference_result diff --git a/tests/pipeline/avsource/test_avsource.py b/tests/pipeline/avsource/test_avsource.py index ffa68716..1a969326 100644 --- a/tests/pipeline/avsource/test_avsource.py +++ b/tests/pipeline/avsource/test_avsource.py @@ -79,7 +79,7 @@ def test_start_stop_file_source_image_size(): sample_received = threading.Event() sample_image = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal sample_received sample_image = image @@ -139,7 +139,7 @@ def test_start_stop_file_source_person_detect(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image @@ -191,7 +191,7 @@ def test_stop_on_video_EOS(): sample_received = threading.Event() sample_image = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal sample_received sample_image = image @@ -234,7 +234,7 @@ def test_still_image_input_detect_person_exit_eos(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image @@ -288,7 +288,7 @@ def test_still_image_input_detect_person_exit_stop_signal(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def 
sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image @@ -408,7 +408,7 @@ def test_exception_on_new_sample(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image @@ -509,7 +509,7 @@ def test_gst_process_kill(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image @@ -625,7 +625,7 @@ def test_gst_process_terminate(): sample_image = None detections = None - def sample_callback(image=None, inference_result=None): + def sample_callback(image=None, inference_result=None, **kwargs): nonlocal sample_image nonlocal detection_received sample_image = image diff --git a/tests/pipeline/test_interpreter.py b/tests/pipeline/test_interpreter.py index 3f8d762a..8219ae1b 100644 --- a/tests/pipeline/test_interpreter.py +++ b/tests/pipeline/test_interpreter.py @@ -56,7 +56,7 @@ def test_get_pipelines_one(): assert isinstance(p[0], interpreter.Pipeline) assert p[0].name == 'pipeline_one' assert isinstance(p[0]._pipe_elements[0], _TestSourceElement) - assert p[0]._pipe_elements[0].config == {'uri': 'test'} + assert p[0]._pipe_elements[0].config['uri'] == 'test' def test_get_pipelines_two(): @@ -76,10 +76,10 @@ def test_get_pipelines_two(): assert p[0].name == 'pipeline_one' assert isinstance(p[0]._pipe_elements[0], _TestSourceElement) assert isinstance(p[1], interpreter.Pipeline) - assert p[0]._pipe_elements[0].config == {'uri': 'test'} + assert p[0]._pipe_elements[0].config['uri'] == 'test' assert p[1].name == 'pipeline_two' assert isinstance(p[1]._pipe_elements[0], _TestSourceElement) - assert p[1]._pipe_elements[0].config == {'uri': 'test2'} + assert p[1]._pipe_elements[0].config['uri'] == 'test2' def test_pipeline_start(): diff --git a/tests/pipeline/test_store.py b/tests/pipeline/test_store.py index a07e1641..c70708aa 100644 --- a/tests/pipeline/test_store.py +++ b/tests/pipeline/test_store.py @@ -4,6 +4,8 @@ from PIL import Image import os import json +import logging +from ambianic.pipeline.timeline import PipelineContext def test_process_sample_none(): @@ -23,11 +25,11 @@ class _TestSaveDetectionSamples(SaveDetectionSamples): _json_path = None _inf_result = None - def _save_sample(self, now, image, inference_result): + def _save_sample(self, now, image, inference_result, inference_meta): self._save_sample_called = True self._inf_result = inference_result self._img_path, self._json_path = \ - super()._save_sample(now, image, inference_result) + super()._save_sample(now, image, inference_result, inference_meta) def test_store_positive_detection(): @@ -38,7 +40,10 @@ def test_store_positive_detection(): 'tmp/' ) out_dir = os.path.abspath(out_dir) - store = _TestSaveDetectionSamples(output_directory=out_dir) + context = PipelineContext(unique_pipeline_name='test pipeline') + context.data_dir = out_dir + store = _TestSaveDetectionSamples(context=context, + event_log=logging.getLogger()) img = Image.new('RGB', (60, 30), color='red') detections = [ ('person', 0.98, (0, 1, 2, 3)) @@ -58,7 +63,7 @@ def test_store_positive_detection(): assert store._save_sample_called assert store._inf_result == detections assert store._img_path - img_dir = 
os.path.dirname(os.path.abspath(store._img_path / "../../")) assert img_dir == out_dir out_img = Image.open(store._img_path) print(img_dir) @@ -66,20 +71,21 @@ def test_store_positive_detection(): assert out_img.mode == 'RGB' assert out_img.size[0] == 60 assert out_img.size[1] == 30 - json_dir = os.path.dirname(os.path.abspath(store._json_path)) + json_dir = os.path.dirname(os.path.abspath(store._json_path / "../../")) assert json_dir == out_dir print(json_dir) print(store._json_path) with open(store._json_path) as f: json_inf = json.load(f) print(json_inf) - img_fname = json_inf['image'] - img_fpath = os.path.join(out_dir, img_fname) + img_fname = json_inf['image_file_name'] + rel_dir = json_inf['rel_dir'] + img_fpath = os.path.join(out_dir, rel_dir, img_fname) assert img_fpath == str(store._img_path) json_inf_res = json_inf['inference_result'] assert len(json_inf_res) == 1 json_inf_res = json_inf_res[0] - assert json_inf_res['category'] == 'person' + assert json_inf_res['label'] == 'person' assert json_inf_res['confidence'] == 0.98 assert json_inf_res['box']['xmin'] == 0 assert json_inf_res['box']['ymin'] == 1 @@ -95,7 +101,10 @@ def test_store_negative_detection(): 'tmp/' ) out_dir = os.path.abspath(out_dir) - store = _TestSaveDetectionSamples(output_directory=out_dir) + context = PipelineContext(unique_pipeline_name='test pipeline') + context.data_dir = out_dir + store = _TestSaveDetectionSamples(context=context, + event_log=logging.getLogger()) img = Image.new('RGB', (60, 30), color='red') detections = [] processed_samples = list(store.process_sample(image=img, @@ -110,7 +120,7 @@ def test_store_negative_detection(): assert store._save_sample_called assert store._inf_result == detections assert store._img_path - img_dir = os.path.dirname(os.path.abspath(store._img_path)) + img_dir = os.path.dirname(os.path.abspath(store._img_path / "../../")) assert img_dir == out_dir out_img = Image.open(store._img_path) print(img_dir) @@ -118,15 +128,16 @@ def test_store_negative_detection(): assert out_img.mode == 'RGB' assert out_img.size[0] == 60 assert out_img.size[1] == 30 - json_dir = os.path.dirname(os.path.abspath(store._json_path)) + json_dir = os.path.dirname(os.path.abspath(store._json_path / "../../")) assert json_dir == out_dir print(json_dir) print(store._json_path) with open(store._json_path) as f: json_inf = json.load(f) print(json_inf) - img_fname = json_inf['image'] - img_fpath = os.path.join(out_dir, img_fname) + img_fname = json_inf['image_file_name'] + rel_dir = json_inf['rel_dir'] + img_fpath = os.path.join(out_dir, rel_dir, img_fname) assert img_fpath == str(store._img_path) json_inf_res = json_inf['inference_result'] assert not json_inf_res @@ -136,20 +147,24 @@ class _TestSaveDetectionSamples2(SaveDetectionSamples): _save_sample_called = False - def _save_sample(self, now, image, inference_result): + def _save_sample(self, now, image, inference_result, inference_meta): self._save_sample_called = True raise RuntimeError() def test_process_sample_exception(): """Exception during processing should not prevent passing the sample on.""" - store = _TestSaveDetectionSamples2(output_directory="./tmp/") + context = PipelineContext(unique_pipeline_name='test pipeline') + context.data_dir = "./tmp/" + store = _TestSaveDetectionSamples2(context=context, + event_log=logging.getLogger()) img = Image.new('RGB', (60, 30), color='red') detections = [ ('person', 0.98, (0, 1, 2, 3)) ] 
processed_samples = list(store.process_sample(image=img, - inference_result=detections)) + inference_result=detections, + inference_meta=None)) assert store._save_sample_called assert len(processed_samples) == 1 print(processed_samples) diff --git a/tests/test_config.py b/tests/test_config.py index 60bf16da..d1525a99 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,6 +1,7 @@ """Test configuration functions.""" import pytest import logging +import logging.handlers import ambianic from ambianic.server import AmbianicServer from ambianic import server @@ -21,10 +22,13 @@ def test_log_config_with_file(): } server._configure_logging(config=log_config) handlers = logging.getLogger().handlers + log_fn = None for h in handlers: - if isinstance(h, logging.FileHandler): + if isinstance(h, logging.handlers.RotatingFileHandler): log_fn = h.baseFilename assert log_fn == log_config['file'] + # at least one log file name should be configured + assert log_fn def test_log_config_without_file(): @@ -33,7 +37,7 @@ def test_log_config_without_file(): server._configure_logging(config=log_config) handlers = logging.getLogger().handlers for h in handlers: - assert not isinstance(h, logging.FileHandler) + assert not isinstance(h, logging.handlers.RotatingFileHandler) def test_log_config_with_debug_level():
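
Reviewer usage notes (not part of the patch):

1. The new PipeElement.push_context()/pop_context() hooks are declared and documented in src/ambianic/pipeline/__init__.py, but no element in this change calls them yet. Based solely on their docstrings, the intended bracketing inside a pipe element would look roughly like the sketch below; process_sample and detect stand in for the real element methods, and the snippet is illustrative rather than code from this commit:

    def process_sample(self, image=None, **sample):
        # push before the first yielded sample;
        # push_context() records {'class': <element class name>}
        self.push_context()
        try:
            inference_result = self.detect(image=image)
            yield {'image': image, 'inference_result': inference_result}
        finally:
            # pop after the last yielded sample for this input
            self.pop_context()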
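2. PipelineEventFormatter dumps every record as a single-element YAML list ("- event"), so each rotating event log file stays parseable as one YAML sequence while events are appended. A minimal reader sketch under that assumption; the default path matches the timeline: section added to config.yaml, and load_timeline_events is a hypothetical helper, since this patch ships no reader:

    import yaml

    def load_timeline_events(log_path='./data/timeline-event-log.yaml'):
        # the concatenated '- event' chunks parse as one flat list of dicts
        with open(log_path) as f:
            events = yaml.safe_load(f) or []
        # each record carries 'created' (epoch seconds); newest first
        return sorted(events, key=lambda e: e['created'], reverse=True)

    for event in load_timeline_events()[:5]:
        print(event['created'], event['priority'], event['message'])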