diff --git a/sinner/BatchProcessingCore.py b/sinner/BatchProcessingCore.py index 9ed0c67f..0b05d9ed 100644 --- a/sinner/BatchProcessingCore.py +++ b/sinner/BatchProcessingCore.py @@ -125,8 +125,10 @@ def run(self) -> None: else: if state.is_started: self.update_status(f'Temp resources for this target already exists with {state.processed_frames_count} frames processed, continue processing with {state.processor_name}') - - self.process(current_processor, handler, state) + if current_processor.self_processing: + current_processor.process(handler, state) + else: + self.process(current_processor, handler, state) current_processor.release_resources() current_target_path = state.path temp_resources.append(state.path) diff --git a/sinner/handlers/frame/CV2VideoHandler.py b/sinner/handlers/frame/CV2VideoHandler.py index a70f4cfa..7e6f2db8 100644 --- a/sinner/handlers/frame/CV2VideoHandler.py +++ b/sinner/handlers/frame/CV2VideoHandler.py @@ -2,8 +2,9 @@ import os.path from pathlib import Path from typing import List - +from concurrent.futures import ThreadPoolExecutor, as_completed, Future import cv2 +import psutil from cv2 import VideoCapture from tqdm import tqdm @@ -89,34 +90,52 @@ def resolution(self) -> tuple[int, int]: return self._resolution def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]: + def write_done(future_: Future[bool]) -> None: + progress.update() + start = frames_range[0] if frames_range[0] is not None else 0 stop = frames_range[1] if frames_range[1] is not None else self.fc - 1 - # fixme: do not ignore, if frames already ignored over the frame index - with tqdm( - total=stop, - desc='Extracting frame', - unit='frame', - dynamic_ncols=True, - bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]', - initial=start - ) as progress: + + with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor: # use one worker per cpu core + future_to_frame = {} capture = self.open() capture.set(cv2.CAP_PROP_POS_FRAMES, start) - filename_length = len(str(self.fc)) # a way to determine frame names length + filename_length = len(str(self.fc)) Path(path).mkdir(parents=True, exist_ok=True) - while start <= stop: - frame: Frame - ret, frame = capture.read() - if not ret: - break - filename: str = os.path.join(path, str(start).zfill(filename_length) + ".png") - if write_to_image(frame, filename) is False: - raise Exception(f"Error writing {frame.nbytes} bytes to {filename}") - progress.update() - start += 1 - capture.release() - frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png'))) - return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)] + + # Initialize the progress bar + with tqdm( + total=stop, + desc='Extracting frame', + unit='frame', + dynamic_ncols=True, + bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]', + initial=start + ) as progress: + + for frame_index in range(start, stop + 1): + frame: Frame + ret, frame = capture.read() + if not ret: + break + filename: str = os.path.join(path, str(frame_index).zfill(filename_length) + ".png") + # Submit only the write_to_image function to the executor, excluding it processing time from the loop + future: Future[bool] = executor.submit(write_to_image, frame, filename) + future.add_done_callback(write_done) + future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to + + for future in as_completed(future_to_frame): + frame_index = future_to_frame[future] + try: + if not future.result(): + raise Exception(f"Error writing frame {frame_index}") + except Exception as exc: + print(f'Frame {frame_index} generated an exception: {exc}') + + capture.release() + + frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png'))) + return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)] def extract_frame(self, frame_number: int) -> NumberedFrame: if frame_number > self.fc: diff --git a/sinner/models/State.py b/sinner/models/State.py index 6e76bf92..b232738a 100644 --- a/sinner/models/State.py +++ b/sinner/models/State.py @@ -164,13 +164,13 @@ def final_check(self) -> tuple[bool, List[int]]: if is_file(file_path) and os.path.getsize(file_path) == 0: zero_sized_files_count += 1 if zero_sized_files_count > 0: - self.update_status(message=f"There is zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD) + self.update_status(message=f"There are zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD) result = False lost_frames = [] if self.final_check_integrity and not self.is_finished: lost_frames = self.check_integrity() if lost_frames: - self.update_status(message=f"There is lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD) + self.update_status(message=f"There are lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD) result = False return result, lost_frames diff --git a/sinner/processors/frame/BaseFrameProcessor.py b/sinner/processors/frame/BaseFrameProcessor.py index 859ce252..2fa03d73 100644 --- a/sinner/processors/frame/BaseFrameProcessor.py +++ b/sinner/processors/frame/BaseFrameProcessor.py @@ -4,6 +4,7 @@ from argparse import Namespace +from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler from sinner.models.State import State from sinner.Status import Status from sinner.validators.AttributeLoader import Rules @@ -13,6 +14,7 @@ class BaseFrameProcessor(ABC, Status): execution_provider: List[str] + self_processing: bool = False parameters: Namespace @@ -57,3 +59,6 @@ def execution_providers(self) -> List[str]: def configure_output_filename(self, callback: Callable[[str], None]) -> None: pass + + def process(self, handler: BaseFrameHandler, state: State) -> None: + pass diff --git a/sinner/processors/frame/FrameExtractor.py b/sinner/processors/frame/FrameExtractor.py index 3773b423..9d5b8afb 100644 --- a/sinner/processors/frame/FrameExtractor.py +++ b/sinner/processors/frame/FrameExtractor.py @@ -1,6 +1,9 @@ import os from argparse import Namespace +from tqdm import tqdm + +from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler from sinner.models.State import State from sinner.Status import Status from sinner.typing import Frame @@ -10,6 +13,7 @@ class FrameExtractor(BaseFrameProcessor): emoji: str = '🏃' + self_processing: bool = True def rules(self) -> Rules: return [ @@ -27,3 +31,21 @@ def configure_state(self, state: State) -> None: def process_frame(self, frame: Frame) -> Frame: return frame + + def process(self, handler: BaseFrameHandler, state: State) -> None: + handler.get_frames_paths(path=state.path, frames_range=(state.processed_frames_count, None)) + _, lost_frames = state.final_check() + if lost_frames: + with tqdm( + total=len(lost_frames), + desc="Processing lost frames", unit='frame', + dynamic_ncols=True, + bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]', + ) as progress: + for frame_index in lost_frames: + state.save_temp_frame(handler.extract_frame(frame_index)) + progress.update() + + is_ok, _ = state.final_check() + if not is_ok: + raise Exception("Something went wrong on processed frames check")