Skip to content

Commit

Permalink
Merge pull request #102 from pozitronik/self_processing
Browse files Browse the repository at this point in the history
This PR adds a self-processing feature to frame processors. It can be used when the main processing logic is not effective.
Also, there is an implementation of the self-processing feature in the FrameExtractor, which drastically increases the frame extraction speed.
  • Loading branch information
pozitronik authored Dec 31, 2023
2 parents 0b9d4f2 + 5c2bee0 commit 881fe5d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 28 deletions.
6 changes: 4 additions & 2 deletions sinner/BatchProcessingCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ def run(self) -> None:
else:
if state.is_started:
self.update_status(f'Temp resources for this target already exists with {state.processed_frames_count} frames processed, continue processing with {state.processor_name}')

self.process(current_processor, handler, state)
if current_processor.self_processing:
current_processor.process(handler, state)
else:
self.process(current_processor, handler, state)
current_processor.release_resources()
current_target_path = state.path
temp_resources.append(state.path)
Expand Down
67 changes: 43 additions & 24 deletions sinner/handlers/frame/CV2VideoHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import os.path
from pathlib import Path
from typing import List

from concurrent.futures import ThreadPoolExecutor, as_completed, Future
import cv2
import psutil
from cv2 import VideoCapture
from tqdm import tqdm

Expand Down Expand Up @@ -89,34 +90,52 @@ def resolution(self) -> tuple[int, int]:
return self._resolution

def get_frames_paths(self, path: str, frames_range: tuple[int | None, int | None] = (None, None)) -> List[NumeratedFramePath]:
def write_done(future_: Future[bool]) -> None:
progress.update()

start = frames_range[0] if frames_range[0] is not None else 0
stop = frames_range[1] if frames_range[1] is not None else self.fc - 1
# fixme: do not ignore, if frames already ignored over the frame index
with tqdm(
total=stop,
desc='Extracting frame',
unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
initial=start
) as progress:

with ThreadPoolExecutor(max_workers=psutil.cpu_count()) as executor: # use one worker per cpu core
future_to_frame = {}
capture = self.open()
capture.set(cv2.CAP_PROP_POS_FRAMES, start)
filename_length = len(str(self.fc)) # a way to determine frame names length
filename_length = len(str(self.fc))
Path(path).mkdir(parents=True, exist_ok=True)
while start <= stop:
frame: Frame
ret, frame = capture.read()
if not ret:
break
filename: str = os.path.join(path, str(start).zfill(filename_length) + ".png")
if write_to_image(frame, filename) is False:
raise Exception(f"Error writing {frame.nbytes} bytes to {filename}")
progress.update()
start += 1
capture.release()
frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]

# Initialize the progress bar
with tqdm(
total=stop,
desc='Extracting frame',
unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
initial=start
) as progress:

for frame_index in range(start, stop + 1):
frame: Frame
ret, frame = capture.read()
if not ret:
break
filename: str = os.path.join(path, str(frame_index).zfill(filename_length) + ".png")
# Submit only the write_to_image function to the executor, excluding it processing time from the loop
future: Future[bool] = executor.submit(write_to_image, frame, filename)
future.add_done_callback(write_done)
future_to_frame[future] = frame_index # Keep track of which frame the future corresponds to

for future in as_completed(future_to_frame):
frame_index = future_to_frame[future]
try:
if not future.result():
raise Exception(f"Error writing frame {frame_index}")
except Exception as exc:
print(f'Frame {frame_index} generated an exception: {exc}')

capture.release()

frames_path = sorted(glob.glob(os.path.join(glob.escape(path), '*.png')))
return [(int(get_file_name(file_path)), file_path) for file_path in frames_path if is_file(file_path)]

def extract_frame(self, frame_number: int) -> NumberedFrame:
if frame_number > self.fc:
Expand Down
4 changes: 2 additions & 2 deletions sinner/models/State.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ def final_check(self) -> tuple[bool, List[int]]:
if is_file(file_path) and os.path.getsize(file_path) == 0:
zero_sized_files_count += 1
if zero_sized_files_count > 0:
self.update_status(message=f"There is zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD)
self.update_status(message=f"There are zero-sized files in {self.path} temp directory ({zero_sized_files_count} of {processed_frames_count}). Check for free disk space and access rights.", mood=Mood.BAD)
result = False
lost_frames = []
if self.final_check_integrity and not self.is_finished:
lost_frames = self.check_integrity()
if lost_frames:
self.update_status(message=f"There is lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD)
self.update_status(message=f"There are lost frames in the processed sequence: {format_sequences(lost_frames)}", mood=Mood.BAD)
result = False

return result, lost_frames
Expand Down
5 changes: 5 additions & 0 deletions sinner/processors/frame/BaseFrameProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from argparse import Namespace

from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler
from sinner.models.State import State
from sinner.Status import Status
from sinner.validators.AttributeLoader import Rules
Expand All @@ -13,6 +14,7 @@

class BaseFrameProcessor(ABC, Status):
execution_provider: List[str]
self_processing: bool = False

parameters: Namespace

Expand Down Expand Up @@ -57,3 +59,6 @@ def execution_providers(self) -> List[str]:

def configure_output_filename(self, callback: Callable[[str], None]) -> None:
pass

def process(self, handler: BaseFrameHandler, state: State) -> None:
pass
22 changes: 22 additions & 0 deletions sinner/processors/frame/FrameExtractor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
from argparse import Namespace

from tqdm import tqdm

from sinner.handlers.frame.BaseFrameHandler import BaseFrameHandler
from sinner.models.State import State
from sinner.Status import Status
from sinner.typing import Frame
Expand All @@ -10,6 +13,7 @@

class FrameExtractor(BaseFrameProcessor):
emoji: str = '🏃'
self_processing: bool = True

def rules(self) -> Rules:
return [
Expand All @@ -27,3 +31,21 @@ def configure_state(self, state: State) -> None:

def process_frame(self, frame: Frame) -> Frame:
return frame

def process(self, handler: BaseFrameHandler, state: State) -> None:
handler.get_frames_paths(path=state.path, frames_range=(state.processed_frames_count, None))
_, lost_frames = state.final_check()
if lost_frames:
with tqdm(
total=len(lost_frames),
desc="Processing lost frames", unit='frame',
dynamic_ncols=True,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]',
) as progress:
for frame_index in lost_frames:
state.save_temp_frame(handler.extract_frame(frame_index))
progress.update()

is_ok, _ = state.final_check()
if not is_ok:
raise Exception("Something went wrong on processed frames check")

0 comments on commit 881fe5d

Please sign in to comment.