Skip to content

Commit

Permalink
feat(pipeline): timeline of pipeline events
Browse files Browse the repository at this point in the history
  • Loading branch information
ivelin committed Oct 27, 2019
1 parent bb7c730 commit 779c5db
Show file tree
Hide file tree
Showing 17 changed files with 500 additions and 133 deletions.
25 changes: 25 additions & 0 deletions ambianic-debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash
# Pull the latest Ambianic dev image and run it interactively with the
# local workspace bind-mounted, exposing the debug ports.
set -ex

# Resolve the absolute directory of this script so the bind mount works
# regardless of the caller's current working directory.
MY_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")
MY_DIR=$(dirname "${MY_PATH}")

# Test if a Coral USB accelerator stick is available.
USB_DIR=/dev/bus/usb

# Use an array for the optional docker argument: with quotes it expands
# to exactly zero words when empty, instead of depending on unquoted
# word splitting of a flat string.
USB_ARGS=()
if [ -d "$USB_DIR" ]; then
  USB_ARGS=(--device "$USB_DIR")
fi

# Check if there is an image update.
docker pull ambianic/ambianic:dev
# Run the dev image with the project directory mounted at /workspace.
docker run -it --rm \
  --name ambianic-dev \
  --mount type=bind,source="$MY_DIR",target=/workspace \
  --publish 1234:1234 \
  --publish 8778:8778 \
  "${USB_ARGS[@]}" \
  ambianic/ambianic:dev
15 changes: 4 additions & 11 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,11 @@ data_dir: &data_dir ./data
# Set logging level to one of DEBUG, INFO, WARNING, ERROR
logging:
file: ./data/ambianic-log.txt
level: INFO

# path to saved object detections from front door camera stream
front_door_object_detect_dir: &fd_object_detect_dir ./data/detections/front-door/objects
# path to saved face detections from front door camera stream
front_door_face_detect_dir: &fd_face_detect_dir ./data/detections/front-door/faces

entry_area_object_detect_dir: &ea_object_detect_dir ./data/detections/entry-area/objects
entry_area_face_detect_dir: &ea_face_detect_dir ./data/detections/entry-area/faces
level: DEBUG

# Pipeline event timeline configuration
timeline:
event_log: ./data/timeline-event-log.yaml

# Cameras and other input data sources
# Using Home Assistant conventions to ease upcoming integration
Expand Down Expand Up @@ -68,14 +63,12 @@ pipelines:
<<: *tfm_image_detection
confidence_threshold: 0.8
- save_detections: # save samples from the inference results
output_directory: *fd_object_detect_dir
positive_interval: 2 # how often (in seconds) to save samples with ANY results above the confidence threshold
idle_interval: 6000 # how often (in seconds) to save samples with NO results above the confidence threshold
- detect_faces: # run ai inference on the samples from the previous element output
<<: *tfm_face_detection
confidence_threshold: 0.8
- save_detections: # save samples from the inference results
output_directory: *fd_face_detect_dir
positive_interval: 2
idle_interval: 600

Expand Down
67 changes: 64 additions & 3 deletions src/ambianic/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from typing import Iterable
from ambianic.util import ManagedService
from ambianic.pipeline.timeline import PipelineContext

log = logging.getLogger(__name__)

Expand All @@ -16,11 +17,71 @@
class PipeElement(ManagedService):
"""The basic building block of an Ambianic pipeline."""

def __init__(self):
def __init__(self,
             element_name=None,
             context: PipelineContext = None,
             event_log: logging.Logger = None):
    """Create a PipeElement instance.

    :Parameters:
    ----------
    element_name : str
        Reference name of this element in the pipeline definition.
    context : PipelineContext
        Shared execution context of the owning pipeline.
    event_log : logging.Logger
        Timeline event logger for the pipeline this element runs in.
    """
    super().__init__()
    # identity and pipeline wiring
    self._name = element_name
    self._context = context
    self._timeline_event_log = event_log
    # runtime state
    self._state = PIPE_STATE_STOPPED
    self._next_element = None
    self._latest_heartbeat = time.monotonic()

@property
def name(self) -> str:
    """Return this element's reference name in pipeline definitions.

    This is the key under which the element appears in the
    pipelines section of the configuration file.
    """
    return self._name

@property
def context(self) -> PipelineContext:
    """Pipeline execution context.

    :Returns:
    -------
    type: PipelineContext
        Execution context shared by all elements of the
        owning pipeline; may be None if no context was supplied.
    """
    return self._context

