Skip to content

Commit

Permalink
fix: samples rest api
Browse files Browse the repository at this point in the history
  • Loading branch information
ivelin committed Oct 24, 2019
1 parent 0ed36e2 commit bb7c730
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
78 changes: 50 additions & 28 deletions src/ambianic/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import abc
import time
from typing import Iterable
from ambianic.util import ManagedService

log = logging.getLogger(__name__)
Expand All @@ -16,21 +17,46 @@ class PipeElement(ManagedService):
"""The basic building block of an Ambianic pipeline."""

def __init__(self):
# log.warning('PipeElement __init__ invoked')
super().__init__()
self._state = PIPE_STATE_STOPPED
self._next_element = None
self._latest_heartbeat = time.monotonic()

@property
def state(self):
"""Lifecycle state of the pipe element."""
return self._state

def start(self):
"""Only sourcing elements (first in a pipeline) need to override.
It is invoked once when the enclosing pipeline is started. It should
continue to run until the corresponding stop() method is invoked on the
same object from a separate pipeline lifecycle manager thread.
It is recommended for overriding methods to invoke this base method
via super().start() before proceeding with custom logic.
"""
self._state = PIPE_STATE_RUNNING

@abc.abstractmethod
def heal(self): # pragma: no cover
"""Override with adequate implementation of a healing procedure.
heal() is invoked by a lifecycle manager when its determined that
the element does not respond within reasonable timeframe.
This can happen for example if the element depends on external IO
resources, which become unavailable for an extended period of time.
The healing procedure should be considered a chance to recover or find
an alternative way to proceed.
If heal does not reset the pipe element back to a responsive state,
it is likely that the lifecycle manager will stop the
element and its ecnlosing pipeline.
"""
pass

def healthcheck(self):
Expand All @@ -39,25 +65,25 @@ def healthcheck(self):
:returns: (timestamp, status) tuple with most recent heartbeat
timestamp and health status code ('OK' normally).
"""
oldest_heartbeat = self._latest_heartbeat
status = 'OK' # At some point status may carry richer information
return oldest_heartbeat, status
return self._latest_heartbeat, status

def heartbeat(self):
"""Set the heartbeat timestamp to time.monotonic()."""
log.debug('Pipeline element %s heartbeat signal.',
self.__class__.__name__)
"""Set the heartbeat timestamp to time.monotonic().
Keeping the heartbeat timestamp current informs
the lifecycle manager that this element is functioning
well.
"""
now = time.monotonic()
lapse = now - self._latest_heartbeat
log.debug('Pipeline element %s heartbeat lapse %f',
self.__class__.__name__, lapse)
self._latest_heartbeat = now

def stop(self):
"""Receive stop signal and act accordingly.
Subclasses should override this method by
first invoking their super class implementation and then running
Subclasses implementing sourcing elements should override this method
by first invoking their super class implementation and then running
through steps specific to stopping their ongoing sample processing.
"""
Expand All @@ -66,17 +92,21 @@ def stop(self):
def connect_to_next_element(self, next_element=None):
"""Connect this element to the next element in the pipe.
Subclasses should not have to override this method.
Subclasses should not override this method.
"""
assert next_element
assert isinstance(next_element, PipeElement)
self._next_element = next_element

def receive_next_sample(self, **sample):
"""Receive next sample from a connected previous element.
"""Receive next sample from a connected previous element if applicable.
Subclasses should not have to override this method.
All pipeline elements except for the first (sourcing) element
in the pipeline will depend on this method to feed them with new
samples to process.
Subclasses should not override this method.
:Parameters:
----------
Expand All @@ -88,16 +118,6 @@ def receive_next_sample(self, **sample):
"""
self.heartbeat()
# NOTE: A future implementation could maximize hardware
# resources by launching each sample processing into
# a separate thread. For example if an AI element
# returns 10 person boxes which then need to be
# scanned by the next element for faces, and the
# underlying architecture provides 16 available CPU cores
# or 10 EdgeTPUs, then each face detection in a person box
# can be launched independently from the others as soon as
# the person boxes come in from the object detection
# process_sample generator.
for processed_sample in self.process_sample(**sample):
if self._next_element:
if (processed_sample):
Expand All @@ -107,8 +127,8 @@ def receive_next_sample(self, **sample):
self.heartbeat()

@abc.abstractmethod # pragma: no cover
def process_sample(self, **sample):
"""Implement processing in subclass as a generator function.
def process_sample(self, **sample) -> Iterable[dict]:
"""Override and implement as generator.
Invoked by receive_next_sample() when the previous element
(or pipeline source) feeds another data input sample.
Expand All @@ -125,8 +145,10 @@ def process_sample(self, **sample):
adjacent connected pipe elements.
:Returns:
processed_sample: dict
Processed sample that will be passed to the next pipeline element.
----------
processed_sample: Iterable[dict]
Generates processed samples to be passed on
to the next pipeline element.
"""
yield sample
Expand Down
5 changes: 4 additions & 1 deletion src/ambianic/webapp/flaskr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import flask
from requests import get
from werkzeug.serving import make_server
from pathlib import Path
from ambianic.util import ServiceExit, ThreadedJob, ManagedService
from .server import samples
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -182,7 +183,9 @@ def static_file(path):

@app.route('/api/data/<path:path>')
def data_file(path):
return flask.send_from_directory('../../data', path)
data_path = Path('./data').resolve()
log.info('Serving static data file from: %r', data_path / path)
return flask.send_from_directory(data_path, path)

@app.route('/client', defaults={'path': 'index.html'})
@app.route('/client/', defaults={'path': 'index.html'})
Expand Down

0 comments on commit bb7c730

Please sign in to comment.