-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiou_tracker.py
191 lines (150 loc) · 8.71 KB
/
iou_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# ---------------------------------------------------------
# IOU Tracker
# Copyright (c) 2017 TU Berlin, Communication Systems Group
# Licensed under The MIT License [see LICENSE for details]
# Written by Erik Bochinski
# ---------------------------------------------------------
from time import time
import numpy as np
from pykalman import KalmanFilter
from util import load_mot, iou, interp_tracks
def active_criteria(x, tracks):
    """
    Score a candidate detection against a track with a motion-compensated IOU.

    The most recent entry of the track is shifted by the difference between its Kalman-predicted state and its
    current state, and the IOU between the detection and that shifted box is returned.

    Args:
        x (dict): a detection from this frame; only its 'roi' entry is read.
        tracks (list [[frames], Kalman_filter]): a track; only the last element of its frame list is used here.

    Returns:
        The value of util.iou for x['roi'] and the shifted last box of the track.
    """
    latest = tracks[0][-1]

    # The first two components of the state difference are taken as the predicted displacement.
    state_delta = latest['pred_state'] - latest['cur_state']
    ofdx, ofdy, _, _ = state_delta

    # NOTE(review): the (dy, dx, dy, dx) ordering presumably mirrors a (y1, x1, y2, x2) roi layout - confirm in util.
    shifted_roi = latest['roi'] + np.array([ofdy, ofdx, ofdy, ofdx])
    return iou(x['roi'], shifted_roi)
def setup_kf(imean, a=[[1, 0, 0.5, 0], [0, 1, 0, 0.5], [0, 0, 1, 0], [0, 0, 0, 1]], o=[[1, 0, 0, 0], [0, 1, 0, 0]]):
    """
    Initialize a Kalman filter object for a new track.

    The transition matrix (a) and observation matrix (o) can be tuned to better suit a specific motion pattern,
    but to preserve generality a simple constant speed model is used by default.

    Args:
        imean (array-like): initial centroid location; two zeros are appended to form the initial state mean.
        a (array): transition matrix that governs the state transition to the next time step. Size varies with model.
        o (array): observation matrix that defines the observable states. Size varies with model.

    Returns:
        KalmanFilter: a filter that owns private copies of the transition and observation matrices.
    """
    # The default matrices in the signature are single list objects shared by every call. Copy them so that
    # tuning the matrices of one filter in place can never leak into other tracks or into the defaults.
    return KalmanFilter(
        transition_matrices=np.array(a),
        observation_matrices=np.array(o),
        initial_state_mean=[*imean, 0, 0],
    )
def _match_and_update(tracks, candidates, sigma_iou):
    """
    Try to extend a track with the best-matching detection of the current frame.

    Every candidate is scored once with active_criteria. If the best score reaches sigma_iou, the Kalman filter of
    the track is updated with the centroid of that detection, the detection is annotated with the filtered and the
    predicted state, appended to the track and removed from candidates.

    Args:
        tracks (list [[frames], Kalman_filter]): the track to extend; modified in place on success.
        candidates (list): unmatched detections of the current frame; the matched one is removed in place.
        sigma_iou (float): IOU threshold a match has to reach.

    Returns:
        bool: True if the track was extended, False otherwise (including when candidates is empty).
    """
    if not candidates:
        return False

    scores = [active_criteria(det, tracks) for det in candidates]
    # max() returns the first maximal index, i.e. the same candidate max(candidates, key=...) would select.
    best_idx = max(range(len(candidates)), key=scores.__getitem__)
    if scores[best_idx] >= sigma_iou:
        best_match = candidates[best_idx]
        last = tracks[0][-1]
        kalman = tracks[1]

        # correct the last filtered state with the new observation ...
        cur_state, cur_covar = kalman.filter_update(last['cur_state'], last['cur_covar'], best_match['centroid'])
        best_match['cur_state'] = cur_state
        best_match['cur_covar'] = cur_covar
        # ... and predict one step ahead (no observation) for matching in the next frame
        best_match['pred_state'], best_match['pred_covar'] = kalman.filter_update(cur_state, cur_covar)

        tracks[0].append(best_match)
        # remove by position: avoids an equality-based list.index search over the detection dicts
        del candidates[best_idx]
        return True
    return False


