Cleanup. Detecting from gstreamer pipeline the actual input stream shape (width and height) as well as inference image shape.
ivelin committed Aug 28, 2019
1 parent ab581d5 commit 0f33290
Showing 2 changed files with 147 additions and 142 deletions.
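The commit message refers to reading the stream shape from caps negotiated in the GStreamer pipeline. As a minimal, self-contained sketch of that lookup (the caps string below is a made-up example; the actual code in this commit reads negotiated caps from uridecodebin and appsink):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)
# Hypothetical caps for illustration; real caps come from the pipeline.
caps = Gst.Caps.from_string('video/x-raw,format=RGB,width=1280,height=720')
struct = caps.get_structure(0)
print(struct["width"], struct["height"])  # -> 1280 720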
13 changes: 7 additions & 6 deletions cameras/classify.py
@@ -19,7 +19,7 @@
import svgwrite
import os
from edgetpu.classification.engine import ClassificationEngine
-import gstreamer
+from gstreamer import InputStreamProcessor

def load_labels(path):
    p = re.compile(r'\s*(\d+)(.+)')
@@ -29,8 +29,8 @@ def load_labels(path):

def generate_svg(dwg, text_lines):
    for y, line in enumerate(text_lines):
-        dwg.add(dwg.text(line, insert=(11, y*20+1), fill='black', font_size='20'))
-        dwg.add(dwg.text(line, insert=(10, y*20), fill='white', font_size='20'))
+        dwg.add(dwg.text(line, insert=(101, 10 + (y+1)*20+1), fill='black', font_size='20'))
+        dwg.add(dwg.text(line, insert=(100, 10 + (y+1)*20), fill='white', font_size='20'))

def main():
    default_model_dir = "ai_models"
@@ -52,10 +52,10 @@ def main():
    labels = load_labels(args.labels)

    last_time = time.monotonic()
-    def user_callback(image, svg_canvas):
+    def inference_callback(image, svg_canvas):
        nonlocal last_time
        start_time = time.monotonic()
-        print('Running TF/Coral classification model.\n ')
+        #print('Running TF/Coral classification model.\n ')
        results = engine.ClassifyWithImage(image, threshold=args.threshold, top_k=args.top_k)
        end_time = time.monotonic()
        text_lines = [
@@ -68,7 +68,8 @@ def user_callback(image, svg_canvas):
        last_time = end_time
        generate_svg(svg_canvas, text_lines)

-    result = gstreamer.run_pipeline(user_callback)
+    input_proc = InputStreamProcessor()
+    result = input_proc.run_pipeline(inference_callback)

if __name__ == '__main__':
    main()
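Taken together, the classify.py changes replace the module-level gstreamer.run_pipeline call with the new class-based API. A minimal sketch of the resulting call pattern, with a stub callback (the stub body is hypothetical; only the names come from this diff):

from gstreamer import InputStreamProcessor

def inference_callback(image, svg_canvas):
    # image: PIL image sized to the appsink caps; svg_canvas: svgwrite.Drawing
    # a real callback runs the model on image and draws results onto svg_canvas
    pass

input_proc = InputStreamProcessor()
input_proc.run_pipeline(inference_callback)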
276 changes: 140 additions & 136 deletions cameras/gstreamer.py
@@ -25,142 +25,146 @@
GObject.threads_init()
Gst.init(None)

def on_bus_message(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        print('End of stream. Exiting gstreamer loop for this video stream.')
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write('Warning: %s: %s\n' % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write('Error: %s: %s\n' % (err, debug))
        loop.quit()
    return True

def on_new_sample(sink, overlay, appsink_size, user_function):
    print('New image sample received.')
    sample = sink.emit('pull-sample')
    buf = sample.get_buffer()
    caps = sample.get_caps()
    struct = caps.get_structure(0)
    # print("caps struct: {}".format(struct))
    video_source_width = struct["width"]
    video_source_height = struct["height"]
    print("Input video source width: {}, height: {}".format(video_source_width, video_source_height))
    result, mapinfo = buf.map(Gst.MapFlags.READ)
    if result:
        img = Image.frombytes('RGB', (appsink_size[0], appsink_size[1]), mapinfo.data, 'raw')
        svg_canvas = svgwrite.Drawing('', size=(appsink_size[0], appsink_size[1]))
        user_function(img, svg_canvas)
        overlay.set_property('data', svg_canvas.tostring())
    buf.unmap(mapinfo)
    return Gst.FlowReturn.OK

def run_pipeline(user_function,
                 appsink_size=(320, 180)):

    # WORKS GREAT with streaming. No AI inference. Even smaller file sizes: 3.4MB per 1MB video. CPU at 10%! Uses GPU!
    # PIPELINE = ' uridecodebin name=source latency=0 ! queue ! videoconvert ! omxh264enc ! h264parse '
    # PIPELINE += " ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000"

    # WORKS, BUT SLOWLY. Runs fine with AI inference, yet pegs the CPU at 200%.
    # It turns out certain gstreamer video conversion operations are challenging to move to GPU and are taxing on the CPU.
    # TODO: figure out RPI hardware acceleration for h264 to RGB conversion, overlay and scaling.
    # Default gst ops videoconvert, videoscale and overlay are slow, using the CPU.
    PIPELINE = ' uridecodebin name=source latency=0 '
    PIPELINE += """ ! tee name=t
        t. ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
        t. ! {leaky_q} ! videoconvert
           ! rsvgoverlay name=overlay fit-to-frame=true ! videoconvert
        """
    # save the video stream to files of 10 minutes duration each
    PIPELINE += " ! omxh264enc ! h264parse ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=600000000000"

    # Experimental version: doesn't work yet
    # PIPELINE = ' uridecodebin name=source latency=100 ' + \
    #     ' source. ! application/x-rtp, media=(string)audio ! decodebin ! audioconvert ! fakesink silent=false ' + \
    #     ' source. ! application/x-rtp, media=(string)video ! decodebin ! ' + \
    #     ' ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! video/x-raw, format=RGB' + \
    #     ' ! {leaky_q} ! {sink_caps} ! {sink_element}'

    # PIPELINE += """
    #     source. ! decodebin ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
    #     source. ! decodebin ! {leaky_q} ! videoconvert
    # """

    # PIPELINE += " ! vp8enc ! webmmux ! queue leaky=2 ! tcpserversink host=hass.lan port=8778 recover-policy=keyframe sync-method=latest-keyframe"

    # PIPELINE += " ! omxh264enc ! h264parse ! tee name=t_out " \
    #     "t_out. ! {leaky_q} ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000 " \
    #     "t_out. ! {leaky_q} ! decodebin ! vp8enc ! webmmux ! queue leaky=2 ! tcpserversink host=hass.lan port=8778 recover-policy=keyframe sync-method=latest-keyframe"

    # ! queue ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! video/x-raw, format=RGB'
    # " ! h264parse ! omxh264dec ! videoconvert ! {sink_caps} ! {sink_element}"
    # "! queue ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}"

    # PIPELINE += """ ! tee name=t
    #     t. ! {leaky_q} ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}
    #     t. ! {leaky_q} ! videoconvert
    # """
    # PIPELINE += " ! queue ! videoconvert ! omxh264enc ! h264parse ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000"

    #     t. ! {leaky_q} ! videoconvert ! {sink_caps} ! {sink_element}
    #     t. ! {leaky_q} ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2video12convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}

    # OUTDATED:
    # PIPELINE += '{leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}'
    # PIPELINE += " ! splitmuxsink muxer=matroskamux location=\"test1-%02d.mkv\" max-size-time=60000000000"

    LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream'
    # SINK_CAPS = 'video/x-raw,format=RGB'
    SINK_CAPS = 'video/x-raw,format=RGB,width={width},pixel-aspect-ratio=1/1'
    SINK_ELEMENT = 'appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true'

    sink_caps = SINK_CAPS.format(width=appsink_size[0], height=appsink_size[1])
    pipeline = PIPELINE.format(leaky_q=LEAKY_Q,
                               sink_caps=sink_caps,
                               sink_element=SINK_ELEMENT)
    print('Gstreamer pipeline: ', pipeline)
    pipeline = Gst.parse_launch(pipeline)

    video_src = pipeline.get_by_name('source')
    video_src.props.uri = "rtsp://admin:[email protected]:554/ISAPI/Streaming/channels/101/picture"
    print("Video source URI: {}".format(video_src.props.uri))
    overlay = pipeline.get_by_name('overlay')
    print("overlay sink: {}".format(str(overlay)))
    appsink = pipeline.get_by_name('appsink')
    print("appsink: {}".format(str(appsink)))
    print("appsink will emit signals: {}".format(appsink.props.emit_signals))
    print("Connecting AI model and detection overlay to video stream")
    appsink.connect('new-sample', partial(on_new_sample,
                    overlay=overlay, appsink_size=appsink_size, user_function=user_function))
    loop = GObject.MainLoop()

    # set Gst debug log level
    # Gst.debug_set_active(True)
    # Gst.debug_set_default_threshold(3)

    # Set up a pipeline bus watch to catch errors.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', on_bus_message, loop)

    # Run pipeline.
    pipeline.set_state(Gst.State.PLAYING)
    try:
        print("Entering main gstreamer loop")
        loop.run()
        print("Exited gstreamer loop")
    except Exception as e:
        sys.stderr.write("GST loop exited with error: {} ".format(str(e)))

    # Clean up.
    print("Cleaning up GST resources")
    pipeline.set_state(Gst.State.NULL)
    while GLib.MainContext.default().iteration(False):
        pass
    print("Done.")


class InputStreamProcessor:
    """
    Handles a wide range of media input sources and calls back for TF inference.
    This class is not thread safe. It should not be called from multiple threads simultaneously.
    """

    class Shape:
        width = height = None

    def __init__(self):
        # Gstreamer pipeline for a given input source (could be image, audio or video)
        self.pipeline = None
        self.video_source = None
        # shape of the input stream image or video
        self.source_shape = None
        # appsink handles GStreamer callbacks for TF inference
        self.appsink = None
        # shape of the image passed to Tensorflow for inference
        # default values that are close to most TF image model input tensors
        self.appsink_shape = self.Shape()
        self.appsink_shape.width = 320
        self.appsink_shape.height = 320
        # overlay is where we draw labels and bounding boxes for users to see inference results
        self.overlay = None
        # inference callback
        self.inference_callback = None

    def on_autoplug_continue(self, src_bin, src_pad, src_caps):
        #print('on_autoplug_continue called for uridecodebin')
        #print('src_bin: {}'.format(str(src_bin)))
        #print('src_pad: {}'.format(str(src_pad)))
        #print('src_caps: {}'.format(str(src_caps)))
        struct = src_caps.get_structure(0)
        #print("src caps struct: {}".format(struct))
        app_width = struct["width"]
        app_height = struct["height"]
        if app_width:
            print("Input source width: {}, height: {}".format(app_width, app_height))
        return True

    def on_bus_message(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            print('End of stream. Exiting gstreamer loop for this video stream.')
            loop.quit()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            sys.stderr.write('Warning: %s: %s\n' % (err, debug))
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            sys.stderr.write('Error: %s: %s\n' % (err, debug))
            loop.quit()
        return True

    def on_new_sample(self, sink):
        print('New image sample received.')
        sample = sink.emit('pull-sample')
        buf = sample.get_buffer()
        caps = sample.get_caps()
        struct = caps.get_structure(0)
        # print("appsink caps struct: {}".format(struct))
        app_width = struct["width"]
        app_height = struct["height"]
        # print("appsink (inference image) width: {}, height: {}".format(app_width, app_height))
        result, mapinfo = buf.map(Gst.MapFlags.READ)
        if result:
            img = Image.frombytes('RGB', (app_width, app_height), mapinfo.data, 'raw')
            svg_canvas = svgwrite.Drawing('', size=(app_width, app_height))
            self.inference_callback(img, svg_canvas)
            self.overlay.set_property('data', svg_canvas.tostring())
        buf.unmap(mapinfo)
        return Gst.FlowReturn.OK

    def run_pipeline(self, inf_callback,
                     appsink_size=None):

        # Note to self: the pipeline args below work, but slowly. They run fine with AI inference, yet peg the CPU at 200%.
        # It turns out certain gstreamer video conversion operations are challenging to move to GPU and are taxing on the CPU.
        # TODO: figure out RPI4 hardware acceleration for h264 to RGB conversion, overlay and scaling.
        # Default gst ops videoconvert, videoscale and overlay are slow, using the CPU.
        PIPELINE = ' uridecodebin name=source latency=0 '
        PIPELINE += """ ! tee name=t
            t. ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
            t. ! {leaky_q} ! videoconvert
               ! rsvgoverlay name=overlay fit-to-frame=true ! videoconvert
            """
        # save the video stream to files of 10 minutes duration each
        PIPELINE += " ! omxh264enc ! h264parse ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=600000000000"

        LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream'
        SINK_CAPS = 'video/x-raw,format=RGB,width={width},pixel-aspect-ratio=1/1'
        SINK_ELEMENT = 'appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true'
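        # Illustrative expansion (an assumption for reference, using the
        # default 320-wide sink caps): the template above resolves to
        # roughly this launch string:
        #   uridecodebin name=source latency=0 ! tee name=t
        #     t. ! queue max-size-buffers=1 leaky=downstream ! videoconvert ! videoscale
        #        ! video/x-raw,format=RGB,width=320,pixel-aspect-ratio=1/1
        #        ! appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true
        #     t. ! queue max-size-buffers=1 leaky=downstream ! videoconvert
        #        ! rsvgoverlay name=overlay fit-to-frame=true ! videoconvert
        #        ! omxh264enc ! h264parse
        #        ! splitmuxsink muxer=matroskamux location="tmp/test1-%02d.mkv" max-size-time=600000000000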

        if appsink_size:
            self.appsink_shape = appsink_size
        sink_caps = SINK_CAPS.format(width=self.appsink_shape.width, height=self.appsink_shape.height)
        pipeline_args = PIPELINE.format(leaky_q=LEAKY_Q,
                                        sink_caps=sink_caps,
                                        sink_element=SINK_ELEMENT)
        print('Gstreamer pipeline: ', pipeline_args)
        self.pipeline = Gst.parse_launch(pipeline_args)
        self.video_source = self.pipeline.get_by_name('source')
        self.video_source.props.uri = "rtsp://admin:[email protected]:554/ISAPI/Streaming/channels/101/picture"
        self.video_source.connect('autoplug-continue', self.on_autoplug_continue)
        self.overlay = self.pipeline.get_by_name('overlay')
        print("overlay sink: {}".format(str(self.overlay)))
        self.appsink = self.pipeline.get_by_name('appsink')
        print("appsink: {}".format(str(self.appsink)))
        print("appsink will emit signals: {}".format(self.appsink.props.emit_signals))
        print("Connecting AI model and detection overlay to video stream")
        self.appsink.connect('new-sample', self.on_new_sample)
        self.inference_callback = inf_callback
        loop = GObject.MainLoop()

        # set Gst debug log level
        # Gst.debug_set_active(True)
        # Gst.debug_set_default_threshold(3)

        # Set up a pipeline bus watch to catch errors.
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect('message', self.on_bus_message, loop)

        # Run pipeline.
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            print("Entering main gstreamer loop")
            loop.run()
            print("Exited gstreamer loop")
        except Exception as e:
            sys.stderr.write("GST loop exited with error: {} ".format(str(e)))

        # Clean up.
        print("Cleaning up GST resources")
        self.pipeline.set_state(Gst.State.NULL)
        while GLib.MainContext.default().iteration(False):
            pass
        print("Done.")
