forked from DataSciNAll/vinoobject
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvideoplayer.py
109 lines (94 loc) · 3.3 KB
/
videoplayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import threading
import time
class VideoPlayer:
    """
    Custom video player to fulfill FPS requirements. You can set target FPS and output size,
    flip the video horizontally or skip first N frames.

    A background thread (see ``start``) reads frames from the source, paced by the
    source's own FPS, and publishes the latest frame no more often than the target
    FPS. ``next`` returns a copy of the currently published frame.

    :param source: Video source. It could be either camera device or video file.
    :param size: Output frame size; ``size[0]`` is treated as the width. ``None`` keeps
        the source size.
    :param flip: Flip source horizontally.
    :param fps: Target FPS. ``None`` or a non-positive value means "use the source FPS".
    :param skip_first_frames: Skip first N frames.
    :raises RuntimeError: If the source cannot be opened.
    """

    def __init__(self, source, size=None, flip=False, fps=None, skip_first_frames=0):
        import cv2

        self.cv2 = cv2  # This is done to access the package in class methods
        self.__cap = cv2.VideoCapture(source)
        if not self.__cap.isOpened():
            kind = "camera " if isinstance(source, int) else ""
            raise RuntimeError(f"Cannot open {kind}{source}")

        # skip first N frames
        self.__cap.set(cv2.CAP_PROP_POS_FRAMES, skip_first_frames)

        # fps of input file; fall back to 60 when the backend reports none
        self.__input_fps = self.__cap.get(cv2.CAP_PROP_FPS)
        if self.__input_fps <= 0:
            self.__input_fps = 60

        # target fps given by user; a non-positive value would make the pacing
        # computation in __run divide by zero, so fall back to the input fps
        if fps is not None and fps > 0:
            self.__output_fps = fps
        else:
            self.__output_fps = self.__input_fps

        self.__flip = flip
        self.__size = None
        self.__interpolation = None
        if size is not None:
            self.__size = size
            # AREA better for shrinking, LINEAR better for enlarging
            self.__interpolation = (
                cv2.INTER_AREA
                if size[0] < self.__cap.get(cv2.CAP_PROP_FRAME_WIDTH)
                else cv2.INTER_LINEAR
            )

        # first frame (None if the read failed)
        _, self.__frame = self.__cap.read()
        self.__lock = threading.Lock()
        self.__thread = None
        self.__stop = False

    def start(self):
        """
        Start playing.

        Launches the daemon reader thread. Does nothing if the reader thread is
        already running, so two threads never read the same capture.
        """
        if self.__thread is not None and self.__thread.is_alive():
            return
        self.__stop = False
        self.__thread = threading.Thread(target=self.__run, daemon=True)
        self.__thread.start()

    def stop(self):
        """
        Stop playing and release resources.

        Signals the reader thread to finish, waits for it, then releases the capture.
        """
        self.__stop = True
        if self.__thread is not None:
            self.__thread.join()
        self.__cap.release()

    def __run(self):
        """Reader loop: pull frames at the input FPS, publish them at the target FPS."""
        publish_interval = 1 / self.__output_fps
        read_interval = 1 / self.__input_fps
        # -inf guarantees the very first frame read is published
        last_published = float("-inf")

        while not self.__stop:
            read_started = time.monotonic()
            ret, frame = self.__cap.read()
            if not ret:
                break

            # fulfill target fps
            if publish_interval < time.monotonic() - last_published:
                last_published = time.monotonic()
                # replace by current frame
                with self.__lock:
                    self.__frame = frame

            # time to wait [s] to fulfill input fps
            wait_time = read_interval - (time.monotonic() - read_started)
            time.sleep(max(0, wait_time))

        # signal "no more frames" to next(); taken under the lock like every other
        # access to the shared frame
        with self.__lock:
            self.__frame = None

    def next(self):
        """
        Get current frame.

        :return: A copy of the current frame, resized and/or flipped as configured,
            or ``None`` when no frame is available.
        """
        with self.__lock:
            if self.__frame is None:
                return None
            # need to copy frame, because can be cached and reused if fps is low
            frame = self.__frame.copy()
        if self.__size is not None:
            frame = self.cv2.resize(frame, self.__size, interpolation=self.__interpolation)
        if self.__flip:
            frame = self.cv2.flip(frame, 1)
        return frame