"""
ByteTrack Implementation for HarpoonNet
Simple and effective multi-object tracking
"""

from collections import OrderedDict

import numpy as np

try:
    import lap
except ImportError:
    # ``lap`` is optional: BYTETracker.linear_assignment falls back to
    # scipy.optimize.linear_sum_assignment when it is not installed.
    lap = None


class TrackState:
    """Track state enumeration"""

    New = 0
    Tracked = 1
    Lost = 2
    Removed = 3


class BaseTrack:
    """Base class for tracking objects.

    Holds the bookkeeping shared by all tracks: identity, lifecycle state
    and the frame indices at which the track was started / last updated.
    """

    # Global counter used to hand out unique track ids.
    _count = 0

    def __init__(self):
        self.track_id = 0
        self.is_activated = False
        self.state = TrackState.New

        self.history = OrderedDict()
        self.features = []
        self.curr_feature = None
        self.score = 0
        self.start_frame = 0
        self.frame_id = 0
        self.time_since_update = 0

        self.location = (np.inf, np.inf)

    @property
    def end_frame(self):
        """Frame index of the most recent activation/update."""
        return self.frame_id

    @staticmethod
    def next_id():
        """Return a fresh, process-wide unique track id."""
        BaseTrack._count += 1
        return BaseTrack._count

    def activate(self, kalman_filter, frame_id):
        """Start a new tracklet at ``frame_id``.

        ``kalman_filter`` is accepted for interface compatibility and is not
        used by this implementation.
        """
        self.track_id = self.next_id()
        self.frame_id = frame_id
        self.start_frame = frame_id
        self.state = TrackState.Tracked
        self.is_activated = True
        self.time_since_update = 0

    def predict(self):
        """Predict the next state (no-op in the base class)."""
        pass

    def update(self, new_track, frame_id):
        """Record that the track was observed at ``frame_id``."""
        self.frame_id = frame_id
        self.time_since_update = 0
        if self.state == TrackState.New:
            self.state = TrackState.Tracked

    def mark_lost(self):
        """Mark track as lost"""
        self.state = TrackState.Lost

    def mark_removed(self):
        """Mark track as removed"""
        self.state = TrackState.Removed


