feat: clean up the code by removing dead parts #25

Merged
merged 13 commits on Mar 29, 2024
12 changes: 0 additions & 12 deletions bytetracker/basetrack.py
@@ -1,8 +1,3 @@
from collections import OrderedDict

import numpy as np


class TrackState(object):
New = 0
Tracked = 1
@@ -17,16 +12,9 @@ class BaseTrack(object):
is_activated = False
state = TrackState.New

history = OrderedDict()
features = []
curr_feature = None
score = 0
start_frame = 0
frame_id = 0
time_since_update = 0

# multi-camera
location = (np.inf, np.inf)

@property
def end_frame(self):
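Note: both removed imports become dead once the `history = OrderedDict()` and multi-camera `location = (np.inf, np.inf)` attributes are dropped. As a rough sketch, the top of `basetrack.py` plausibly reads as follows after these hunks (reconstructed from the kept lines; states and attributes outside the visible hunks are elided):

```python
class TrackState(object):
    New = 0
    Tracked = 1
    # ... further states are outside the visible hunk ...


class BaseTrack(object):
    is_activated = False
    state = TrackState.New

    score = 0
    start_frame = 0
    frame_id = 0

    @property
    def end_frame(self):
        ...  # property body unchanged by this PR (not shown in the hunk)
```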
23 changes: 1 addition & 22 deletions bytetracker/byte_tracker.py
@@ -36,7 +36,6 @@ def __init__(self, tlwh, score, cls):
self.is_activated = False

self.score = score
self.tracklet_len = 0
self.cls = cls

def predict(self):
@@ -66,7 +65,6 @@ def activate(self, kalman_filter, frame_id):
self.track_id = self.next_id()
self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))

self.tracklet_len = 0
self.state = TrackState.Tracked
if frame_id == 1:
self.is_activated = True
@@ -78,7 +76,6 @@ def re_activate(self, new_track, frame_id, new_id=False):
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
)
self.tracklet_len = 0
self.state = TrackState.Tracked
self.is_activated = True
self.frame_id = frame_id
@@ -96,7 +93,6 @@ def update(self, new_track, frame_id):
:return:
"""
self.frame_id = frame_id
self.tracklet_len += 1
self.cls = new_track.cls

new_tlwh = new_track.tlwh
@@ -142,23 +138,6 @@ def tlwh_to_xyah(tlwh):
ret[2] /= ret[3]
return ret

def to_xyah(self):
return self.tlwh_to_xyah(self.tlwh)

@staticmethod
# @jit(nopython=True)
def tlbr_to_tlwh(tlbr):
ret = np.asarray(tlbr).copy()
ret[2:] -= ret[:2]
return ret

@staticmethod
# @jit(nopython=True)
def tlwh_to_tlbr(tlwh):
ret = np.asarray(tlwh).copy()
ret[2:] += ret[:2]
return ret

def __repr__(self):
return "OT_{}_({}-{})".format(self.track_id, self.start_frame, self.end_frame)

@@ -262,7 +241,7 @@ def update(self, dets, frame_id):
strack_pool[i] for i in u_track if strack_pool[i].state == TrackState.Tracked
]
dists = matching.iou_distance(r_tracked_stracks, detections_second)
matches, u_track, u_detection_second = matching.linear_assignment(dists, thresh=0.5)
matches, u_track, _ = matching.linear_assignment(dists, thresh=0.5)
for itracked, idet in matches:
track = r_tracked_stracks[itracked]
det = detections_second[idet]
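Note: the removed `to_xyah`, `tlbr_to_tlwh` and `tlwh_to_tlbr` helpers were pure box-format conversions with no remaining callers, while `tlwh_to_xyah` stays. If downstream code still needs the equivalents, here is a self-contained sketch based on the removed lines (the center-shift line of `tlwh_to_xyah` is not visible in the hunk and is reconstructed, so treat it as an assumption):

```python
import numpy as np

def tlwh_to_xyah(tlwh):
    # (top-left x, top-left y, w, h) -> (center x, center y, w/h, h); kept by this PR.
    ret = np.asarray(tlwh, dtype=float).copy()
    ret[:2] += ret[2:] / 2   # assumed center-shift line, not shown in the hunk
    ret[2] /= ret[3]
    return ret

def tlbr_to_tlwh(tlbr):
    # (x1, y1, x2, y2) -> (x1, y1, w, h); removed as dead code.
    ret = np.asarray(tlbr, dtype=float).copy()
    ret[2:] -= ret[:2]
    return ret

def tlwh_to_tlbr(tlwh):
    # (x1, y1, w, h) -> (x1, y1, x2, y2); removed as dead code.
    ret = np.asarray(tlwh, dtype=float).copy()
    ret[2:] += ret[:2]
    return ret

box_tlwh = np.array([10.0, 20.0, 30.0, 60.0])
assert np.allclose(tlwh_to_tlbr(box_tlwh), [10.0, 20.0, 40.0, 80.0])
assert np.allclose(tlbr_to_tlwh(tlwh_to_tlbr(box_tlwh)), box_tlwh)
assert np.allclose(tlwh_to_xyah(box_tlwh), [25.0, 50.0, 0.5, 60.0])
```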
60 changes: 0 additions & 60 deletions bytetracker/kalman_filter.py
@@ -1,23 +1,6 @@
import numpy as np
import scipy.linalg

"""
Table for the 0.95 quantile of the chi-square distribution with N degrees of
freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv
function and used as Mahalanobis gating threshold.
"""
chi2inv95 = {
1: 3.8415,
2: 5.9915,
3: 7.8147,
4: 9.4877,
5: 11.070,
6: 12.592,
7: 14.067,
8: 15.507,
9: 16.919,
}


class KalmanFilter(object):
"""
@@ -229,46 +212,3 @@ def update(self, mean, covariance, measurement):
(kalman_gain, projected_cov, kalman_gain.T)
)
return new_mean, new_covariance

def gating_distance(self, mean, covariance, measurements, only_position=False, metric="maha"):
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
`measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]

d = measurements - mean
if metric == "gaussian":
return np.sum(d * d, axis=1)
elif metric == "maha":
cholesky_factor = np.linalg.cholesky(covariance)
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True
)
squared_maha = np.sum(z * z, axis=0)
return squared_maha
else:
raise ValueError("invalid distance metric")
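Note: `gating_distance` and the `chi2inv95` table were the Mahalanobis gating machinery, used only by the `gate_cost_matrix` and `fuse_motion` helpers that are removed from `matching.py` below. If a downstream project relied on that gating, the same computation is easy to reproduce outside the filter; a minimal sketch based on the removed code, assuming the 4-dimensional measurement distribution produced by `KalmanFilter.project`:

```python
import numpy as np
import scipy.linalg

# 0.95 chi-square quantiles by degrees of freedom (values copied from the removed chi2inv95 table).
CHI2INV95 = {1: 3.8415, 2: 5.9915, 3: 7.8147, 4: 9.4877}

def squared_mahalanobis(mean, covariance, measurements):
    """Squared Mahalanobis distance of each row of `measurements` to N(mean, covariance)."""
    d = measurements - mean                                   # (N, 4) residuals
    cholesky_factor = np.linalg.cholesky(covariance)
    z = scipy.linalg.solve_triangular(
        cholesky_factor, d.T, lower=True, check_finite=False
    )
    return np.sum(z * z, axis=0)                              # length-N vector

# Gate out measurements beyond the 0.95 chi-square threshold for 4 degrees of freedom.
mean = np.zeros(4)
covariance = np.eye(4)
measurements = np.array([[0.1, 0.0, 0.0, 0.0],
                         [5.0, 5.0, 5.0, 5.0]])
distances = squared_mahalanobis(mean, covariance, measurements)
gated_out = distances > CHI2INV95[4]                          # [False, True]
```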
119 changes: 1 addition & 118 deletions bytetracker/matching.py
@@ -1,37 +1,5 @@
import lap
import numpy as np
import scipy
from scipy.spatial.distance import cdist

from bytetracker import kalman_filter


def merge_matches(m1, m2, shape):
O, P, Q = shape
m1 = np.asarray(m1)
m2 = np.asarray(m2)

M1 = scipy.sparse.coo_matrix((np.ones(len(m1)), (m1[:, 0], m1[:, 1])), shape=(O, P))
M2 = scipy.sparse.coo_matrix((np.ones(len(m2)), (m2[:, 0], m2[:, 1])), shape=(P, Q))

mask = M1 * M2
match = mask.nonzero()
match = list(zip(match[0], match[1]))
unmatched_O = tuple(set(range(O)) - set([i for i, j in match]))
unmatched_Q = tuple(set(range(Q)) - set([j for i, j in match]))

return match, unmatched_O, unmatched_Q


def _indices_to_matches(cost_matrix, indices, thresh):
matched_cost = cost_matrix[tuple(zip(*indices))]
matched_mask = matched_cost <= thresh

matches = indices[matched_mask]
unmatched_a = tuple(set(range(cost_matrix.shape[0])) - set(matches[:, 0]))
unmatched_b = tuple(set(range(cost_matrix.shape[1])) - set(matches[:, 1]))

return matches, unmatched_a, unmatched_b


def linear_assignment(cost_matrix, thresh):
@@ -42,7 +10,7 @@ def linear_assignment(cost_matrix, thresh):
tuple(range(cost_matrix.shape[1])),
)
matches, unmatched_a, unmatched_b = [], [], []
cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
for ix, mx in enumerate(x):
if mx >= 0:
matches.append([ix, mx])
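Note: the only change in this hunk is discarding the unused total cost returned by `lap.lapjv`. As a reminder of the return convention this loop relies on, a small hedged sketch (`x` maps each row to its assigned column, `y` maps each column to its row, and -1 marks entries left unassigned under `cost_limit`; the exact assignments below are illustrative):

```python
import numpy as np
import lap

cost = np.array([[0.2, 0.9],
                 [0.8, 0.1],
                 [0.7, 0.6]])

# extend_cost pads the rectangular matrix; cost_limit leaves expensive pairs unmatched.
_, x, y = lap.lapjv(cost, extend_cost=True, cost_limit=0.5)

matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]   # e.g. [[0, 0], [1, 1]]
unmatched_rows = np.where(x < 0)[0]                          # e.g. [2]
unmatched_cols = np.where(y < 0)[0]                          # e.g. []
```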
@@ -95,91 +63,6 @@ def iou_distance(atracks, btracks):
return cost_matrix


def v_iou_distance(atracks, btracks):
"""
Compute cost based on IoU
:type atracks: list[STrack]
:type btracks: list[STrack]

:rtype cost_matrix np.ndarray
"""

if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (
len(btracks) > 0 and isinstance(btracks[0], np.ndarray)
):
atlbrs = atracks
btlbrs = btracks
else:
atlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in atracks]
btlbrs = [track.tlwh_to_tlbr(track.pred_bbox) for track in btracks]
_ious = ious(atlbrs, btlbrs)
cost_matrix = 1 - _ious

return cost_matrix


def embedding_distance(tracks, detections, metric="cosine"):
"""
:param tracks: list[STrack]
:param detections: list[BaseTrack]
:param metric:
:return: cost_matrix np.ndarray
"""

cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
if cost_matrix.size == 0:
return cost_matrix
det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float32)
# for i, track in enumerate(tracks):
# cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float32)
cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric)) # Normalized features
return cost_matrix


def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
if cost_matrix.size == 0:
return cost_matrix
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray([det.to_xyah() for det in detections])
for row, track in enumerate(tracks):
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position
)
cost_matrix[row, gating_distance > gating_threshold] = np.inf
return cost_matrix


def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
if cost_matrix.size == 0:
return cost_matrix
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray([det.to_xyah() for det in detections])
for row, track in enumerate(tracks):
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position, metric="maha"
)
cost_matrix[row, gating_distance > gating_threshold] = np.inf
cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
return cost_matrix


def fuse_iou(cost_matrix, tracks, detections):
if cost_matrix.size == 0:
return cost_matrix
reid_sim = 1 - cost_matrix
iou_dist = iou_distance(tracks, detections)
iou_sim = 1 - iou_dist
fuse_sim = reid_sim * (1 + iou_sim) / 2
det_scores = np.array([det.score for det in detections])
det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
# fuse_sim = fuse_sim * (1 + det_scores) / 2
fuse_cost = 1 - fuse_sim
return fuse_cost


def fuse_score(cost_matrix, detections):
if cost_matrix.size == 0:
return cost_matrix
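Note: after this cleanup, `matching.py` keeps only the IoU-based cost path (`ious` / `iou_distance`, `linear_assignment`, and `fuse_score`); the appearance and motion fusion helpers above are gone. For reference, a minimal sketch of the surviving IoU-distance idea, with pairwise IoU on `(x1, y1, x2, y2)` boxes turned into a cost via `1 - IoU` (the actual `ious` helper is not shown in the diff, so this is an illustrative reimplementation, not the library's code):

```python
import numpy as np

def iou_matrix(a_tlbr, b_tlbr):
    """Pairwise IoU between two sets of (x1, y1, x2, y2) boxes."""
    a = np.asarray(a_tlbr, dtype=float)[:, None, :]   # (Na, 1, 4)
    b = np.asarray(b_tlbr, dtype=float)[None, :, :]   # (1, Nb, 4)
    lt = np.maximum(a[..., :2], b[..., :2])           # intersection top-left
    rb = np.minimum(a[..., 2:], b[..., 2:])           # intersection bottom-right
    wh = np.clip(rb - lt, 0.0, None)
    inter = wh[..., 0] * wh[..., 1]
    area_a = (a[..., 2] - a[..., 0]) * (a[..., 3] - a[..., 1])
    area_b = (b[..., 2] - b[..., 0]) * (b[..., 3] - b[..., 1])
    return inter / np.clip(area_a + area_b - inter, 1e-9, None)

tracks = [[0, 0, 10, 10], [20, 20, 30, 30]]
detections = [[1, 1, 11, 11], [100, 100, 110, 110]]
cost_matrix = 1.0 - iou_matrix(tracks, detections)    # cost = 1 - IoU, as in iou_distance
```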