Skip to content

Commit

Permalink
Move frame file writings to a thread pool, to get a substantial impro…
Browse files Browse the repository at this point in the history
…vement of get_frames_path() speed
  • Loading branch information
pozitronik committed Dec 30, 2023
1 parent 5b49867 commit db1d5f9
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions sinner/handlers/frame/CV2VideoHandler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import glob
import os.path
from asyncio import Future
from pathlib import Path
from typing import List

from concurrent.futures import ThreadPoolExecutor, as_completed
import cv2
import psutil
from cv2 import VideoCapture
from tqdm import tqdm

Expand Down Expand Up @@ -89,34 +91,52 @@ def resolution(self) -> tuple[int, int]:
return self._resolution

def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]:
def write_done(future_: Future[None]) -> None:
progress.update()

start = frames_range[0] if frames_range[0] is not None else 0
stop = frames_range[1] if frames_range[1] is not None else self.fc - 1
# fixme: do not ignore, if frames already ignored over the frame index
with tqdm(
total=stop,
desc='Extracting frame',
unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
initial=start
) as progress:

with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor: # use one worker per cpu core
future_to_frame = {}
capture = self.open()
capture.set(cv2.CAP_PROP_POS_FRAMES, start)
filename_length = len(str(self.fc)) # a way to determine frame names length
filename_length = len(str(self.fc))
Path(path).mkdir(parents=True, exist_ok=True)
while start <= stop:
frame: Frame
ret, frame = capture.read()
if not ret:
break
filename: str = os.path.join(path, str(start).zfill(filename_length) + ".png")
if write_to_image(frame, filename) is False:
raise Exception(f"Error writing {frame.nbytes} bytes to {filename}")
progress.update()
start += 1
capture.release()
frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]

# Initialize the progress bar
with tqdm(
total=stop,
desc='Extracting frame',
unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
initial=start
) as progress:

for frame_index in range(start, stop + 1):
frame: Frame
ret, frame = capture.read()
if not ret:
break
filename: str = os.path.join(path, str(frame_index).zfill(filename_length) + ".png")
# Submit only the write_to_image function to the executor, excluding it processing time from the loop
future: Future[None] = executor.submit(write_to_image, frame, filename)
future.add_done_callback(write_done)
future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to

for future in as_completed(future_to_frame):
frame_index = future_to_frame[future]
try:
if not future.result():
raise Exception(f"Error writing frame {frame_index}")
except Exception as exc:
print(f'Frame {frame_index} generated an exception: {exc}')

capture.release()

frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]

def extract_frame(self, frame_number: int) -> NumberedFrame:
if frame_number > self.fc:
Expand Down

0 comments on commit db1d5f9

Please sign in to comment.