from __future__ import absolute_import import numpy as np from sklearn.utils.linear_assignment_ import linear_assignment from . import kalman_filter INFTY_COST = 1e+5 def min_cost_matching( distance_metric, max_distance, tracks, detections, track_indices=None, detection_indices=None): if track_indices is None: track_indices = np.arange(len(tracks)) if detection_indices is None: detection_indices = np.arange(len(detections)) if len(detection_indices) == 0 or len(track_indices) == 0: return [], track_indices, detection_indices # Nothing to match. cost_matrix = distance_metric( tracks, detections, track_indices, detection_indices) cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5 indices = linear_assignment(cost_matrix) matches, unmatched_tracks, unmatched_detections = [], [], [] for col, detection_idx in enumerate(detection_indices): if col not in indices[:, 1]: unmatched_detections.append(detection_idx) for row, track_idx in enumerate(track_indices): if row not in indices[:, 0]: unmatched_tracks.append(track_idx) for row, col in indices: track_idx = track_indices[row] detection_idx = detection_indices[col] if cost_matrix[row, col] > max_distance: unmatched_tracks.append(track_idx) unmatched_detections.append(detection_idx) else: matches.append((track_idx, detection_idx)) return matches, unmatched_tracks, unmatched_detections def matching_cascade( distance_metric, max_distance, cascade_depth, tracks, detections, track_indices=None, detection_indices=None): if track_indices is None: track_indices = list(range(len(tracks))) if detection_indices is None: detection_indices = list(range(len(detections))) unmatched_detections = detection_indices matches = [] for level in range(cascade_depth): if len(unmatched_detections) == 0: break track_indices_l = [ k for k in track_indices if tracks[k].time_since_update == 1 + level ] if len(track_indices_l) == 0: continue matches_l, _, unmatched_detections = \ min_cost_matching( distance_metric, max_distance, tracks, detections, track_indices_l, unmatched_detections) matches += matches_l unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches)) return matches, unmatched_tracks, unmatched_detections def gate_cost_matrix( kf, cost_matrix, tracks, detections, track_indices, detection_indices, gated_cost=INFTY_COST, only_position=False): gating_dim = 2 if only_position else 4 gating_threshold = kalman_filter.chi2inv95[gating_dim] measurements = np.asarray( [detections[i].to_xyah() for i in detection_indices]) for row, track_idx in enumerate(track_indices): track = tracks[track_idx] gating_distance = kf.gating_distance( track.mean, track.covariance, measurements, only_position) cost_matrix[row, gating_distance > gating_threshold] = gated_cost # 设置为inf ,距离远的直接扩大到无穷 return cost_matrix