def track_iou(detections, sigma_l, sigma_iou, sigma_p, sigma_len, skip_frames=False, n_skip=3):
    """
    Simple IOU based tracker with Kalman filter.

    This tracker is based on the original IOU Tracker.
    See "High-Speed Tracking-by-Detection Without Using Image Information by E. Bochinski, V. Eiselein, T. Sikora" for
    more information.

    Each detection is read through its 'score', 'frame' and 'centroid' entries ('roi' is read by active_criteria) and
    is annotated in place with 'cur_state', 'cur_covar', 'pred_state' and 'pred_covar' once it belongs to a track.

    Args:
        detections (list): list of detections per frame, usually generated by util.load_mot
        sigma_l (float): low detection threshold.
        sigma_iou (float): IOU threshold.
        sigma_p (int): maximum frames a track remains pending before termination.
        sigma_len (int): minimum track length in frames.
        skip_frames (boolean): whether to skip some frames to speed up tracking.
        n_skip (int): when skip_frames is True, the program uses one out of every n_skip frames for tracking.

    Returns:
        list: list of tracks, post-processed by util.interp_tracks.
    """
    tracks_active = []
    tracks_pending = []
    tracks_finished = []

    for frame_num, detections_frame in enumerate(detections, start=1):
        # optionally skip (n_skip - 1) of each (n_skip) frames
        if skip_frames and (frame_num % n_skip != 0):
            continue

        # apply low threshold to detections (kept in its own name so the `detections` argument is not shadowed)
        candidates = [det for det in detections_frame if det['score'] >= sigma_l]

        updated_tracks = []
        for tracks in tracks_active:
            if _match_and_update(tracks, candidates, sigma_iou):
                updated_tracks.append(tracks)
            else:
                # track was not updated: move it to tracks_pending, where it is kept for up to sigma_p frames
                tracks_pending.append(tracks)

        tracks_to_keep = []
        for tracks in tracks_pending:
            if frame_num - tracks[0][-1]['frame'] > sigma_p:
                # inactive for more than sigma_p frames: finish long tracks, silently discard short ones
                if len(tracks[0]) >= sigma_len:
                    tracks_finished.append(tracks[0])
            elif not candidates:
                # track is fresh enough but no detections are left for matching in this frame:
                # keep the track pending and extrapolate the prediction for one time step
                last = tracks[0][-1]
                last['pred_state'], last['pred_covar'] = tracks[1].filter_update(last['pred_state'],
                                                                                 last['pred_covar'])
                tracks_to_keep.append(tracks)
            elif _match_and_update(tracks, candidates, sigma_iou):
                # same matching procedure as for active tracks; a re-matched track becomes active again
                updated_tracks.append(tracks)
            else:
                # the proposed match does not pass the threshold: keep the track pending
                # (the prediction is not extrapolated in this case)
                tracks_to_keep.append(tracks)

        # form pending tracks for next frame
        tracks_pending = tracks_to_keep

        # create new tracks from the detections that were not assigned to any track
        new_tracks = [[[det], setup_kf(det['centroid'])] for det in candidates]
        for new_track in new_tracks:
            first = new_track[0][0]
            first['cur_state'] = [*first['centroid'], 0, 0]
            first['cur_covar'] = [[100, 0, 25, 0], [0, 100, 0, 25], [0, 0, 25, 0], [0, 0, 0, 25]]
            first['pred_state'], first['pred_covar'] = new_track[1].filter_update(first['cur_state'],
                                                                                  first['cur_covar'])

        tracks_active = updated_tracks + new_tracks

    # finish all remaining tracks; the minimum length is sigma_len (sigma_p is the pending timeout, not a length)
    tracks_finished += [track[0] for track in tracks_active if len(track[0]) >= sigma_len]
    tracks_finished += [track[0] for track in tracks_pending if len(track[0]) >= sigma_len]

    return interp_tracks(tracks_finished)
def track_iou_matlab_wrapper(detections, sigma_l, sigma_iou, sigma_p, sigma_len, skip_frames=False, n_skip=3):
    """
    Matlab wrapper of the iou tracker for the detrac evaluation toolkit.

    Args:
        detections (numpy.array): numpy array of detections, usually supplied by run_tracker.m
        sigma_l (float): low detection threshold.
        sigma_iou (float): IOU threshold.
        sigma_p (int): maximum frames a track remains pending before termination.
        sigma_len (int): minimum track length in frames.
        skip_frames (boolean): whether to skip some frames to speed up tracking.
        n_skip (int): when skip_frames is True, the tracker uses only one out of every n_skip frames for tracking.

    Returns:
        float: speed in frames per second.
        list: flat list with six floats per track entry:
            roi[1], roi[0], roi[3] - roi[1], roi[2] - roi[0], frame, track id (ids start at 1).
    """
    # the incoming array is reshaped to 7 rows and transposed, giving one detection per row with 7 columns
    detections = detections.reshape((7, -1)).transpose()
    detections = load_mot(detections)

    start = time()
    tracks = track_iou(detections, sigma_l, sigma_iou, sigma_p, sigma_len, skip_frames, n_skip)
    end = time()

    out = []
    for id_, track in enumerate(tracks, start=1):
        for tracklet in track:
            roi = tracklet['roi']
            # NOTE(review): presumably (left, top, width, height) for a (y1, x1, y2, x2) roi - confirm in util.load_mot
            out += [float(roi[1]), float(roi[0]), float(roi[3] - roi[1]), float(roi[2] - roi[0]),
                    float(tracklet['frame']), float(id_)]

    num_frames = len(detections)

    # The timer can report a zero elapsed time when evaluated in the DETRAC toolkit. Fall back to a nominal
    # 0.1 s in that case only, instead of swallowing every exception (including KeyboardInterrupt).
    try:
        speed = num_frames / (end - start)
    except ZeroDivisionError:
        speed = num_frames / 0.1

    return speed, out