Commit

Implemented threading with proper graceful shutdown. Added logging framework.

ivelin committed Aug 29, 2019
1 parent 0f33290 commit 5db0d7b
Showing 22 changed files with 18,628 additions and 122 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,4 +5,5 @@
tmp/
**/._**
**/*tmp___*
venv

ai_models/coco_labels.txt: file mode changed 100644 → 100755 (no content changes)
ai_models/efficientnet-edgetpu-L_quant_edgetpu.tflite: file mode changed 100644 → 100755 (no content changes)
ai_models/imagenet_labels.txt: file mode changed 100644 → 100755 (no content changes)
ai_models/inat_bird_labels.txt: file mode changed 100644 → 100755 (no content changes)
ai_models/inat_insect_labels.txt: file mode changed 100644 → 100755 (no content changes)
ai_models/inat_plant_labels.txt: file mode changed 100644 → 100755 (no content changes)
ai_models/pet_labels.txt: file mode changed 100644 → 100755 (no content changes)

114 changes: 114 additions & 0 deletions ambianic/__init__.py
@@ -0,0 +1,114 @@
name = "ambianic"

import time
import logging
import threading
import signal
from ambianic import flaskr
from ambianic.cameras.detect import CameraStreamProcessor


AI_MODELS_DIR = "ai_models"
CONFIG_DIR = "config"

is_configured = False

log = logging.getLogger(__name__)


def configure():
    global is_configured
    # TODO: read from an environment configured config file
    logging.basicConfig(level=logging.INFO)
    logging.info('configured')
    is_configured = True


class ThreadedJob(threading.Thread):

    def __init__(self, job):
        threading.Thread.__init__(self)

        self.job = job
        # The shutdown_flag is a threading.Event object that
        # indicates whether the thread should be terminated.
        # self.shutdown_flag = threading.Event()
        # ... Other thread setup code here ...

    def run(self):
        log.info('Thread #%s started', self.ident)

        self.job.start()
        # The following technique is helpful when the job is not stoppable:
        # while not self.shutdown_flag.is_set():
        #     # ... Job code here ...
        #     time.sleep(0.5)

        # ... Clean shutdown code here ...
        log.info('Thread #%s stopped', self.ident)

    def stop(self):
        log.info('Thread #%s is signalled to stop', self.ident)
        self.job.stop()


class ServiceExit(Exception):
    """Custom exception used to trigger the clean exit of all running
    threads and the main program.

    Method for controlled multi-threaded Python app exit suggested by
    George Notaras:
    https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    """
    pass


def service_shutdown(signum, frame):
    log.info('Caught signal %d', signum)
    raise ServiceExit


def start():
    if not is_configured:
        configure()
    log.info('Started')

    # Register the signal handlers
    signal.signal(signal.SIGTERM, service_shutdown)
    signal.signal(signal.SIGINT, service_shutdown)

    print('Starting main program')

    # Start the job threads
    try:
        # start AI inference loop on camera streams
        cams = CameraStreamProcessor()
        j1 = ThreadedJob(cams)

        # start web app server
        flask_server = flaskr.FlaskServer()
        j2 = ThreadedJob(flask_server)

        j1.start()
        j2.start()

        def heartbeat():
            log.info("Main loop alive.")

        # Keep the main thread running, otherwise signals are ignored.
        # Log a heartbeat roughly every 10 seconds.
        last_heartbeat = time.monotonic()
        while True:
            time.sleep(0.5)
            if time.monotonic() - last_heartbeat >= 10:
                heartbeat()
                last_heartbeat = time.monotonic()

    except ServiceExit:
        # Terminate the running threads.
        # Set the shutdown flag on each thread
        # to trigger a clean shutdown of each thread.
        # j1.shutdown_flag.set()
        j1.stop()
        # j2.shutdown_flag.set()
        j2.stop()
        # Wait for the threads to close...
        j1.join()
        j2.join()

    log.info('Exiting main program.')
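
The commented-out shutdown_flag lines above reference an Event-based stop technique for jobs that do not expose a stop() method of their own. A minimal standalone sketch of that pattern follows; the PollingJob class is hypothetical and not part of this commit:

import threading
import time


class PollingJob(threading.Thread):
    """Hypothetical job with no stop() hook; it polls a shutdown flag instead."""

    def __init__(self):
        threading.Thread.__init__(self)
        # set() on this event requests termination
        self.shutdown_flag = threading.Event()

    def run(self):
        while not self.shutdown_flag.is_set():
            # ... one small unit of work per iteration ...
            time.sleep(0.5)


job = PollingJob()
job.start()
# ... later, e.g. from a SIGTERM/SIGINT handler path ...
job.shutdown_flag.set()
job.join()

Because the flag is checked once per iteration, the thread exits within one work unit of the flag being set.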

File renamed without changes.
3 changes: 3 additions & 0 deletions ambianic/cameras/__init__.py
@@ -0,0 +1,3 @@
import logging

logger = logging.getLogger(__name__)
12 changes: 12 additions & 0 deletions ambianic/cameras/camera.html
@@ -0,0 +1,12 @@
<!DOCTYPE html>
<html>
  <head>
    <meta http-equiv="content-type" content="text/html; charset=utf-8">
    <title>gst-stream</title>
  </head>
  <body>
    <video width=320 height=240 autoplay>
      <source src="http://hass.lan:8778">
    </video>
  </body>
</html>
3 changes: 1 addition & 2 deletions cameras/classify.py → ambianic/cameras/classify.py
@@ -16,7 +16,6 @@
 import argparse
 import time
 import re
-import svgwrite
 import os
 from edgetpu.classification.engine import ClassificationEngine
 from gstreamer import InputStreamProcessor
@@ -43,7 +42,7 @@ def main():
                         default=os.path.join(default_model_dir, default_labels))
     parser.add_argument('--top_k', type=int, default=3,
                         help='number of classes with highest score to display')
-    parser.add_argument('--threshold', type=float, default=0.1,
+    parser.add_argument('--threshold', type=float, default=0.6,
                         help='class score threshold')
     args = parser.parse_args()

115 changes: 115 additions & 0 deletions ambianic/cameras/detect.py
@@ -0,0 +1,115 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A demo which runs object detection on camera frames.
export TEST_DATA=/usr/lib/python3/dist-packages/edgetpu/test_data
Run face detection model:
python3 -m edgetpuvision.detect \
--model ${TEST_DATA}/mobilenet_ssd_v2_face_quant_postprocess_edgetpu.tflite
Run coco model:
python3 -m edgetpuvision.detect \
--model ${TEST_DATA}/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite \
--labels ${TEST_DATA}/coco_labels.txt
"""
import argparse
import time
import re
import os
import logging
from edgetpu.detection.engine import DetectionEngine
import ambianic
from ambianic.cameras.gstreamer import InputStreamProcessor

log = logging.getLogger(__name__)


def load_labels(path):
    p = re.compile(r'\s*(\d+)(.+)')
    with open(path, 'r', encoding='utf-8') as f:
        lines = (p.match(line).groups() for line in f.readlines())
        return {int(num): text.strip() for num, text in lines}


def shadow_text(dwg, x, y, text, font_size=20):
    dwg.add(dwg.text(text, insert=(x+1, y+1), fill='black', font_size=font_size))
    dwg.add(dwg.text(text, insert=(x, y), fill='white', font_size=font_size))


def generate_svg(dwg, objs, labels, text_lines):
    width, height = dwg.attribs['width'], dwg.attribs['height']
    for y, line in enumerate(text_lines):
        shadow_text(dwg, 10, y*20, line)
    for obj in objs:
        x0, y0, x1, y1 = obj.bounding_box.flatten().tolist()
        x, y, w, h = x0, y0, x1 - x0, y1 - y0
        x, y, w, h = int(x * width), int(y * height), int(w * width), int(h * height)
        percent = int(100 * obj.score)
        label = '%d%% %s' % (percent, labels[obj.label_id])
        shadow_text(dwg, x, y - 5, label)
        dwg.add(dwg.rect(insert=(x, y), size=(w, h),
                         fill='red', fill_opacity=0.3, stroke='white'))
    # print("SVG canvas width: {w}, height: {h}".format(w=width, h=height))
    # dwg.add(dwg.rect(insert=(0, 0), size=(width, height),
    #                  fill='green', fill_opacity=0.2, stroke='white'))

class CameraStreamProcessor():

    def __init__(self):
        self.input_proc = None

    def start(self):
        log.info("Starting %s", self.__class__.__name__)
        default_model_dir = ambianic.AI_MODELS_DIR
        default_model = 'mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite'
        default_labels = 'coco_labels.txt'
        parser = argparse.ArgumentParser()
        parser.add_argument('--model', help='.tflite model path',
                            default=os.path.join(default_model_dir, default_model))
        parser.add_argument('--labels', help='label file path',
                            default=os.path.join(default_model_dir, default_labels))
        parser.add_argument('--top_k', type=int, default=3,
                            help='number of classes with highest score to display')
        parser.add_argument('--threshold', type=float, default=0.2,
                            help='class score threshold')
        args = parser.parse_args()

        print("Loading %s with %s labels." % (args.model, args.labels))
        engine = DetectionEngine(args.model)
        labels = load_labels(args.labels)

        last_time = time.monotonic()

        def inference_callback(image, svg_canvas):
            nonlocal last_time
            start_time = time.monotonic()
            objs = engine.DetectWithImage(image, threshold=args.threshold,
                                          keep_aspect_ratio=True, relative_coord=True,
                                          top_k=args.top_k)
            end_time = time.monotonic()
            text_lines = [
                'Inference: %.2f ms' % ((end_time - start_time) * 1000),
                'FPS: %.2f fps' % (1.0 / (end_time - last_time)),
            ]
            print(' '.join(text_lines))
            last_time = end_time
            generate_svg(svg_canvas, objs, labels, text_lines)

        self.input_proc = InputStreamProcessor(inference_callback)
        self.input_proc.run_pipeline()
        log.info("Stopped %s", self.__class__.__name__)

    def stop(self):
        log.info("Stopping %s", self.__class__.__name__)
        if self.input_proc:
            self.input_proc.stop_pipeline()
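
For reference, load_labels expects each line to begin with a numeric class id followed by the label text, the format used by the edgetpu demo label files such as coco_labels.txt. An illustrative sample and usage; the exact file contents shown here are an assumption, not part of this commit:

# Illustrative excerpt of ai_models/coco_labels.txt (assumed contents):
#   0  person
#   1  bicycle
#   2  car
labels = load_labels('ai_models/coco_labels.txt')
print(labels[0])  # -> 'person'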
