Skip to content

Commit

Permalink
Merge pull request #103 from pozitronik/extractor_memory_control
Browse files Browse the repository at this point in the history
Added memory control/info display code for the frame extractor
  • Loading branch information
pozitronik authored Jan 5, 2024
2 parents 881fe5d + 74d6996 commit 2793860
Showing 1 changed file with 37 additions and 4 deletions.
41 changes: 37 additions & 4 deletions sinner/handlers/frame/CV2VideoHandler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import glob
import os.path
from pathlib import Path
from typing import List
from typing import List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
import cv2
import psutil
Expand All @@ -14,14 +14,17 @@
from sinner.helpers.FrameHelper import write_to_image, read_from_image
from sinner.models.NumberedFrame import NumberedFrame
from sinner.typing import NumeratedFramePath, Frame
from sinner.utilities import get_file_name, is_file
from sinner.utilities import get_file_name, is_file, get_mem_usage, suggest_max_memory
from sinner.validators.AttributeLoader import Rules


class CV2VideoHandler(BaseFrameHandler):
emoji: str = '📹'

output_fps: float
max_memory: int

_statistics: dict[str, int] = {'mem_rss_max': 0, 'mem_vms_max': 0, 'limits_reaches': 0}

def rules(self) -> Rules:
return [
Expand All @@ -30,6 +33,10 @@ def rules(self) -> Rules:
'default': lambda: self.fps,
'help': 'FPS of resulting video'
},
{
'parameter': 'max-memory', # key defined in Sin, but class can be called separately in tests
'default': suggest_max_memory(),
},
{
'module_help': 'The video processing module, based on CV2 library'
}
Expand Down Expand Up @@ -91,12 +98,15 @@ def resolution(self) -> tuple[int, int]:

def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]:
def write_done(future_: Future[bool]) -> None:
futures.remove(future_)
progress.set_postfix(self.get_postfix(len(futures)))
progress.update()

start = frames_range[0] if frames_range[0] is not None else 0
stop = frames_range[1] if frames_range[1] is not None else self.fc - 1

with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor: # use one worker per cpu core
futures: list[Future[bool]] = []
future_to_frame = {}
capture = self.open()
capture.set(cv2.CAP_PROP_POS_FRAMES, start)
Expand All @@ -109,10 +119,9 @@ def write_done(future_: Future[bool]) -> None:
desc='Extracting frame',
unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]',
initial=start
) as progress:

for frame_index in range(start, stop + 1):
frame: Frame
ret, frame = capture.read()
Expand All @@ -122,7 +131,12 @@ def write_done(future_: Future[bool]) -> None:
# Submit only the write_to_image function to the executor, excluding it processing time from the loop
future: Future[bool] = executor.submit(write_to_image, frame, filename)
future.add_done_callback(write_done)
futures.append(future)
progress.set_postfix(self.get_postfix(len(futures)))
future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to
if get_mem_usage('vms', 'g') >= self.max_memory:
futures[:1][0].result()
self._statistics['limits_reaches'] += 1

for future in as_completed(future_to_frame):
frame_index = future_to_frame[future]
Expand All @@ -137,6 +151,25 @@ def write_done(future_: Future[bool]) -> None:
frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]

def get_mem_usage(self) -> str:
mem_rss = get_mem_usage()
mem_vms = get_mem_usage('vms')
if self._statistics['mem_rss_max'] < mem_rss:
self._statistics['mem_rss_max'] = mem_rss
if self._statistics['mem_vms_max'] < mem_vms:
self._statistics['mem_vms_max'] = mem_vms
return '{:.2f}'.format(mem_rss).zfill(5) + 'MB [MAX:{:.2f}'.format(self._statistics['mem_rss_max']).zfill(5) + 'MB]' + '/' + '{:.2f}'.format(mem_vms).zfill(5) + 'MB [MAX:{:.2f}'.format(
self._statistics['mem_vms_max']).zfill(5) + 'MB]'

def get_postfix(self, futures_length: int) -> dict[str, Any]:
postfix = {
'memory_usage': self.get_mem_usage(),
'futures': futures_length,
}
if self._statistics['limits_reaches'] > 0:
postfix['limit_reaches'] = self._statistics['limits_reaches']
return postfix

def extract_frame(self, frame_number: int) -> NumberedFrame:
if frame_number > self.fc:
raise EOutOfRange(frame_number, 0, self.fc)
Expand Down

0 comments on commit 2793860

Please sign in to comment.