class STrack(BaseTrack):
    """Single object track with a simple constant-velocity motion model.

    The state vector ``mean`` is ``[cx, cy, w, h, vcx, vcy, vw, vh]`` where
    ``(cx, cy)`` is the box centre.
    """

    def __init__(self, tlwh, score):
        """Create a (not yet activated) track from a detection.

        Args:
            tlwh: Box as ``[top-left x, top-left y, width, height]``.
            score: Detection confidence.
        """
        super().__init__()

        # Private copy so later changes to the caller's array cannot leak in.
        self._tlwh = np.array(tlwh, dtype=np.float32)
        self._tlbr = self.tlwh_to_tlbr(self._tlwh)

        self.score = score
        self.tracklet_len = 0
        self.smooth_feat = None
        self.curr_feat = None
        self.alpha = 0.9

        # Simple motion model (position + velocity)
        self.mean = np.zeros(8)  # [cx, cy, w, h, vcx, vcy, vw, vh]
        self.covariance = np.eye(8) * 1000

        # Initialize state
        self.mean[:4] = self.tlbr_to_xywh(self._tlbr)
        # Last *observed* (not predicted) box; velocity is measured from it.
        self._last_obs_xywh = self.mean[:4].copy()

    @staticmethod
    def tlwh_to_tlbr(tlwh):
        """Convert ``[x, y, w, h]`` (top-left) to ``[x1, y1, x2, y2]``."""
        ret = np.asarray(tlwh).copy()
        ret[2:] += ret[:2]
        return ret

    @staticmethod
    def tlbr_to_tlwh(tlbr):
        """Convert ``[x1, y1, x2, y2]`` to ``[x, y, w, h]`` (top-left)."""
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret

    @staticmethod
    def tlbr_to_xywh(tlbr):
        """Convert ``[x1, y1, x2, y2]`` to ``[cx, cy, w, h]`` (centre format).

        This is the exact inverse of :meth:`xywh_to_tlbr`.
        """
        ret = np.array(tlbr, dtype=float)
        # Corner -> size first, otherwise the centre would be computed from
        # the bottom-right corner instead of from the width/height.
        ret[2:] -= ret[:2]
        ret[:2] += ret[2:] / 2
        return ret

    @staticmethod
    def xywh_to_tlbr(xywh):
        """Convert ``[cx, cy, w, h]`` (centre format) to ``[x1, y1, x2, y2]``."""
        ret = np.array(xywh, dtype=float)
        ret[:2] -= ret[2:] / 2
        ret[2:] += ret[:2]
        return ret

    def predict(self):
        """Advance the state by one frame using the constant-velocity model."""
        self.mean[:4] += self.mean[4:8]

        # Add process noise
        self.covariance += np.eye(8) * 0.1

        # Keep the cached box representations in sync with the state.
        self._tlbr = self.xywh_to_tlbr(self.mean[:4])
        self._tlwh = self.tlbr_to_tlwh(self._tlbr)

    def update(self, new_track, frame_id):
        """Update the track with a matched detection.

        Args:
            new_track: Detection wrapped as an ``STrack``.
            frame_id: Index of the current frame.
        """
        # Must be read before BaseTrack.update overwrites ``self.frame_id``.
        elapsed = max(frame_id - self.frame_id, 1)

        super().update(new_track, frame_id)
        self.tracklet_len += 1

        new_tlbr = new_track.tlbr  # property returns a copy
        new_xywh = self.tlbr_to_xywh(new_tlbr)

        # Per-frame velocity measured between consecutive *observations*.
        # Differencing against ``self.mean`` would use the already-predicted
        # position and yield the prediction residual instead of the velocity.
        if self.tracklet_len > 1:
            self.mean[4:8] = (new_xywh - self._last_obs_xywh) / elapsed

        # Update position
        self.mean[:4] = new_xywh
        self._last_obs_xywh = new_xywh.copy()
        self._tlbr = new_tlbr
        self._tlwh = self.tlbr_to_tlwh(new_tlbr)
        self.score = new_track.score

        # Reset covariance
        self.covariance = np.eye(8) * 100

        self.state = TrackState.Tracked
        self.is_activated = True

    def activate(self, kalman_filter, frame_id):
        """Activate track"""
        super().activate(kalman_filter, frame_id)
        self.tracklet_len = 0

    def re_activate(self, new_track, frame_id, new_id=False):
        """Re-activate a lost track with a matched detection.

        Args:
            new_track: Detection wrapped as an ``STrack``.
            frame_id: Index of the current frame.
            new_id: When true, assign a fresh track id.
        """
        self.update(new_track, frame_id)
        self.tracklet_len = 0
        self.state = TrackState.Tracked
        self.is_activated = True
        if new_id:
            self.track_id = self.next_id()

    @property
    def tlwh(self):
        """Box as ``[x, y, w, h]`` (copy)."""
        return self._tlwh.copy()

    @property
    def tlbr(self):
        """Box as ``[x1, y1, x2, y2]`` (copy)."""
        return self._tlbr.copy()


