# limit the number of cpus used by high performance libraries import os from typing import Dict os.environ["OMP_NUM_THREADS"] = "1" os.environ["OPENBLAS_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" os.environ["VECLIB_MAXIMUM_THREADS"] = "1" os.environ["NUMEXPR_NUM_THREADS"] = "1" import sys import argparse import os from pathlib import Path import json import traceback import numpy as np import torch FILE = Path(__file__).resolve() ROOT = FILE.parents[0] # yolov5 deepsort root directory if str(ROOT) not in sys.path: sys.path.append(str(ROOT)) # add ROOT to PATH ROOT = Path(os.path.relpath(ROOT, Path.cwd())) # relative def detect(tracker, device, source_path, video_map, opt): ( out, show_vid, save_vid, save_txt, imgsz, evaluate, half, project, exist_ok, update, save_crop, ) = ( opt.output, opt.show_vid, opt.save_vid, opt.save_txt, opt.imgsz, opt.evaluate, opt.half, opt.project, opt.exist_ok, opt.update, opt.save_crop, ) from yolov5.utils.general import xyxy2xywh from yolov5.utils.torch_utils import select_device # Initialize device = select_device(device) half &= device.type != "cpu" # half precision only supported on CUDA # initialize deepsort try: transition_data = video_map["clips"] except: print("no transition_data") transition_data = None try: c_box = video_map["content_box"] except: print("no content_box") c_box = None video_detect = json.load(open(source_path, encoding="UTF-8")) face_detections = video_detect["face_detections"] slice_id = 0 for detects in face_detections: frame_idx = detects["frame_idx"] while ( transition_data and (slice_id < len(transition_data)) and (frame_idx >= transition_data[slice_id]["frame_end"]) ): # print(frame_idx, transition_data[slice_id]['frame_end']) tracker.tracker.tracks = [] slice_id += 1 pred = detects["faces"] if pred is not None and len(pred): # Rescale boxes from img_size to im0 size det = [] confs = [] clss = [] features = [] for p in pred: det.append(p["bbox"]) confs.append(float(p["det_score"])) features.append(p["embedding"]) clss.append(0) det = np.array(det) confs = np.array(confs) clss = np.array(clss) features = torch.Tensor(features) xywhs = xyxy2xywh(det) # pass detections to deepsort if c_box: im0 = np.zeros((c_box[3] - c_box[1], c_box[2] - c_box[0])) else: im0 = np.zeros((video_map["height"], video_map["width"])) outputs = tracker.update( xywhs, confs, clss, im0, use_yolo_preds=True, features=features ) assert len(pred) == len(outputs) for j, output in enumerate(outputs): bboxes = output[0:4] id = output[4] min_box_distance = np.inf match_p = None for p in pred: if "trackid" not in p: c_box_distance = abs( bboxes - np.array(p["bbox"], dtype=np.int) ).sum() if c_box_distance < 10 and c_box_distance < min_box_distance: match_p = p min_box_distance = c_box_distance if match_p: match_p["trackid"] = str(id) else: print("not match: ", frame_idx, bboxes) for p in pred: print(p["bbox"]) else: tracker.increment_ages() return video_map class FaceTrackerByYolo5DeepSort(object): def __init__( self, config_file, device, deep_sort_model="osnet_ibn_x1_0_MSMT17", half: bool=False, ) -> None: from deep_sort.utils.parser import get_config from deep_sort.deep_sort import DeepSort cfg = get_config() cfg.merge_from_file(config_file) # Create as tracker self.tracker = DeepSort( deep_sort_model, device, max_dist=cfg.DEEPSORT.MAX_DIST, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE, max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET, ) def __call__(self, args, video_path, video_map, **kwds) -> Dict: """_summary_ Args: args (_type_): _description_ video_path (_type_): _description_ save_path (_type_): _description_ map_path (_type_): _description_ kwds: # parser.add_argument('--yolo_model', nargs='+', type=str, default='yolov5m.pt', help='model.pt path(s)') parser.add_argument('--deep_sort_model', type=str, default='osnet_ibn_x1_0_MSMT17') # parser.add_argument('--source', type=str, default='0', help='source') # file/folder, 0 for webcam '--output', type=str, default='inference/output', help='output folder' ) # output folder '--imgsz', '--img', '--img-size', nargs='+', type=int, default=[640], help='inference size h,w',) '--conf-thres', type=float, default=0.5, help='object confidence threshold') '--iou-thres', type=float, default=0.5, help='IOU threshold for NMS') '--fourcc',type=str,default='mp4v', help='output video codec (verify ffmpeg support)',) '--device', default='', help='cuda device, i.e. 0 or 0,1,2,3 or cpu') '--show-vid', action='store_true', help='display tracking video results') '--save-vid', action='store_true', help='save video tracking results') '--save-txt', action='store_true', help='save MOT compliant results to *.txt') # class 0 is person, 1 is bycicle, 2 is car... 79 is oven '--classes', nargs='+', type=int, help='filter by class: --class 0, or --class 16 17', ) '--agnostic-nms', action='store_true', help='class-agnostic NMS' ) parser.add_argument('--augment', action='store_true', help='augmented inference') parser.add_argument('--update', action='store_true', help='update all models') parser.add_argument('--evaluate', action='store_true', help='augmented inference') parser.add_argument( "--config_deepsort", type=str, default="deep_sort/configs/deep_sort.yaml" ) "--half", action="store_true", help="use FP16 half-precision inference" ) parser.add_argument('--visualize', action='store_true', help='visualize features') '--max-det', type=int, default=1000, help='maximum detection per image' ) '--save-crop', action='store_true', help='save cropped prediction boxes' ) '--dnn', action='store_true', help='use OpenCV DNN for ONNX inference' ) '--project', default=ROOT / 'runs/track', help='save results to project/name' ) parser.add_argument('--name', default='exp', help='save results to project/name') '--exist-ok', action='store_true', help='existing project/name ok, do not increment', ) '-src_path', type=str, default='/innovation_cfs/entertainment/VideoMashup/video_face_moviepy/10fps', ) '-map_path', type=str, default='/innovation_cfs/entertainment/VideoMashup/video_map/transnetv2_duration_frameidx_moviepy', '-overwrite', default=False, action="store_true" ) # whether overwrite the existing results Returns: Dict: _description_ """ video_info = detect(args, self.tracker, video_path, video_map, **kwds) return video_info