def push_context(self, element_context: dict = None):
    """Push this element's information onto the context stack.

    Call before the element yields its first sample output
    for a given input sample.

    :Parameters:
    ----------
    element_context : dict
        Contextual info about this element. Mutated in place:
        the element's class name is recorded under the 'class' key.
    """
    info = {} if element_context is None else element_context
    info['class'] = type(self).__name__
    self._context.push_element_context(info)

def pop_context(self) -> dict:
    """Pop this element's information from the context stack.

    Call after the element yields its last sample output
    for a given input sample.

    :Returns:
    -------
    type: dict
        Element context info that was previously pushed.
    """
    popped = self._context.pop_element_context()
    return popped

@property
def event_log(self) -> logging.Logger:
    """Get the timeline event log for the current pipe execution context.

    May be None when the element was constructed without an event log.
    """
    return self._timeline_event_log

@property
def state(self):
Expand Down Expand Up @@ -161,7 +222,7 @@ class HealthChecker(PipeElement):
based on received output samples and their frequency.
"""

def __init__(self, health_status_callback=None):
def __init__(self, health_status_callback=None, **kwargs):
"""Create instance given health status callback.
The health status call back will be invoked each time
Expand All @@ -173,7 +234,7 @@ def __init__(self, health_status_callback=None):
Method that is expected to measure the overall pipeline throughput
health.
"""
super().__init__()
super().__init__(**kwargs)
assert health_status_callback
self._health_status_callback = health_status_callback

Expand Down
19 changes: 14 additions & 5 deletions src/ambianic/pipeline/ai/face_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class FaceDetector(TFImageDetection):

@staticmethod
def crop_image(image, box):
"""Crop image to given box."""
# Size of the image in pixels (size of original image)
# (This is not mandatory)
width, height = image.size
Expand All @@ -28,6 +29,7 @@ def crop_image(image, box):
return im1

def process_sample(self, **sample):
"""Detect faces in the given image sample."""
log.debug("Pipe element %s received new sample with keys %s.",
self.__class__.__name__,
str([*sample]))
Expand All @@ -46,20 +48,27 @@ def process_sample(self, **sample):
else:
# - apply face detection to cropped person areas
# - pass face detections on to next pipe element
for category, confidence, box in prev_inference_result:
if category == 'person' and \
for label, confidence, box in prev_inference_result:
if label == 'person' and \
confidence >= self._tfengine.confidence_threshold:
person_regions.append(box)
log.debug('Received %d person boxes for face detection',
len(person_regions))
for box in person_regions:
person_image = self.crop_image(image, box)
inference_result = self.detect(image=person_image)
log.warning('Face detection inference_result: %r',
inference_result)
log.debug('Face detection inference_result: %r',
inference_result)
inf_meta = {
'display': 'Face Detection'
# id
# version
# etc
}
processed_sample = {
'image': person_image,
'inference_result': inference_result
'inference_result': inference_result,
'inference_meta': inf_meta
}
yield processed_sample
except Exception as e:
Expand Down
15 changes: 7 additions & 8 deletions src/ambianic/pipeline/ai/image_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ def __init__(self,
"""
# log.warning('TFImageDetection __init__ invoked')
super().__init__()
super().__init__(**kwargs)
self._tfengine = TFInferenceEngine(
model=model,
labels=labels,
confidence_threshold=confidence_threshold,
top_k=top_k,
**kwargs)
top_k=top_k)
self._labels = self.load_labels(self._tfengine.labels_path)
self.last_time = time.monotonic()

