From 74d69960c3119151f6b57357e59e707762fd4643 Mon Sep 17 00:00:00 2001
From: Pozitronik
Date: Fri, 5 Jan 2024 16:15:44 +0400
Subject: [PATCH] Added memory control/info display code for the frame extractor

---
Review notes (this revision differs from the recorded commit in added lines only):
 * futures.append() now precedes add_done_callback(): a future that had already
   finished made write_done() fail in futures.remove() and skip progress.update().
 * the memory throttle waits on a copy-slice of `futures` instead of
   futures[:1][0], which raised IndexError once callbacks had emptied the list.
 * get_mem_usage() formats with {:05.2f}; str.zfill() on the 'MB [MAX:..' pieces
   never padded anything.
 * the "index" post-image hash below is still the one from the recorded commit.
 * open question: _statistics is a class-level dict, so it is shared by every
   handler instance - confirm that this is intended.

 sinner/handlers/frame/CV2VideoHandler.py | 42 +++++++++++++++++++++---
 1 file changed, 38 insertions(+), 4 deletions(-)

diff --git a/sinner/handlers/frame/CV2VideoHandler.py b/sinner/handlers/frame/CV2VideoHandler.py
index 7e6f2db8..265eb97a 100644
--- a/sinner/handlers/frame/CV2VideoHandler.py
+++ b/sinner/handlers/frame/CV2VideoHandler.py
@@ -1,7 +1,7 @@
 import glob
 import os.path
 from pathlib import Path
-from typing import List
+from typing import List, Any
 from concurrent.futures import ThreadPoolExecutor, as_completed, Future
 import cv2
 import psutil
@@ -14,7 +14,7 @@
 from sinner.helpers.FrameHelper import write_to_image, read_from_image
 from sinner.models.NumberedFrame import NumberedFrame
 from sinner.typing import NumeratedFramePath, Frame
-from sinner.utilities import get_file_name, is_file
+from sinner.utilities import get_file_name, is_file, get_mem_usage, suggest_max_memory
 from sinner.validators.AttributeLoader import Rules
 
 
@@ -22,6 +22,9 @@ class CV2VideoHandler(BaseFrameHandler):
     emoji: str = '📹'
 
     output_fps: float
+    max_memory: int
+
+    _statistics: dict[str, int] = {'mem_rss_max': 0, 'mem_vms_max': 0, 'limits_reaches': 0}
 
     def rules(self) -> Rules:
         return [
@@ -30,6 +33,10 @@ def rules(self) -> Rules:
                 'default': lambda: self.fps,
                 'help': 'FPS of resulting video'
             },
+            {
+                'parameter': 'max-memory',  # key defined in Sin, but class can be called separately in tests
+                'default': suggest_max_memory(),
+            },
             {
                 'module_help': 'The video processing module, based on CV2 library'
             }
@@ -91,12 +98,15 @@ def resolution(self) -> tuple[int, int]:
 
     def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]:
         def write_done(future_: Future[bool]) -> None:
+            futures.remove(future_)
+            progress.set_postfix(self.get_postfix(len(futures)))
             progress.update()
 
         start = frames_range[0] if frames_range[0] is not None else 0
         stop = frames_range[1] if frames_range[1] is not None else self.fc - 1
 
         with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor:  # use one worker per cpu core
+            futures: list[Future[bool]] = []
             future_to_frame = {}
             capture = self.open()
             capture.set(cv2.CAP_PROP_POS_FRAMES, start)
@@ -109,10 +119,9 @@ def write_done(future_: Future[bool]) -> None:
                     desc='Extracting frame',
                     unit='frame',
                     dynamic_ncols=True,
-                    bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
+                    bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]',
                     initial=start
             ) as progress:
-
                 for frame_index in range(start, stop + 1):
                     frame: Frame
                     ret, frame = capture.read()
@@ -122,7 +131,13 @@ def write_done(future_: Future[bool]) -> None:
                     # Submit only the write_to_image function to the executor, excluding it processing time from the loop
                     future: Future[bool] = executor.submit(write_to_image, frame, filename)
+                    futures.append(future)  # register first: write_done() removes the future from this list
                     future.add_done_callback(write_done)
+                    progress.set_postfix(self.get_postfix(len(futures)))
                     future_to_frame[future] = frame_index  # Keep track of which frame the future corresponds to
+                    if get_mem_usage('vms', 'g') >= self.max_memory:
+                        for oldest in futures[:1]:  # copy-slice: callbacks may have emptied the list already
+                            oldest.result()
+                        self._statistics['limits_reaches'] += 1
 
                 for future in as_completed(future_to_frame):
                     frame_index = future_to_frame[future]
@@ -137,6 +152,25 @@ def write_done(future_: Future[bool]) -> None:
         frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
         return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]
 
+    def get_mem_usage(self) -> str:
+        mem_rss = get_mem_usage()
+        mem_vms = get_mem_usage('vms')
+        if self._statistics['mem_rss_max'] < mem_rss:
+            self._statistics['mem_rss_max'] = mem_rss
+        if self._statistics['mem_vms_max'] < mem_vms:
+            self._statistics['mem_vms_max'] = mem_vms
+        # 05.2f zero-pads every number; str.zfill() on the 'MB [MAX:...' pieces was a no-op
+        return f"{mem_rss:05.2f}MB [MAX:{self._statistics['mem_rss_max']:05.2f}MB]/{mem_vms:05.2f}MB [MAX:{self._statistics['mem_vms_max']:05.2f}MB]"
+
+    def get_postfix(self, futures_length: int) -> dict[str, Any]:
+        postfix = {
+            'memory_usage': self.get_mem_usage(),
+            'futures': futures_length,
+        }
+        if self._statistics['limits_reaches'] > 0:
+            postfix['limit_reaches'] = self._statistics['limits_reaches']
+        return postfix
+
     def extract_frame(self, frame_number: int) -> NumberedFrame:
         if frame_number > self.fc:
             raise EOutOfRange(frame_number, 0, self.fc)