# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import warnings
import numpy as np
import torch
from mmdet.core import bbox2result
from mmdet.models import BaseDetector
class DeployBaseDetector(BaseDetector):
    """Base class wrapping an exported detector for backend inference.

    Subclasses implement ``forward_test`` for a concrete backend
    (ONNXRuntime, TensorRT, ...); this class converts the raw backend
    outputs into the standard mmdet result format in ``forward``.
    Training and feature-extraction entry points are deliberately
    unsupported for deployed models.
    """

    def __init__(self, class_names, device_id):
        super(DeployBaseDetector, self).__init__()
        self.CLASSES = class_names
        self.device_id = device_id

    def simple_test(self, img, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def aug_test(self, imgs, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def extract_feat(self, imgs):
        raise NotImplementedError('This method is not implemented.')

    def forward_train(self, imgs, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def val_step(self, data, optimizer):
        raise NotImplementedError('This method is not implemented.')

    def train_step(self, data, optimizer):
        raise NotImplementedError('This method is not implemented.')

    def forward_test(self, imgs, img_metas, **kwargs):
        # FIX: the parameters were previously declared keyword-only
        # (``*, img, img_metas``), which contradicts the positional call
        # in ``forward`` below and the positional overrides in the
        # backend subclasses. The stub now matches both.
        raise NotImplementedError('This method is not implemented.')

    def async_simple_test(self, img, img_metas, **kwargs):
        raise NotImplementedError('This method is not implemented.')

    def forward(self, img, img_metas, return_loss=True, **kwargs):
        """Run backend inference and convert outputs to mmdet results.

        Args:
            img (list[Tensor]): Single-scale batched input images.
            img_metas (list[list[dict]]): Meta info dicts, one per image.
            return_loss (bool): Unused; kept for API compatibility with
                ``BaseDetector.forward``.

        Returns:
            list: Per-image results; ``(bbox_results, segm_results)``
            tuples when the backend also outputs masks, otherwise bbox
            results only.
        """
        outputs = self.forward_test(img, img_metas, **kwargs)
        batch_dets, batch_labels = outputs[:2]
        # A third output, when present, holds instance masks.
        batch_masks = outputs[2] if len(outputs) == 3 else None
        batch_size = img[0].shape[0]
        img_metas = img_metas[0]
        results = []
        rescale = kwargs.get('rescale', True)
        for i in range(batch_size):
            dets, labels = batch_dets[i], batch_labels[i]
            if rescale:
                scale_factor = img_metas[i]['scale_factor']
                if isinstance(scale_factor, (list, tuple, np.ndarray)):
                    assert len(scale_factor) == 4
                    scale_factor = np.array(scale_factor)[None, :]  # [1, 4]
                dets[:, :4] /= scale_factor
            if 'border' in img_metas[i]:
                # offset pixel of the top-left corners between original image
                # and padded/enlarged image, 'border' is used when exporting
                # CornerNet and CentripetalNet to onnx
                x_off = img_metas[i]['border'][2]
                y_off = img_metas[i]['border'][0]
                dets[:, [0, 2]] -= x_off
                dets[:, [1, 3]] -= y_off
                # clamp coordinates that went negative after the shift
                dets[:, :4] *= (dets[:, :4] > 0).astype(dets.dtype)
            dets_results = bbox2result(dets, labels, len(self.CLASSES))
            if batch_masks is not None:
                masks = batch_masks[i]
                img_h, img_w = img_metas[i]['img_shape'][:2]
                ori_h, ori_w = img_metas[i]['ori_shape'][:2]
                # crop away padding before (optionally) resizing the masks
                # back to the original image resolution
                masks = masks[:, :img_h, :img_w]
                if rescale:
                    masks = masks.astype(np.float32)
                    masks = torch.from_numpy(masks)
                    masks = torch.nn.functional.interpolate(
                        masks.unsqueeze(0), size=(ori_h, ori_w))
                    masks = masks.squeeze(0).detach().numpy()
                if masks.dtype != bool:
                    masks = masks >= 0.5
                # bucket masks by predicted class label
                segms_results = [[] for _ in range(len(self.CLASSES))]
                for j in range(len(dets)):
                    segms_results[labels[j]].append(masks[j])
                results.append((dets_results, segms_results))
            else:
                results.append(dets_results)
        return results
class ONNXRuntimeDetector(DeployBaseDetector):
    """Wrapper for detector's inference with ONNXRuntime."""

    def __init__(self, onnx_file, class_names, device_id):
        super(ONNXRuntimeDetector, self).__init__(class_names, device_id)
        import onnxruntime as ort

        # Locate the mmcv custom-op library so onnxruntime can resolve
        # any custom operators baked into the exported model.
        custom_op_path = ''
        try:
            from mmcv.ops import get_onnxruntime_op_path
            custom_op_path = get_onnxruntime_op_path()
        except (ImportError, ModuleNotFoundError):
            warnings.warn('If input model has custom op from mmcv, \
                you may have to build mmcv with ONNXRuntime from source.')
        sess_opts = ort.SessionOptions()
        if osp.exists(custom_op_path):
            sess_opts.register_custom_ops_library(custom_op_path)
        session = ort.InferenceSession(onnx_file, sess_opts)

        # Prefer CUDA execution when the onnxruntime build supports it.
        use_cuda = ort.get_device() == 'GPU'
        exec_providers = ['CPUExecutionProvider']
        provider_opts = [{}]
        if use_cuda:
            exec_providers.insert(0, 'CUDAExecutionProvider')
            provider_opts.insert(0, {'device_id': device_id})
        session.set_providers(exec_providers, provider_opts)

        self.sess = session
        self.io_binding = session.io_binding()
        self.output_names = [out.name for out in session.get_outputs()]
        self.is_cuda_available = use_cuda

    def forward_test(self, imgs, img_metas, **kwargs):
        """Run one inference pass and return the outputs as CPU arrays."""
        data = imgs[0]
        device = 'cuda' if self.is_cuda_available else 'cpu'
        if not self.is_cuda_available:
            data = data.cpu()
        # Zero-copy binding: hand the tensor's buffer straight to ORT.
        self.io_binding.bind_input(
            name='input',
            device_type=device,
            device_id=self.device_id,
            element_type=np.float32,
            shape=data.shape,
            buffer_ptr=data.data_ptr())
        for out_name in self.output_names:
            self.io_binding.bind_output(out_name)
        self.sess.run_with_iobinding(self.io_binding)
        return self.io_binding.copy_outputs_to_cpu()
class TensorRTDetector(DeployBaseDetector):
    """Wrapper for detector's inference with TensorRT.

    Args:
        engine_file (str): Path to the serialized TensorRT engine.
        class_names (Sequence[str]): Class names of the detector.
        device_id (int): CUDA device id used for inference.
        output_names (list[str], optional): Deprecated and ignored; the
            output names are fixed to ``['dets', 'labels']`` (plus
            ``'masks'`` when the engine exposes a mask output).
    """

    def __init__(self, engine_file, class_names, device_id, output_names=None):
        super(TensorRTDetector, self).__init__(class_names, device_id)
        if output_names is not None:
            # FIX: only warn when the caller actually supplied the
            # deprecated argument; previously the warning fired on every
            # construction. The value is ignored either way (see below).
            warnings.warn('`output_names` is deprecated and will be removed '
                          'in future releases.')
        from mmcv.tensorrt import TRTWraper, load_tensorrt_plugin
        try:
            load_tensorrt_plugin()
        except (ImportError, ModuleNotFoundError):
            warnings.warn('If input model has custom op from mmcv, \
                you may have to build mmcv with TensorRT from source.')
        # Output names are fixed by the export pipeline regardless of the
        # deprecated ``output_names`` argument.
        output_names = ['dets', 'labels']
        model = TRTWraper(engine_file, ['input'], output_names)
        with_masks = False
        # if TensorRT has totally 4 inputs/outputs, then
        # the detector should have `mask` output.
        if len(model.engine) == 4:
            model.output_names = output_names + ['masks']
            with_masks = True
        self.model = model
        self.with_masks = with_masks

    def forward_test(self, imgs, img_metas, **kwargs):
        """Feed one batch through the TensorRT engine.

        Returns:
            list[np.ndarray]: Outputs in ``self.model.output_names`` order.
        """
        input_data = imgs[0].contiguous()
        with torch.cuda.device(self.device_id), torch.no_grad():
            outputs = self.model({'input': input_data})
            outputs = [outputs[name] for name in self.model.output_names]
        outputs = [out.detach().cpu().numpy() for out in outputs]
        return outputs