From f0e69e398d088fb81d778f30983ddf1e32f96b24 Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 00:49:26 +0400 Subject: [PATCH 1/6] Implement the self-processing feature: frame processors can do the whole processing part by themselves (useful when the common processing logic is not suitable) --- sinner/BatchProcessingCore.py | 6 ++++-- sinner/processors/frame/BaseFrameProcessor.py | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sinner/BatchProcessingCore.py b/sinner/BatchProcessingCore.py index 9ed0c67f..0b05d9ed 100644 --- a/sinner/BatchProcessingCore.py +++ b/sinner/BatchProcessingCore.py @@ -125,8 +125,10 @@ def run(self) -> None: else: if state.is_started: self.update_status(f'Temp resources for this target already exists with {state.processed_frames_count} frames processed, continue processing with {state.processor_name}') - - self.process(current_processor, handler, state) + if current_processor.self_processing: + current_processor.process(handler, state) + else: + self.process(current_processor, handler, state) current_processor.release_resources() current_target_path = state.path temp_resources.append(state.path) diff --git a/sinner/processors/frame/BaseFrameProcessor.py b/sinner/processors/frame/BaseFrameProcessor.py index 859ce252..2fa03d73 100644 --- a/sinner/processors/frame/BaseFrameProcessor.py +++ b/sinner/processors/frame/BaseFrameProcessor.py @@ -4,6 +4,7 @@ from argparse import Namespace +from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler from sinner.models.State import State from sinner.Status import Status from sinner.validators.AttributeLoader import Rules @@ -13,6 +14,7 @@ class BaseFrameProcessor(ABC, Status): execution_provider: List[str] + self_processing: bool = False parameters: Namespace @@ -57,3 +59,6 @@ def execution_providers(self) -> List[str]: def configure_output_filename(self, callback: Callable[[str], None]) -> None: pass + + def process(self, handler: BaseFrameHandler, 
state: State) -> None: + pass From 5b498674458a9de6ed4107df1d5fb233a1a47c53 Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 00:50:06 +0400 Subject: [PATCH 2/6] Implement self-processing logic in the FrameExtractor module to speed up the frame extraction process. --- sinner/processors/frame/FrameExtractor.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sinner/processors/frame/FrameExtractor.py b/sinner/processors/frame/FrameExtractor.py index 3773b423..1180e239 100644 --- a/sinner/processors/frame/FrameExtractor.py +++ b/sinner/processors/frame/FrameExtractor.py @@ -1,6 +1,7 @@ import os from argparse import Namespace +from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler from sinner.models.State import State from sinner.Status import Status from sinner.typing import Frame @@ -10,6 +11,7 @@ class FrameExtractor(BaseFrameProcessor): emoji: str = '🏃' + self_processing: bool = True def rules(self) -> Rules: return [ @@ -27,3 +29,6 @@ def configure_state(self, state: State) -> None: def process_frame(self, frame: Frame) -> Frame: return frame + + def process(self, handler: BaseFrameHandler, state: State) -> None: + handler.get_frames_paths(path=state.path, frames_range=(state.processed_frames_count, None)) From db1d5f9d62a7194d5f2a359ed7ad3c8e5ff41123 Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 01:51:04 +0400 Subject: [PATCH 3/6] Move frame file writes to a thread pool to substantially speed up get_frames_paths() --- sinner/handlers/frame/CV2VideoHandler.py | 68 +++++++++++++++--------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/sinner/handlers/frame/CV2VideoHandler.py b/sinner/handlers/frame/CV2VideoHandler.py index a70f4cfa..13b11990 100644 --- a/sinner/handlers/frame/CV2VideoHandler.py +++ b/sinner/handlers/frame/CV2VideoHandler.py @@ -1,9 +1,11 @@ import glob import os.path +from asyncio import Future from pathlib import Path from typing import List - +from 
concurrent.futures import ThreadPoolExecutor, as_completed import cv2 +import psutil from cv2 import VideoCapture from tqdm import tqdm @@ -89,34 +91,52 @@ def resolution(self) -> tuple[int, int]: return self._resolution def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]: + def write_done(future_: Future[None]) -> None: + progress.update() + start = frames_range[0] if frames_range[0] is not None else 0 stop = frames_range[1] if frames_range[1] is not None else self.fc - 1 - # fixme: do not ignore, if frames already ignored over the frame index - with tqdm( - total=stop, - desc='Extracting frame', - unit='frame', - dynamic_ncols=True, - bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]', - initial=start - ) as progress: + + with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor: # use one worker per cpu core + future_to_frame = {} capture = self.open() capture.set(cv2.CAP_PROP_POS_FRAMES, start) - filename_length = len(str(self.fc)) # a way to determine frame names length + filename_length = len(str(self.fc)) Path(path).mkdir(parents=True, exist_ok=True) - while start <= stop: - frame: Frame - ret, frame = capture.read() - if not ret: - break - filename: str = os.path.join(path, str(start).zfill(filename_length) + ".png") - if write_to_image(frame, filename) is False: - raise Exception(f"Error writing {frame.nbytes} bytes to {filename}") - progress.update() - start += 1 - capture.release() - frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png'))) - return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)] + + # Initialize the progress bar + with tqdm( + total=stop, + desc='Extracting frame', + unit='frame', + dynamic_ncols=True, + bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]', + initial=start + ) as progress: + + for frame_index in range(start, stop 
+ 1): + frame: Frame + ret, frame = capture.read() + if not ret: + break + filename: str = os.path.join(path, str(frame_index).zfill(filename_length) + ".png") + # Submit only the write_to_image function to the executor, excluding it processing time from the loop + future: Future[None] = executor.submit(write_to_image, frame, filename) + future.add_done_callback(write_done) + future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to + + for future in as_completed(future_to_frame): + frame_index = future_to_frame[future] + try: + if not future.result(): + raise Exception(f"Error writing frame {frame_index}") + except Exception as exc: + print(f'Frame {frame_index} generated an exception: {exc}') + + capture.release() + + frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png'))) + return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)] def extract_frame(self, frame_number: int) -> NumberedFrame: if frame_number > self.fc: From ea49e6e1987aea352698b3f021447bab793cb81d Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 12:04:11 +0400 Subject: [PATCH 4/6] Add state checks for self-processing in FrameExtractor --- sinner/processors/frame/FrameExtractor.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sinner/processors/frame/FrameExtractor.py b/sinner/processors/frame/FrameExtractor.py index 1180e239..9d5b8afb 100644 --- a/sinner/processors/frame/FrameExtractor.py +++ b/sinner/processors/frame/FrameExtractor.py @@ -1,6 +1,8 @@ import os from argparse import Namespace +from tqdm import tqdm + from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler from sinner.models.State import State from sinner.Status import Status @@ -32,3 +34,18 @@ def process_frame(self, frame: Frame) -> Frame: def process(self, handler: BaseFrameHandler, state: State) -> None: handler.get_frames_paths(path=state.path, 
frames_range=(state.processed_frames_count, None)) + _, lost_frames = state.final_check() + if lost_frames: + with tqdm( + total=len(lost_frames), + desc="Processing lost frames", unit='frame', + dynamic_ncols=True, + bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]', + ) as progress: + for frame_index in lost_frames: + state.save_temp_frame(handler.extract_frame(frame_index)) + progress.update() + + is_ok, _ = state.final_check() + if not is_ok: + raise Exception("Something went wrong on processed frames check") From e8c7fb5d605c1b78579e4b1b0dff6ced8138a0cd Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 12:07:27 +0400 Subject: [PATCH 5/6] Typehinting fixed --- sinner/handlers/frame/CV2VideoHandler.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sinner/handlers/frame/CV2VideoHandler.py b/sinner/handlers/frame/CV2VideoHandler.py index 13b11990..7e6f2db8 100644 --- a/sinner/handlers/frame/CV2VideoHandler.py +++ b/sinner/handlers/frame/CV2VideoHandler.py @@ -1,9 +1,8 @@ import glob import os.path -from asyncio import Future from pathlib import Path from typing import List -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed, Future import cv2 import psutil from cv2 import VideoCapture @@ -91,7 +90,7 @@ def resolution(self) -> tuple[int, int]: return self._resolution def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]: - def write_done(future_: Future[None]) -> None: + def write_done(future_: Future[bool]) -> None: progress.update() start = frames_range[0] if frames_range[0] is not None else 0 @@ -121,7 +120,7 @@ def write_done(future_: Future[None]) -> None: break filename: str = os.path.join(path, str(frame_index).zfill(filename_length) + ".png") # Submit only the write_to_image function to the executor, excluding it processing time 
from the loop - future: Future[None] = executor.submit(write_to_image, frame, filename) + future: Future[bool] = executor.submit(write_to_image, frame, filename) future.add_done_callback(write_done) future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to From 5c2bee0962e150fd78dd3e73892739b386f88b8d Mon Sep 17 00:00:00 2001 From: Pozitronik Date: Sun, 31 Dec 2023 12:17:33 +0400 Subject: [PATCH 6/6] Grammar --- sinner/models/State.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sinner/models/State.py b/sinner/models/State.py index 6e76bf92..b232738a 100644 --- a/sinner/models/State.py +++ b/sinner/models/State.py @@ -164,13 +164,13 @@ def final_check(self) -> tuple[bool, List[int]]: if is_file(file_path) and os.path.getsize(file_path) == 0: zero_sized_files_count += 1 if zero_sized_files_count > 0: - self.update_status(message=f"There is zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD) + self.update_status(message=f"There are zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD) result = False lost_frames = [] if self.final_check_integrity and not self.is_finished: lost_frames = self.check_integrity() if lost_frames: - self.update_status(message=f"There is lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD) + self.update_status(message=f"There are lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD) result = False return result, lost_frames