Expand Down Expand Up @@ -123,7 +122,7 @@ def detect(self, image=None):
list of tuples
List of top_k detections above confidence_threshold.
Each detection is a tuple of:
(category, confidence, (x0, y0, x1, y1))
(label, confidence, (x0, y0, x1, y1))
"""
assert image
Expand Down Expand Up @@ -204,14 +203,14 @@ def detect(self, image=None):
# protect against models that return arbitrary labels
# when the confidence is low
if (li < len(self._labels)):
category = self._labels[li]
label = self._labels[li]
box = boxes[0, i, :]
x0 = box[1]
y0 = box[0]
x1 = box[3]
y1 = box[2]
inference_result.append((
category,
label,
confidence,
(x0, y0, x1, y1)))
return inference_result
Expand All @@ -225,6 +224,6 @@ def detect(self, image=None):
# for obj in objs:
# x0, y0, x1, y1 = obj.bounding_box.flatten().tolist()
# confidence = obj.score
# category = self.labels[obj.label_id]
# inference_result.append((category, confidence, (x0, y0, x1, y1)))
# label = self.labels[obj.label_id]
# inference_result.append((label, confidence, (x0, y0, x1, y1)))
# return inference_result
7 changes: 6 additions & 1 deletion src/ambianic/pipeline/ai/object_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ObjectDetector(TFImageDetection):
"""Detects objects in an image."""

def process_sample(self, **sample):
"""Detect objects in sample image."""
log.debug("%s received new sample", self.__class__.__name__)
if not sample:
# pass through empty samples to next element
Expand All @@ -19,10 +20,14 @@ def process_sample(self, **sample):
try:
image = sample['image']
inference_result = self.detect(image=image)
inf_meta = {
'display': 'Object Detection'
}
# pass on the results to the next connected pipe element
processed_sample = {
'image': image,
'inference_result': inference_result
'inference_result': inference_result,
'inference_meta': inf_meta
}
yield processed_sample
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/ambianic/pipeline/avsource/av_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, uri=None, type=None, live=False, **kwargs):
in case there is disruption of the source stream
until explicit stop() is requested of the element.
"""
super().__init__()
super().__init__(**kwargs)
assert uri
element_conf = dict(kwargs)
element_conf['uri'] = uri
Expand Down
31 changes: 25 additions & 6 deletions src/ambianic/pipeline/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from ambianic.pipeline.ai.face_detect import FaceDetector
from ambianic.pipeline.store import SaveDetectionSamples
from ambianic.pipeline import PipeElement, HealthChecker
from ambianic.pipeline import timeline
from ambianic.util import ThreadedJob, ManagedService, stacktrace

log = logging.getLogger(__name__)


def get_pipelines(pipelines_config):
def get_pipelines(pipelines_config, data_dir=None):
"""Initialize and return pipelines given config parameters.
:Parameters:
Expand All @@ -36,7 +37,7 @@ def get_pipelines(pipelines_config):
if pipelines_config:
for pname, pdef in pipelines_config.items():
log.info("loading %s pipeline configuration", pname)
p = Pipeline(pname=pname, pconfig=pdef)
p = Pipeline(pname=pname, pconfig=pdef, data_dir=data_dir)
pipelines.append(p)
else:
log.warning('No pipelines configured.')
Expand Down Expand Up @@ -80,7 +81,13 @@ def __init__(self, config=None):
pipelines_config = config.get('pipelines', None)
print('pipelines config: %r' % pipelines_config)
if pipelines_config:
self._pipelines = get_pipelines(pipelines_config)
# get main data dir config and pass
# on to pipelines to use
data_dir = config.get('data_dir', None)
if not data_dir:
data_dir = './data'
self._pipelines = get_pipelines(pipelines_config,
data_dir=data_dir)
for pp in self._pipelines:
pj = ThreadedJob(pp)
self._threaded_jobs.append(pj)
Expand Down Expand Up @@ -213,7 +220,7 @@ def _on_unknown_pipe_element(self, name=None):
' Ignoring element and moving forward.',
name)

def __init__(self, pname=None, pconfig=None):
def __init__(self, pname=None, pconfig=None, data_dir=None):
"""Init and load pipeline config."""
assert pname, "Pipeline name required"
self.name = pname
Expand All @@ -229,17 +236,28 @@ def __init__(self, pname=None, pconfig=None):
# in the future status may represent a spectrum of health issues
self._latest_health_status = True
self._healing_thread = None
self._context = timeline.PipelineContext(
unique_pipeline_name=self.name)
self._context.data_dir = data_dir
self._event_log = timeline.get_event_log(
pipeline_context=self._context)
for element_def in self.config:
log.info('Pipeline %s loading next element: %s',
pname, element_def)
element_name = [*element_def][0]
assert element_name
element_config = element_def[element_name]
element_class = self.PIPELINE_OPS.get(element_name, None)
if element_class:
log.info('Pipeline %s adding element name %s '
'with class %s and config %s',
pname, element_name, element_class, element_config)
element = element_class(**element_config)
element = element_class(
**element_config,
element_name=element_name,
context=self._context,
event_log=self._event_log
)
self._pipe_elements.append(element)
else:
self._on_unknown_pipe_element(name=element_name)
Expand Down Expand Up @@ -272,7 +290,8 @@ def start(self):
e_next = self._pipe_elements[i]
e.connect_to_next_element(e_next)
last_element = self._pipe_elements[len(self._pipe_elements)-1]
hc = HealthChecker(health_status_callback=self._heartbeat)
hc = HealthChecker(health_status_callback=self._heartbeat,
element_name='health_check')
last_element.connect_to_next_element(hc)
self._pipe_elements[0].start()
log.info("Stopped %s", self.__class__.__name__)
Expand Down
Loading

0 comments on commit 779c5db

Please sign in to comment.