#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/10/30 13:40
# @Author  : Scheaven
# @File    : ds_tracker.py
# @description: YOLO detection + DeepSORT multi-person tracking demo.

import time
from collections import deque

import cv2
import numpy as np
from PIL import Image

from lib.deep_sort.tracker import Tracker
from lib.deep_sort import preprocessing
from lib.deep_sort import nn_matching
from lib.deep_sort.detection import Detection
from lib.utils.utils import video_open

# One trajectory buffer per track id, keeping the last 30 box centers.
pts = [deque(maxlen=30) for _ in range(9999)]

# Fixed seed so a given track id always maps to the same palette color.
np.random.seed(100)
COLORS = np.random.randint(0, 255, size=(200, 3), dtype="uint8")


def human_tracker(yolo, encoder, args):
    """Run YOLO person detection plus DeepSORT association over a video."""
    # DeepSORT parameters: appearance-distance gate, unlimited per-track
    # feature budget, and the non-max-suppression overlap threshold.
    max_cosine_distance = 0.3
    nn_budget = None
    nms_max_overlap = 1.0

    metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
    tracker = Tracker(metric)
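    # Note: with the "cosine" metric, a detection-track pair counts as a valid
    # appearance match only while its cosine distance stays below
    # max_cosine_distance; nn_budget=None means deep_sort keeps every stored
    # appearance sample per track rather than only the most recent N.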

    video = video_open(args)
    video_capture = video.generate_video()
    w = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    out = cv2.VideoWriter('./output/' + '_output.avi', fourcc, 15, (w, h))

    fps = 0
    track_fps = 0
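    # Note: the writer re-encodes at a fixed 15 fps regardless of the source
    # rate; to match the input instead, the source rate can be read with
    # video_capture.get(cv2.CAP_PROP_FPS).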
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break
        t1 = time.time()

        # Detect people in the current frame; boxs holds (x, y, w, h) boxes.
        image = Image.fromarray(frame)
        time3 = time.time()
        boxs = yolo.detect_image(image)
        time4 = time.time()
        print('detect cost is', time4 - time3)

        # Extract one appearance descriptor per detection for re-identification.
        features = encoder(frame, boxs)
        time5 = time.time()
        print('features extract is', time5 - time4)
        print("features shape: ", features.shape)

        # Wrap each box with its descriptor; confidence is fixed at 1.0.
        detections = [Detection(bbox, 1.0, feature) for bbox, feature in zip(boxs, features)]

        # Suppress overlapping detections before feeding the tracker.
        boxes = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        indices = preprocessing.non_max_suppression(boxes, nms_max_overlap, scores)
        detections = [detections[i] for i in indices]
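        # tracker.predict() propagates each track's Kalman state one frame
        # ahead; tracker.update() then runs deep_sort's matching cascade,
        # combining appearance distance with motion gating, and spawns or
        # deletes tracks as needed.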

        tracker.predict()
        tracker.update(detections)

        # Crude running average: each frame's instantaneous rate is blended
        # 50/50 with the previous estimate.
        fps = (fps + (1. / (time.time() - t1))) / 2
        track_fps = (track_fps + (1. / (time.time() - time4))) / 2
        print("fps= %f" % fps)

        for track in tracker.tracks:
            # Skip tracks that are still tentative or have not been matched
            # to a detection in the last frame.
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            # Stable per-id color from the fixed palette.
            color = [int(c) for c in COLORS[track.track_id % len(COLORS)]]

            bbox = track.to_tlbr()
            cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), color, 2)
            cv2.putText(frame, str(track.track_id), (int(bbox[0]), int(bbox[1] - 20)), 0, 5e-3 * 150, color, 2)

            # Record the box center and mark it; pts keeps the recent trail.
            center = (int((bbox[0] + bbox[2]) / 2), int((bbox[1] + bbox[3]) / 2))
            pts[track.track_id].append(center)
            cv2.circle(frame, center, 1, color, 5)
            # Connect successive centers; thickness tapers along the buffer.
            for j in range(1, len(pts[track.track_id])):
                if pts[track.track_id][j - 1] is None or pts[track.track_id][j] is None:
                    continue
                thickness = int(np.sqrt(64 / float(j + 1)) * 2)
                cv2.line(frame, pts[track.track_id][j - 1], pts[track.track_id][j], color, thickness)

        # Raw post-NMS detections in white, for comparison against the tracks.
        for det in detections:
            bbox = det.to_tlbr()
            cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (255, 255, 255), 2)

        # cv2.imshow('', cv2.resize(frame, (854, 480)))
        out.write(frame)

        # waitKey only has an effect while an imshow window is open.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    video_capture.release()
    out.release()
    cv2.destroyAllWindows()
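
# A minimal, hypothetical entry point, for illustration only: the real YOLO
# wrapper and feature-encoder factory live elsewhere in this project, and the
# module paths and the args fields expected by video_open() are assumptions,
# not the project's actual API.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description="YOLO + DeepSORT human tracker")
    parser.add_argument('--video', help='input video path (assumed field name)')
    args = parser.parse_args()

    # Hypothetical wiring of the detector and appearance encoder; deep_sort's
    # reference implementation provides a create_box_encoder() factory in
    # tools/generate_detections.py that returns a callable matching the
    # encoder(frame, boxes) signature used above.
    # from lib.yolo import YOLO                               # assumed path
    # from lib.generate_detections import create_box_encoder  # assumed path
    # yolo = YOLO()
    # encoder = create_box_encoder('model_data/mars-small128.pb', batch_size=32)
    # human_tracker(yolo, encoder, args)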