diff --git a/cameras/classify.py b/cameras/classify.py
index 2aa8ea64..04a5e0d8 100755
--- a/cameras/classify.py
+++ b/cameras/classify.py
@@ -19,7 +19,7 @@
 import svgwrite
 import os
 from edgetpu.classification.engine import ClassificationEngine
-import gstreamer
+from gstreamer import InputStreamProcessor
 
 def load_labels(path):
     p = re.compile(r'\s*(\d+)(.+)')
@@ -29,8 +29,8 @@ def load_labels(path):
 
 def generate_svg(dwg, text_lines):
     for y, line in enumerate(text_lines):
-        dwg.add(dwg.text(line, insert=(11, y*20+1), fill='black', font_size='20'))
-        dwg.add(dwg.text(line, insert=(10, y*20), fill='white', font_size='20'))
+        dwg.add(dwg.text(line, insert=(101, 10 + (y+1)*20+1), fill='black', font_size='20'))
+        dwg.add(dwg.text(line, insert=(100, 10 + (y+1)*20), fill='white', font_size='20'))
 
 def main():
     default_model_dir = "ai_models"
@@ -52,10 +52,10 @@ def main():
     labels = load_labels(args.labels)
     last_time = time.monotonic()
 
-    def user_callback(image, svg_canvas):
+    def inference_callback(image, svg_canvas):
         nonlocal last_time
         start_time = time.monotonic()
-        print('Running TF/Coral classification model.\n ')
+        #print('Running TF/Coral classification model.\n ')
         results = engine.ClassifyWithImage(image, threshold=args.threshold, top_k=args.top_k)
         end_time = time.monotonic()
         text_lines = [
@@ -68,7 +68,8 @@ def user_callback(image, svg_canvas):
         last_time = end_time
         generate_svg(svg_canvas, text_lines)
 
-    result = gstreamer.run_pipeline(user_callback)
+    input_proc = InputStreamProcessor()
+    result = input_proc.run_pipeline(inference_callback)
 
 if __name__ == '__main__':
     main()
diff --git a/cameras/gstreamer.py b/cameras/gstreamer.py
index 991647d7..637cf14f 100755
--- a/cameras/gstreamer.py
+++ b/cameras/gstreamer.py
@@ -25,142 +25,146 @@
 GObject.threads_init()
 Gst.init(None)
 
-def on_bus_message(bus, message, loop):
-    t = message.type
-    if t == Gst.MessageType.EOS:
-        print('End of stream. Exiting gstreamer loop for this video stream.')
-        loop.quit()
-    elif t == Gst.MessageType.WARNING:
-        err, debug = message.parse_warning()
-        sys.stderr.write('Warning: %s: %s\n' % (err, debug))
-    elif t == Gst.MessageType.ERROR:
-        err, debug = message.parse_error()
-        sys.stderr.write('Error: %s: %s\n' % (err, debug))
-        loop.quit()
-    return True
-
-def on_new_sample(sink, overlay, appsink_size, user_function):
-    print('New image sample received.')
-    sample = sink.emit('pull-sample')
-    buf = sample.get_buffer()
-    caps = sample.get_caps()
-    struct = caps.get_structure(0)
-    # print("caps struct: {}".format(struct))
-    video_source_width = struct["width"]
-    video_source_height = struct["height"]
-    print("Input video source width: {}, height: {}".format(video_source_width, video_source_height))
-    result, mapinfo = buf.map(Gst.MapFlags.READ)
-    if result:
-        img = Image.frombytes('RGB', (appsink_size[0], appsink_size[1]), mapinfo.data, 'raw')
-        svg_canvas = svgwrite.Drawing('', size=(appsink_size[0], appsink_size[1]))
-        user_function(img, svg_canvas)
-        overlay.set_property('data', svg_canvas.tostring())
-    buf.unmap(mapinfo)
-    return Gst.FlowReturn.OK
-
-def run_pipeline(user_function,
-                 appsink_size=(320, 180)):
-
-# WORKS GREAT with streaming. No AI inference. Even smaller file sizes. 3.4MB per 1MB video. CPU at 10%! Uses GPU!
-# PIPELINE = ' uridecodebin name=source latency=0 ! queue ! videoconvert ! omxh264enc ! h264parse '
-# PIPELINE += " ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000"
-
-# WORKS SLOW. Works fine with AI inference, but pegs the CPU at 200%
-# turns out certain gstreamer video conversion operations are challenging to move to GPU and are taxing on CPU.
-# TODO: figure out RPI hardware acceleration for h264 to RGB conversion, overlay and scaling.
-# Default gst ops: videoconvert, videoscale and overlay are slow, using CPU.
-    PIPELINE = ' uridecodebin name=source latency=0 '
-    PIPELINE += """ ! tee name=t
-        t. ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
-        t. ! {leaky_q} ! videoconvert
-           ! rsvgoverlay name=overlay fit-to-frame=true ! videoconvert
-        """
-    # save video stream to files with 10 minutes duration
-    PIPELINE += " ! omxh264enc ! h264parse ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=600000000000"
-
-# Experimental version: doesn't work yet
-# PIPELINE = ' uridecodebin name=source latency=100 ' + \
-#            ' source. ! application/x-rtp, media=(string)audio ! decodebin ! audioconvert ! fakesink silent=false ' +\
-#            ' source. ! application/x-rtp, media=(string)video ! decodebin ! ' +\
-#            ' ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! video/x-raw, format=RGB' + \
-#            ' ! {leaky_q} ! {sink_caps} ! {sink_element}'
-
-# PIPELINE += """
-#     source. ! decodebin ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
-#     source. ! decodebin ! {leaky_q} ! videoconvert
-# """
-
-# PIPELINE += " ! vp8enc ! webmmux ! queue leaky=2 ! tcpserversink host=hass.lan port=8778 recover-policy=keyframe sync-method=latest-keyframe"
-
-#PIPELINE += " ! omxh264enc ! h264parse ! tee name=t_out " \
-#            "t_out. ! {leaky_q} ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000 " \
-#            "t_out. ! {leaky_q} ! decodebin ! vp8enc ! webmmux ! queue leaky=2 ! tcpserversink host=hass.lan port=8778 recover-policy=keyframe sync-method=latest-keyframe"
-
-# ! queue !rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! video/x-raw, format=RGB'
-# " ! h264parse ! omxh264dec ! videoconvert ! {sink_caps} ! {sink_element}"
-# "! queue ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}"
-
-# PIPELINE += """ ! tee name=t
-#     t. ! {leaky_q} ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}
-#     t. ! {leaky_q} ! videoconvert
-# """
-# PIPELINE += " ! queue ! videoconvert ! omxh264enc ! h264parse ! splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=60000000000"
-
-#     t. ! {leaky_q} ! videoconvert ! {sink_caps} ! {sink_element}
-#     t. ! {leaky_q} ! rtph264depay ! h264parse ! v4l2h264dec capture-io-mode=4 ! v4l2video12convert output-io-mode=5 capture-io-mode=4 ! {sink_caps} ! {sink_element}
-
-# OUTDATED:
-# PIPELINE += '{leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}'
-# PIPELINE += " ! splitmuxsink muxer=matroskamux location=\"test1-%02d.mkv\" max-size-time=60000000000"
splitmuxsink muxer=matroskamux location=\"test1-%02d.mkv\" max-size-time=60000000000" - - LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream' -# SINK_CAPS = 'video/x-raw,format=RGB' - SINK_CAPS = 'video/x-raw,format=RGB,width={width},pixel-aspect-ratio=1/1' - SINK_ELEMENT = 'appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true' - - sink_caps = SINK_CAPS.format(width=appsink_size[0], height=appsink_size[1]) - pipeline = PIPELINE.format(leaky_q=LEAKY_Q, - sink_caps=sink_caps, - sink_element=SINK_ELEMENT) - print('Gstreamer pipeline: ', pipeline) - pipeline = Gst.parse_launch(pipeline) - - video_src = pipeline.get_by_name('source') - video_src.props.uri = "rtsp://admin:121174l2ll74@192.168.86.131:554/ISAPI/Streaming/channels/101/picture" - print("Video source URI: {}".format(video_src.props.uri)) - overlay = pipeline.get_by_name('overlay') - print("overlay sink: {}".format(str(overlay))) - appsink = pipeline.get_by_name('appsink') - print("appsink: {}".format(str(appsink))) - print("appsink will emit signals: {}".format(appsink.props.emit_signals)) - print("Connecting AI model and detection overlay to video stream") - appsink.connect('new-sample', partial(on_new_sample, - overlay=overlay, appsink_size=appsink_size, user_function=user_function)) - loop = GObject.MainLoop() - - # set Gst debug log level -# Gst.debug_set_active(True) -# Gst.debug_set_default_threshold(3) - - # Set up a pipeline bus watch to catch errors. - bus = pipeline.get_bus() - bus.add_signal_watch() - bus.connect('message', on_bus_message, loop) - - # Run pipeline. - pipeline.set_state(Gst.State.PLAYING) - try: - print("Entering main gstreamer loop") - loop.run() - print("Exited gstreamer loop") - except Exception as e: - sys.stderr("GST loop exited with error: {} ".format(str(e))) - pass - # Clean up. - print("Cleaning up GST resources") - pipeline.set_state(Gst.State.NULL) - while GLib.MainContext.default().iteration(False): + +class InputStreamProcessor: + """ + Handles a wide range of media input sources and calls back for TF inference. + This class is not thread safe. Should not be called from multiple threads simultaneously. 
+ """ + + class Shape: + width = height = None pass - print("Done.") + + def __init__(self): + # Gstreamer pipeline for a given input source (could be image, audio or video) + self.pipeline = None + self.video_source = None + # shape of the input stream image or video + self.source_shape = None + # appsink handlies GStreamer callbacks for TF inference + self.appsink = None + # shape of the image passed to Tensorflow for inference + # default values that are close to most TF image model input tensors + self.appsink_shape = self.Shape() + self.appsink_shape.width = 320 + self.appsink_shape.height = 320 + # overlay is where we draw labels and bounding boxes for users to see inference results + self.overlay = None + # inference callback + self.inference_callback = None + + def on_autoplug_continue(self, src_bin, src_pad, src_caps): + #print('on_autoplug_continue called for uridecodebin') + #print('src_bin: {}'.format(str(src_bin))) + #print('src_pad: {}'.format(str(src_pad))) + #print('src_caps: {}'.format(str(src_caps))) + struct = src_caps.get_structure(0) + #print("src caps struct: {}".format(struct)) + app_width = struct["width"] + app_height = struct["height"] + if app_width: + print("Input source width: {}, height: {}".format(app_width, app_height)) + return True + + def on_bus_message(self, bus, message, loop): + t = message.type + if t == Gst.MessageType.EOS: + print('End of stream. Exiting gstreamer loop for this video stream.') + loop.quit() + elif t == Gst.MessageType.WARNING: + err, debug = message.parse_warning() + sys.stderr.write('Warning: %s: %s\n' % (err, debug)) + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + sys.stderr.write('Error: %s: %s\n' % (err, debug)) + loop.quit() + return True + + def on_new_sample(self, sink): + print('New image sample received.') + sample = sink.emit('pull-sample') + buf = sample.get_buffer() + caps = sample.get_caps() + struct = caps.get_structure(0) + # print("appsink caps struct: {}".format(struct)) + app_width = struct["width"] + app_height = struct["height"] + # print("appsink(inference image) width: {}, height: {}".format(app_width, app_height)) + result, mapinfo = buf.map(Gst.MapFlags.READ) + if result: + img = Image.frombytes('RGB', (app_width, app_height), mapinfo.data, 'raw') + svg_canvas = svgwrite.Drawing('', size=(app_width, app_height)) + self.inference_callback(img, svg_canvas) + self.overlay.set_property('data', svg_canvas.tostring()) + buf.unmap(mapinfo) + return Gst.FlowReturn.OK + + def run_pipeline(self, inf_callback, + appsink_size=None): + + # Note to self: The pipeline args below work but slow. Work fine with AI inference, but peg the CPU at 200% + # turns out certain gstreamer video conversion operations are challenging to move to GPU and are taxing on CPU. + # TODO: figure out RPI4 hardware acceleration for h264 to RGB conversion, overlay and scaling. + # Default gst ops: videoconvert, videoscale and overlay are slow, using CPU. + PIPELINE = ' uridecodebin name=source latency=0 ' + PIPELINE += """ ! tee name=t + t. ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element} + t. ! {leaky_q} ! videoconvert + ! rsvgoverlay name=overlay fit-to-frame=true ! videoconvert + """ + # save video stream to files with 10 minutes duration + PIPELINE += " ! omxh264enc ! h264parse ! 
splitmuxsink muxer=matroskamux location=\"tmp/test1-%02d.mkv\" max-size-time=600000000000" + + LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream' + SINK_CAPS = 'video/x-raw,format=RGB,width={width},pixel-aspect-ratio=1/1' + SINK_ELEMENT = 'appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true' + + if appsink_size: + self.appsink_shape = appsink_size + sink_caps = SINK_CAPS.format(width=self.appsink_shape.width, height=self.appsink_shape.height) + pipeline_args = PIPELINE.format(leaky_q=LEAKY_Q, + sink_caps=sink_caps, + sink_element=SINK_ELEMENT) + print('Gstreamer pipeline: ', pipeline_args) + self.pipeline = Gst.parse_launch(pipeline_args) + self.video_source = self.pipeline.get_by_name('source') + self.video_source.props.uri = "rtsp://admin:121174l2ll74@192.168.86.131:554/ISAPI/Streaming/channels/101/picture" + self.video_source.connect('autoplug-continue', self.on_autoplug_continue) + self.overlay = self.pipeline.get_by_name('overlay') + print("overlay sink: {}".format(str(self.overlay))) + self.appsink = self.pipeline.get_by_name('appsink') + print("appsink: {}".format(str(self.appsink))) + print("appsink will emit signals: {}".format(self.appsink.props.emit_signals)) + print("Connecting AI model and detection overlay to video stream") + self.appsink.connect('new-sample', self.on_new_sample) + self.inference_callback = inf_callback + loop = GObject.MainLoop() + + # set Gst debug log level + # Gst.debug_set_active(True) + # Gst.debug_set_default_threshold(3) + + # Set up a pipeline bus watch to catch errors. + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect('message', self.on_bus_message, loop) + + # Run pipeline. + self.pipeline.set_state(Gst.State.PLAYING) + try: + print("Entering main gstreamer loop") + loop.run() + print("Exited gstreamer loop") + except Exception as e: + sys.stderr("GST loop exited with error: {} ".format(str(e))) + pass + + # Clean up. + print("Cleaning up GST resources") + self.pipeline.set_state(Gst.State.NULL) + while GLib.MainContext.default().iteration(False): + pass + print("Done.")
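
For reference, a minimal usage sketch of the new InputStreamProcessor API introduced by this diff (not part of the patch itself). The callback signature (a PIL image plus an svgwrite canvas) follows on_new_sample(); passing a Shape instance as appsink_size is an assumption based on how run_pipeline() assigns it to self.appsink_shape and later reads .width and .height. The my_inference name is hypothetical.

    # sketch only: mirrors the wiring shown in the classify.py hunk above
    from gstreamer import InputStreamProcessor

    def my_inference(image, svg_canvas):
        # image: PIL.Image in RGB at the appsink resolution
        # svg_canvas: svgwrite.Drawing to draw labels/bounding boxes onto
        print('frame size:', image.size)

    proc = InputStreamProcessor()
    shape = InputStreamProcessor.Shape()   # assumed way to override the default 320x320
    shape.width = 320
    shape.height = 320
    proc.run_pipeline(my_inference, appsink_size=shape)  # blocks until EOS or error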