For a bit of context, I'm trying to rewrite a single-core Python application that does a lot of image processing so that obtaining the image and processing the image happen in different processes, using Python's `multiprocessing`, all whilst de-spaghettifying the main module. The application in question: https://github.com/Toufool/AutoSplit
I have decent experience of ReactiveX concepts in RxJS as I use it a lot when working with Angular.
I have read https://rxpy.readthedocs.io/en/latest/get_started.html#concurrency, and it seems great for when it comes time to process the images (my application resizes some images and compares them with others for similarity). All I have to do is subscribe to an observable that emits the latest image when ready, and use the `observe_on` operator in my pipe. Different parts of my application will use the source image in different ways, and they can all run in their own process/thread: (1) resizing and comparison, (2) displaying in the UI, (3) taking a screenshot and dumping it.
My problem comes from making the image obtention part multiprocessed. I don't want to spawn a process every time I try to get a frame.
I also have different capture methods (BitBlt, BitBlt+FullContentRendering, D3dDesktopDuplication, Windows Graphics Capture API, ...) that all have wildly differing implementations, which I'm already abstracting away: some I have to poll constantly at an arbitrary framerate, while others expose a datastream or callbacks, with the framerate either specified ahead of time or out of my control.
So I want a single process that runs continuously (either as fast as it can, or limited by an arbitrary timer to save on energy/battery life) and sends a "new frame" event whenever one is available. Preferably, I'd also want it to NOT run whilst there are no subscribers.
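To make the requirement above concrete, here is a minimal standard-library sketch of such a loop: one long-lived worker that publishes only the newest frame, can be paced by a timer, and idles while nobody is listening. The names `capture_loop`, `publish_latest`, `has_subscribers`, `get_frame` and `min_interval` are hypothetical and not part of AutoSplit or RxPY. It assumes a thread rather than a process to stay short; `multiprocessing` offers `Event` and `Queue` with similar interfaces, but that variant is not shown or tested here.

```python
import threading
from queue import Empty, Queue


def publish_latest(frames: Queue, frame) -> None:
    """Replace whatever is queued with the newest frame (single producer assumed)."""
    try:
        frames.get_nowait()  # drop a stale, unconsumed frame if there is one
    except Empty:
        pass
    frames.put_nowait(frame)


def capture_loop(
    frames: Queue,
    has_subscribers: threading.Event,
    stop: threading.Event,
    get_frame,
    min_interval: float = 0.0,
) -> None:
    """Produce frames while `has_subscribers` is set, until `stop` is set."""
    while not stop.is_set():
        # Sleep while nobody listens; the timeout lets the loop notice `stop`.
        if not has_subscribers.wait(timeout=0.05):
            continue
        publish_latest(frames, get_frame())
        # Optional pacing; wait() returns early once `stop` is set.
        if min_interval > 0:
            stop.wait(min_interval)
```

The consumer side sets `has_subscribers` when the first subscriber arrives and clears it when the last one leaves, so the heavy `get_frame` call is skipped entirely while the event is clear.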
Is this possible with RxPY? I'm trying to figure this out and not shoot myself in the foot before I start refactoring.
I was thinking something like this (no idea if this would even work, just trying to get the idea across):
```python
from __future__ import annotations

from multiprocessing import Process

import cv2
from reactivex import create
from reactivex.abc.disposable import DisposableBase
from reactivex.abc.observer import ObserverBase
from reactivex.abc.scheduler import SchedulerBase
from reactivex.operators import subscribe_on
from reactivex.scheduler import ThreadPoolScheduler

# <snip imports>


class BitBltCaptureMethod(CaptureMethodBase):
    name = "BitBlt"
    short_description = "fastest, least compatible"
    description = (
        "\nThe best option when compatible. But it cannot properly record "
        + "\nOpenGL, Hardware Accelerated or Exclusive Fullscreen windows. "
        + "\nThe smaller the selected region, the more efficient it is. "
    )
    _render_full_content = False

    def __init__(self, autosplit: AutoSplit):
        self.__autosplit_ref = autosplit
        self.frameObservable = create(self.__image_obtention_loop).pipe(
            # Single thread, this always runs in its own dedicated process
            subscribe_on(ThreadPoolScheduler(1)),
        )

    def __image_obtention_loop(
        self,
        observer: ObserverBase[cv2.Mat | None],
        scheduler: SchedulerBase | None,
    ) -> DisposableBase:
        while True:
            observer.on_next(self.__get_frame())

    def __get_frame(self) -> cv2.Mat | None:
        # Implementation details, this is the very heavy function
        # does things with `self.__autosplit_ref`, and `_render_full_content`
        ...
```
Or even by directly using `Process` from `multiprocessing` like so?
```python
# <snip>


class BitBltCaptureMethod(CaptureMethodBase, Process):
    name = "BitBlt"
    short_description = "fastest, least compatible"
    description = (
        "\nThe best option when compatible. But it cannot properly record "
        + "\nOpenGL, Hardware Accelerated or Exclusive Fullscreen windows. "
        + "\nThe smaller the selected region, the more efficient it is. "
    )
    _render_full_content = False

    def __init__(self, autosplit: AutoSplit):
        self.__autosplit_ref = autosplit
        self.__frameSubject: BehaviorSubject[cv2.Mat | None] = BehaviorSubject(None)
        # How do you expose an observable from a subject without exposing its "next" method in RxPY???
        self.frameObservable = Observable()

    def run(self):
        # Frame obtention loop
        while True:  # Is there a better way to not run this when there's no observers?
            if len(self.__frameSubject.observers) > 0:
                self.__frameSubject.on_next(self.__get_frame())
            else:
                sleep(0)

    def __get_frame(self) -> cv2.Mat | None:
        # Implementation details, this is the very heavy function
        # does things with `self.__autosplit_ref`, and `_render_full_content`
        ...
```