Skip to content

Commit

Permalink
[adaptive_detector] Fix first scene not being checked against min_scene_len
Browse files Browse the repository at this point in the history

Refactor detector test cases to make it easier to add new ones.
  • Loading branch information
Breakthrough committed Mar 9, 2024
1 parent 7de8f43 commit c7a5965
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 148 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Video Scene Cut Detection and Analysis Tool

----------------------------------------------------------

### Latest Release: v0.6.3 (March 8, 2024)
### Latest Release: v0.6.3 (March 9, 2024)

**Website**: [scenedetect.com](https://www.scenedetect.com)

Expand Down
2 changes: 1 addition & 1 deletion scenedetect/_cli/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run_scenedetect(context: CliContext):

if context.load_scenes_input:
# Skip detection if load-scenes was used.
logger.info("Loading scenes from file: %s", context.load_scenes_input)
logger.info("Skipping detection, loading scenes from: %s", context.load_scenes_input)
if context.stats_file_path:
logger.warning("WARNING: -s/--stats will be ignored due to load-scenes.")
scene_list, cut_list = _load_scenes(context)
Expand Down
28 changes: 12 additions & 16 deletions scenedetect/detectors/adaptive_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def __init__(
self._adaptive_ratio_key = AdaptiveDetector.ADAPTIVE_RATIO_KEY_TEMPLATE.format(
window_width=window_width, luma_only='' if not luma_only else '_lum')
self._first_frame_num = None
self._last_frame_num = None

# NOTE: This must be different than `self._last_scene_cut` which is used by the base class.
self._last_cut: Optional[int] = None

self._buffer = []
Expand Down Expand Up @@ -131,6 +131,10 @@ def process_frame(self, frame_num: int, frame_img: Optional[np.ndarray]) -> List

super().process_frame(frame_num=frame_num, frame_img=frame_img)

# Initialize last scene cut point at the beginning of the frames of interest.
if self._last_cut is None:
self._last_cut = frame_num

required_frames = 1 + (2 * self.window_width)
self._buffer.append((frame_num, self._frame_score))
if not len(self._buffer) >= required_frames:
Expand All @@ -152,23 +156,15 @@ def process_frame(self, frame_num: int, frame_img: Optional[np.ndarray]) -> List
if self.stats_manager is not None:
self.stats_manager.set_metrics(target[0], {self._adaptive_ratio_key: adaptive_ratio})

cut_list = []
# Check to see if adaptive_ratio exceeds the adaptive_threshold as well as there
# being a large enough content_val to trigger a cut
if (adaptive_ratio >= self.adaptive_threshold and target[1] >= self.min_content_val):

if self._last_cut is None:
# No previously detected cuts
cut_list.append(target[0])
self._last_cut = target[0]
elif (target[0] - self._last_cut) >= self.min_scene_len:
# Respect the min_scene_len parameter
cut_list.append(target[0])
# TODO: Should this be updated every time the threshold is exceeded?
# It might help with flash suppression for example.
self._last_cut = target[0]

return cut_list
threshold_met: bool = (
adaptive_ratio >= self.adaptive_threshold and target[1] >= self.min_content_val)
min_length_met: bool = (frame_num - self._last_cut) >= self.min_scene_len
if threshold_met and min_length_met:
self._last_cut = target[0]
return [target[0]]
return []

def get_content_val(self, frame_num: int) -> Optional[float]:
"""Returns the average content change for a frame."""
Expand Down
2 changes: 1 addition & 1 deletion scenedetect/detectors/content_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def process_frame(self, frame_num: int, frame_img: numpy.ndarray) -> List[int]:

# We consider any frame over the threshold a new scene, but only if
# the minimum scene length has been reached (otherwise it is ignored).
min_length_met = (frame_num - self._last_scene_cut) >= self._min_scene_len
min_length_met: bool = (frame_num - self._last_scene_cut) >= self._min_scene_len
if self._frame_score >= self._threshold and min_length_met:
self._last_scene_cut = frame_num
return [frame_num]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ def test_cli_time_end_of_video():


@pytest.mark.parametrize('detector_command', ALL_DETECTORS)
def test_cli_detector(detector_command: str): #
def test_cli_detector(detector_command: str):
"""Test each detection algorithm."""
# Ensure all detectors work without a statsfile.
# Ensure all detectors work without a statsfile.
assert invoke_scenedetect('-i {VIDEO} time {TIME} {DETECTOR}', DETECTOR=detector_command) == 0


Expand Down
273 changes: 150 additions & 123 deletions tests/test_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,93 +17,161 @@
test case material.
"""

import time
from dataclasses import dataclass
import os
import typing as ty

from scenedetect import detect, SceneManager, FrameTimecode, StatsManager
import pytest

from scenedetect import detect, SceneManager, FrameTimecode, StatsManager, SceneDetector
from scenedetect.detectors import AdaptiveDetector, ContentDetector, ThresholdDetector
from scenedetect.backends.opencv import VideoStreamCv2

# TODO: Test more parameters and add more videos. Parameterize the tests below such that
# a detector instance is combined with the other parameters like ground truth that go along
# with a specific video and detector values. E.g. Use Video-000, Video-001, etc..., and map
# that to a particular filename.

TEST_MOVIE_CLIP_START_FRAMES_ACTUAL = [1199, 1226, 1260, 1281, 1334, 1365, 1590, 1697, 1871]
"""Ground truth of start frame for each fast cut in `test_movie_clip`."""

TEST_VIDEO_FILE_START_FRAMES_ACTUAL = [0, 15, 198, 376]
"""Results for `test_video_file` with default ThresholdDetector values."""

FADES_FLOOR_START_FRAMES = [0, 84, 167, 245]
"""Results for `test_fades_clip` with default ThresholdDetector values."""

FADES_CEILING_START_FRAMES = [0, 42, 125, 209]
"""Results for `test_fades_clip` with ThresholdDetector fade to light with threshold 243."""


def test_detect(test_video_file):
    """Test `scenedetect.detect` with the default ThresholdDetector.

    Compares the start frame of every detected scene against the ground
    truth in TEST_VIDEO_FILE_START_FRAMES_ACTUAL.
    """
    scene_list = detect(video_path=test_video_file, detector=ThresholdDetector())
    detected_start_frames = [timecode.get_frames() for timecode, _ in scene_list]
    # Compare the full lists directly instead of the previous len() check plus
    # zip()-based element comparison: direct equality is a single, stronger
    # assertion and matches the style of the other detector tests, and pytest
    # prints the element-wise diff on failure.
    assert detected_start_frames == TEST_VIDEO_FILE_START_FRAMES_ACTUAL


def test_content_detector(test_movie_clip):
    """ Test SceneManager with VideoStreamCv2 and ContentDetector. """
    stream = VideoStreamCv2(test_movie_clip)
    fps = stream.frame_rate
    begin = FrameTimecode('00:00:50', fps)
    finish = FrameTimecode('00:01:19', fps)

    manager = SceneManager()
    manager.add_detector(ContentDetector())
    manager.auto_downscale = True

    stream.seek(begin)
    manager.detect_scenes(video=stream, end_time=finish)

    scenes = manager.get_scene_list()
    assert len(scenes) == len(TEST_MOVIE_CLIP_START_FRAMES_ACTUAL)
    starts = [start.get_frames() for start, _ in scenes]
    assert TEST_MOVIE_CLIP_START_FRAMES_ACTUAL == starts
    # The final scene must be capped at the end time we requested.
    assert scenes[-1][1] == finish


def test_adaptive_detector(test_movie_clip):
    """ Test SceneManager with VideoStreamCv2 and AdaptiveDetector. """
    stream = VideoStreamCv2(test_movie_clip)
    fps = stream.frame_rate
    begin = FrameTimecode('00:00:50', fps)
    finish = FrameTimecode('00:01:19', fps)

    manager = SceneManager()
    manager.add_detector(AdaptiveDetector())
    manager.auto_downscale = True

    stream.seek(begin)
    manager.detect_scenes(video=stream, end_time=finish)

    scenes = manager.get_scene_list()
    assert len(scenes) == len(TEST_MOVIE_CLIP_START_FRAMES_ACTUAL)
    starts = [start.get_frames() for start, _ in scenes]
    assert TEST_MOVIE_CLIP_START_FRAMES_ACTUAL == starts
    # The final scene must be capped at the end time we requested.
    assert scenes[-1][1] == finish


def test_threshold_detector(test_video_file):
    """Test SceneManager with VideoStreamCv2 and ThresholdDetector.

    Runs detection over the whole clip and compares detected scene start
    frames against TEST_VIDEO_FILE_START_FRAMES_ACTUAL.
    """
    video = VideoStreamCv2(test_video_file)
    scene_manager = SceneManager()
    scene_manager.add_detector(ThresholdDetector())
    scene_manager.auto_downscale = True
    scene_manager.detect_scenes(video)
    scene_list = scene_manager.get_scene_list()
    detected_start_frames = [timecode.get_frames() for timecode, _ in scene_list]
    # Direct list equality replaces the previous len() + zip() pair: it is a
    # single stronger assertion, consistent with the other detector tests,
    # and pytest shows the element-wise diff on failure.
    assert detected_start_frames == TEST_VIDEO_FILE_START_FRAMES_ACTUAL
# TODO: Reduce code duplication here and in `conftest.py`
def get_absolute_path(relative_path: str) -> str:
    """ Returns the absolute path to a (relative) path of a file that
    should exist within the tests/ directory.
    Throws FileNotFoundError if the file could not be found.
    """
    # Resolve relative to the directory containing this test module.
    tests_dir = os.path.abspath(os.path.dirname(__file__))
    resolved = os.path.join(tests_dir, relative_path)
    if os.path.exists(resolved):
        return resolved
    raise FileNotFoundError("""
Test video file (%s) must be present to run test case. This file can be obtained by running the following commands from the root of the repository:
git fetch --depth=1 https://github.com/Breakthrough/PySceneDetect.git refs/heads/resources:refs/remotes/origin/resources
git checkout refs/remotes/origin/resources -- tests/resources/
git reset
""" % relative_path)


@dataclass
class TestCase:
    """Properties for detector test cases."""
    # Prevent pytest from attempting to collect this helper class: its name
    # starts with "Test" but the dataclass-generated __init__ makes it
    # uncollectable, which otherwise produces a PytestCollectionWarning.
    __test__ = False
    path: str
    """Path to video for test case."""
    detector: SceneDetector
    """Detector instance to use."""
    start_time: int
    """Start time as frames."""
    end_time: int
    """End time as frames."""
    scene_boundaries: ty.List[int]
    """Scene boundaries."""

    def detect(self):
        """Run scene detection for test case. Should only be called once."""
        return detect(
            video_path=self.path,
            detector=self.detector,
            start_time=self.start_time,
            end_time=self.end_time)


def get_fast_cut_test_cases():
    """Fixture for parameterized test cases that detect fast cuts."""
    goldeneye = get_absolute_path("resources/goldeneye.mp4")
    # (detector, expected scene boundaries, pytest id) for each case; all
    # cases use the same clip and the same [1199, 1450] frame range.
    cases = [
        (ContentDetector(), [1199, 1226, 1260, 1281, 1334, 1365], "content_default"),
        (AdaptiveDetector(), [1199, 1226, 1260, 1281, 1334, 1365], "adaptive_default"),
        (ContentDetector(min_scene_len=30), [1199, 1260, 1334, 1365], "content_min_scene_len"),
        (AdaptiveDetector(min_scene_len=30), [1199, 1260, 1334, 1365], "adaptive_min_scene_len"),
    ]
    return [
        pytest.param(
            TestCase(
                path=goldeneye,
                detector=detector,
                start_time=1199,
                end_time=1450,
                scene_boundaries=boundaries),
            id=case_id) for detector, boundaries, case_id in cases
    ]


def get_fade_in_out_test_cases():
    """Fixture for parameterized test cases that detect fades."""
    # TODO: min_scene_len doesn't seem to be working as intended for ThresholdDetector.
    # Possibly related to #278: https://github.com/Breakthrough/PySceneDetect/issues/278
    def make_case(video, detector, end_time, boundaries, case_id):
        # All fade cases start from frame 0.
        return pytest.param(
            TestCase(
                path=get_absolute_path(video),
                detector=detector,
                start_time=0,
                end_time=end_time,
                scene_boundaries=boundaries),
            id=case_id)

    return [
        make_case("resources/testvideo.mp4", ThresholdDetector(), 500, [0, 15, 198, 376],
                  "threshold_testvideo_default"),
        make_case("resources/fades.mp4", ThresholdDetector(), 250, [0, 84, 167],
                  "threshold_fades_default"),
        make_case(
            "resources/fades.mp4",
            ThresholdDetector(
                threshold=12.0,
                method=ThresholdDetector.Method.FLOOR,
                add_final_scene=True,
            ), 250, [0, 84, 167, 245], "threshold_fades_floor"),
        make_case(
            "resources/fades.mp4",
            ThresholdDetector(
                threshold=243.0,
                method=ThresholdDetector.Method.CEILING,
                add_final_scene=True,
            ), 250, [0, 42, 125, 209], "threshold_fades_ceil"),
    ]


@pytest.mark.parametrize("test_case", get_fast_cut_test_cases())
def test_detect_fast_cuts(test_case: TestCase):
    """Run fast-cut detection and compare against expected scene boundaries."""
    scenes = test_case.detect()
    detected = [begin.get_frames() for begin, _ in scenes]
    assert test_case.scene_boundaries == detected
    # First scene starts at start_time, last scene ends at end_time.
    assert scenes[0][0] == test_case.start_time
    assert scenes[-1][1] == test_case.end_time


@pytest.mark.parametrize("test_case", get_fade_in_out_test_cases())
def test_detect_fades(test_case: TestCase):
    """Run fade detection and compare against expected scene boundaries."""
    scenes = test_case.detect()
    detected = [begin.get_frames() for begin, _ in scenes]
    assert test_case.scene_boundaries == detected
    # First scene starts at start_time, last scene ends at end_time.
    assert scenes[0][0] == test_case.start_time
    assert scenes[-1][1] == test_case.end_time


def test_detectors_with_stats(test_video_file):
Expand All @@ -116,10 +184,7 @@ def test_detectors_with_stats(test_video_file):
scene_manager.add_detector(detector())
scene_manager.auto_downscale = True
end_time = FrameTimecode('00:00:08', video.frame_rate)
benchmark_start = time.time()
scene_manager.detect_scenes(video=video, end_time=end_time)
benchmark_end = time.time()
time_no_stats = benchmark_end - benchmark_start
initial_scene_len = len(scene_manager.get_scene_list())
assert initial_scene_len > 0 # test case must have at least one scene!
# Re-analyze using existing stats manager.
Expand All @@ -129,44 +194,6 @@ def test_detectors_with_stats(test_video_file):
video.reset()
scene_manager.auto_downscale = True

benchmark_start = time.time()
scene_manager.detect_scenes(video=video, end_time=end_time)
benchmark_end = time.time()
time_with_stats = benchmark_end - benchmark_start
scene_list = scene_manager.get_scene_list()
assert len(scene_list) == initial_scene_len

print("--------------------------------------------------------------------")
print("StatsManager Benchmark For %s" % (detector.__name__))
print("--------------------------------------------------------------------")
print("No Stats:\t%2.1fs" % time_no_stats)
print("With Stats:\t%2.1fs" % time_with_stats)
print("--------------------------------------------------------------------")


def test_threshold_detector_fade_out(test_fades_clip):
    """Test ThresholdDetector handles fading out to black.

    `add_final_scene` ensures the scene cut by the final fade-out is kept.
    """
    video = VideoStreamCv2(test_fades_clip)
    scene_manager = SceneManager()
    scene_manager.add_detector(ThresholdDetector(add_final_scene=True))
    scene_manager.auto_downscale = True
    scene_manager.detect_scenes(video)
    scene_list = scene_manager.get_scene_list()
    detected_start_frames = [timecode.get_frames() for timecode, _ in scene_list]
    # Direct list equality replaces the previous len() + zip() pair: one
    # stronger assertion, consistent with the other detector tests.
    assert detected_start_frames == FADES_FLOOR_START_FRAMES


def test_threshold_detector_fade_in(test_fades_clip):
    """Test ThresholdDetector handles fading in from white.

    Uses CEILING mode with a high threshold so near-white frames are treated
    as scene boundaries; `add_final_scene` keeps the trailing scene.
    """
    video = VideoStreamCv2(test_fades_clip)
    scene_manager = SceneManager()
    scene_manager.add_detector(
        ThresholdDetector(
            threshold=243, method=ThresholdDetector.Method.CEILING, add_final_scene=True))
    scene_manager.auto_downscale = True
    scene_manager.detect_scenes(video)
    scene_list = scene_manager.get_scene_list()
    detected_start_frames = [timecode.get_frames() for timecode, _ in scene_list]
    # Direct list equality replaces the previous len() + zip() pair: one
    # stronger assertion, consistent with the other detector tests.
    assert detected_start_frames == FADES_CEILING_START_FRAMES
Loading

0 comments on commit c7a5965

Please sign in to comment.