class BYTETracker:
    """ByteTrack multi-object tracker.

    NOTE(review): ``match_thresh`` is used here as a *minimum IoU* (a pair
    matches when its IoU exceeds it), so larger values are stricter. The
    reference ByteTrack implementation appears to treat the same-named
    parameter as a maximum cost - confirm the default of 0.8 is intended.
    """

    def __init__(self, frame_rate=30, track_thresh=0.5, track_buffer=30,
                 match_thresh=0.8, low_thresh=0.1, second_match_thresh=0.5):
        """
        Args:
            frame_rate: Stored on the tracker; not used by the logic below.
            track_thresh: Scores >= this are "high confidence" detections.
            track_buffer: Number of frames a lost track is kept before removal.
            match_thresh: Minimum IoU for the first (high-score) association.
            low_thresh: Scores in ``[low_thresh, track_thresh)`` are
                "low confidence"; anything below is discarded.
            second_match_thresh: Minimum IoU for the second (low-score)
                association.
        """
        self.frame_rate = frame_rate
        self.track_thresh = track_thresh
        self.track_buffer = track_buffer
        self.match_thresh = match_thresh
        self.low_thresh = low_thresh
        self.second_match_thresh = second_match_thresh

        self.frame_id = 0
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.kalman_filter = None  # We'll use simple motion model

    def update(self, detections):
        """
        Update tracker with new detections

        Args:
            detections: List of detections in format [x1, y1, x2, y2, score]

        Returns:
            List of active tracks (each track appears exactly once)
        """
        self.frame_id += 1

        activated_stracks = []
        refind_stracks = []
        lost_stracks = []
        removed_stracks = []

        detections_high, detections_low = self._split_detections(detections)

        # Lost tracks take part in the first association so that they can be
        # re-identified while they are still inside the buffer window.
        pool = self._merge_unique(self.tracked_stracks, self.lost_stracks)
        for track in pool:
            track.predict()

        # First association with high confidence detections
        matched, unmatched_dets, unmatched_trks = self.associate(
            pool, detections_high, self.match_thresh
        )
        for det_idx, trk_idx in matched:
            track = pool[trk_idx]
            det = detections_high[det_idx]
            if track.state == TrackState.Tracked:
                track.update(det, self.frame_id)
                activated_stracks.append(track)
            else:
                track.re_activate(det, self.frame_id, new_id=False)
                refind_stracks.append(track)

        # Second association: still-tracked leftovers vs. low confidence
        # detections. Gated on leftover *tracks*, not leftover detections.
        remaining = [pool[i] for i in unmatched_trks
                     if pool[i].state == TrackState.Tracked]
        if remaining and detections_low:
            matched_low, _, unmatched_low = self.associate(
                remaining, detections_low, self.second_match_thresh
            )
            for det_idx, trk_idx in matched_low:
                track = remaining[trk_idx]
                track.update(detections_low[det_idx], self.frame_id)
                activated_stracks.append(track)
            still_unmatched = [remaining[i] for i in unmatched_low]
        else:
            still_unmatched = remaining

        for track in still_unmatched:
            track.mark_lost()
            lost_stracks.append(track)

        # Unmatched high confidence detections start new tracks
        for det_idx in unmatched_dets:
            det = detections_high[det_idx]
            det.activate(self.kalman_filter, self.frame_id)
            activated_stracks.append(det)

        # Drop lost tracks that have not been seen for too long
        for track in self.lost_stracks:
            if (track.state == TrackState.Lost
                    and self.frame_id - track.end_frame > self.track_buffer):
                track.mark_removed()
                removed_stracks.append(track)

        # Update track lists. Merging is identity-based so a track that was
        # already in ``tracked_stracks`` and matched again is not duplicated.
        self.tracked_stracks = self._merge_unique(
            [t for t in self.tracked_stracks if t.state == TrackState.Tracked],
            activated_stracks,
            refind_stracks,
        )
        self.lost_stracks = self._merge_unique(
            [t for t in self.lost_stracks if t.state == TrackState.Lost],
            lost_stracks,
        )
        self.removed_stracks.extend(removed_stracks)

        # Remove old tracks.
        # NOTE(review): tracks are only removed once they are older than
        # ``track_buffer``, so this filter discards them again right away and
        # the list stays empty; kept as in the original to bound memory.
        self.removed_stracks = [
            t for t in self.removed_stracks
            if self.frame_id - t.end_frame <= self.track_buffer
        ]

        # Return active tracks
        return self.tracked_stracks

    def _split_detections(self, detections):
        """Wrap raw detections as STracks, split into (high, low) confidence."""
        if detections is None or len(detections) == 0:
            return [], []

        dets = np.atleast_2d(np.asarray(detections, dtype=float))
        scores = dets[:, 4]
        bboxes = dets[:, :4]

        high_mask = scores >= self.track_thresh
        low_mask = (scores >= self.low_thresh) & (scores < self.track_thresh)

        detections_high = [
            STrack(self.tlbr_to_tlwh(bbox), score)
            for bbox, score in zip(bboxes[high_mask], scores[high_mask])
        ]
        detections_low = [
            STrack(self.tlbr_to_tlwh(bbox), score)
            for bbox, score in zip(bboxes[low_mask], scores[low_mask])
        ]
        return detections_high, detections_low

    @staticmethod
    def _merge_unique(*track_lists):
        """Concatenate track lists, keeping the first occurrence of each object."""
        seen = set()
        merged = []
        for tracks in track_lists:
            for track in tracks:
                if id(track) not in seen:
                    seen.add(id(track))
                    merged.append(track)
        return merged

    def associate(self, tracks, detections, thresh):
        """Associate tracks with detections using IoU.

        Args:
            tracks: Sequence of objects exposing ``.tlbr``.
            detections: Sequence of objects exposing ``.tlbr``.
            thresh: Minimum IoU for a pair to count as a match.

        Returns:
            ``(matches, unmatched_detections, unmatched_trackers)`` where
            ``matches`` is an ``(N, 2)`` int array of
            ``[detection_index, track_index]`` rows and the other two are
            lists of indices.
        """
        if len(tracks) == 0 or len(detections) == 0:
            return (np.empty((0, 2), dtype=int),
                    list(range(len(detections))),
                    list(range(len(tracks))))

        # Calculate IoU matrix (rows: detections, columns: tracks)
        det_boxes = [det.tlbr for det in detections]
        trk_boxes = [trk.tlbr for trk in tracks]
        iou_matrix = np.zeros((len(det_boxes), len(trk_boxes)), dtype=np.float32)
        for d, det_box in enumerate(det_boxes):
            for t, trk_box in enumerate(trk_boxes):
                iou_matrix[d, t] = self.iou(det_box, trk_box)

        # If thresholding alone already gives a one-to-one pairing there is
        # nothing to optimise; otherwise solve the assignment problem.
        above = iou_matrix > thresh
        if above.sum(1).max() == 1 and above.sum(0).max() == 1:
            matched_indices = np.stack(np.where(above), axis=1)
        else:
            matched_indices = self.linear_assignment(-iou_matrix)
        matched_indices = np.asarray(matched_indices, dtype=int).reshape(-1, 2)

        assigned_dets = set(matched_indices[:, 0].tolist())
        assigned_trks = set(matched_indices[:, 1].tolist())
        unmatched_detections = [d for d in range(len(detections))
                                if d not in assigned_dets]
        unmatched_trackers = [t for t in range(len(tracks))
                              if t not in assigned_trks]

        # Filter out matched with low IoU
        matches = []
        for d, t in matched_indices.tolist():
            if iou_matrix[d, t] < thresh:
                unmatched_detections.append(d)
                unmatched_trackers.append(t)
            else:
                matches.append((d, t))

        matches = np.asarray(matches, dtype=int).reshape(-1, 2)
        return matches, unmatched_detections, unmatched_trackers

    def linear_assignment(self, cost_matrix):
        """Solve the linear assignment problem.

        Uses ``lap.lapjv`` when the optional ``lap`` package is installed and
        ``scipy.optimize.linear_sum_assignment`` otherwise.

        Returns:
            ``(N, 2)`` int array of ``[row, column]`` pairs.
        """
        cost_matrix = np.asarray(cost_matrix, dtype=np.float64)
        if cost_matrix.size == 0:
            return np.empty((0, 2), dtype=int)

        if lap is not None:
            # x[row] is the column assigned to that row, or -1 if unassigned.
            _, x, _ = lap.lapjv(cost_matrix, extend_cost=True)
            pairs = [[row, col] for row, col in enumerate(x) if col >= 0]
            return np.asarray(pairs, dtype=int).reshape(-1, 2)

        # Fallback to scipy if lap is not available
        from scipy.optimize import linear_sum_assignment
        rows, cols = linear_sum_assignment(cost_matrix)
        return np.stack([rows, cols], axis=1).astype(int)

    @staticmethod
    def iou(bbox1, bbox2):
        """Calculate IoU between two ``[x1, y1, x2, y2]`` bounding boxes."""
        x1, y1, x2, y2 = bbox1
        x3, y3, x4, y4 = bbox2

        # Calculate intersection
        xx1 = max(x1, x3)
        yy1 = max(y1, y3)
        xx2 = min(x2, x4)
        yy2 = min(y2, y4)

        if xx2 <= xx1 or yy2 <= yy1:
            return 0.0

        intersection = (xx2 - xx1) * (yy2 - yy1)
        area1 = (x2 - x1) * (y2 - y1)
        area2 = (x4 - x3) * (y4 - y3)
        union = area1 + area2 - intersection

        return intersection / union if union > 0 else 0.0

    @staticmethod
    def tlbr_to_tlwh(tlbr):
        """Convert tlbr to tlwh format"""
        ret = np.asarray(tlbr).copy()
        ret[2:] -= ret[:2]
        return ret