RockeyCoss
add code files”
51f6859
raw
history blame
7.47 kB
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import warnings
import numpy as np
import torch
from mmdet.core import bbox2result
from mmdet.models import BaseDetector
class DeployBaseDetector(BaseDetector):
"""DeployBaseDetector."""
def __init__(self, class_names, device_id):
super(DeployBaseDetector, self).__init__()
self.CLASSES = class_names
self.device_id = device_id
def simple_test(self, img, img_metas, **kwargs):
raise NotImplementedError('This method is not implemented.')
def aug_test(self, imgs, img_metas, **kwargs):
raise NotImplementedError('This method is not implemented.')
def extract_feat(self, imgs):
raise NotImplementedError('This method is not implemented.')
def forward_train(self, imgs, img_metas, **kwargs):
raise NotImplementedError('This method is not implemented.')
def val_step(self, data, optimizer):
raise NotImplementedError('This method is not implemented.')
def train_step(self, data, optimizer):
raise NotImplementedError('This method is not implemented.')
def forward_test(self, *, img, img_metas, **kwargs):
raise NotImplementedError('This method is not implemented.')
def async_simple_test(self, img, img_metas, **kwargs):
raise NotImplementedError('This method is not implemented.')
def forward(self, img, img_metas, return_loss=True, **kwargs):
outputs = self.forward_test(img, img_metas, **kwargs)
batch_dets, batch_labels = outputs[:2]
batch_masks = outputs[2] if len(outputs) == 3 else None
batch_size = img[0].shape[0]
img_metas = img_metas[0]
results = []
rescale = kwargs.get('rescale', True)
for i in range(batch_size):
dets, labels = batch_dets[i], batch_labels[i]
if rescale:
scale_factor = img_metas[i]['scale_factor']
if isinstance(scale_factor, (list, tuple, np.ndarray)):
assert len(scale_factor) == 4
scale_factor = np.array(scale_factor)[None, :] # [1,4]
dets[:, :4] /= scale_factor
if 'border' in img_metas[i]:
# offset pixel of the top-left corners between original image
# and padded/enlarged image, 'border' is used when exporting
# CornerNet and CentripetalNet to onnx
x_off = img_metas[i]['border'][2]
y_off = img_metas[i]['border'][0]
dets[:, [0, 2]] -= x_off
dets[:, [1, 3]] -= y_off
dets[:, :4] *= (dets[:, :4] > 0).astype(dets.dtype)
dets_results = bbox2result(dets, labels, len(self.CLASSES))
if batch_masks is not None:
masks = batch_masks[i]
img_h, img_w = img_metas[i]['img_shape'][:2]
ori_h, ori_w = img_metas[i]['ori_shape'][:2]
masks = masks[:, :img_h, :img_w]
if rescale:
masks = masks.astype(np.float32)
masks = torch.from_numpy(masks)
masks = torch.nn.functional.interpolate(
masks.unsqueeze(0), size=(ori_h, ori_w))
masks = masks.squeeze(0).detach().numpy()
if masks.dtype != bool:
masks = masks >= 0.5
segms_results = [[] for _ in range(len(self.CLASSES))]
for j in range(len(dets)):
segms_results[labels[j]].append(masks[j])
results.append((dets_results, segms_results))
else:
results.append(dets_results)
return results
class ONNXRuntimeDetector(DeployBaseDetector):
"""Wrapper for detector's inference with ONNXRuntime."""
def __init__(self, onnx_file, class_names, device_id):
super(ONNXRuntimeDetector, self).__init__(class_names, device_id)
import onnxruntime as ort
# get the custom op path
ort_custom_op_path = ''
try:
from mmcv.ops import get_onnxruntime_op_path
ort_custom_op_path = get_onnxruntime_op_path()
except (ImportError, ModuleNotFoundError):
warnings.warn('If input model has custom op from mmcv, \
you may have to build mmcv with ONNXRuntime from source.')
session_options = ort.SessionOptions()
# register custom op for onnxruntime
if osp.exists(ort_custom_op_path):
session_options.register_custom_ops_library(ort_custom_op_path)
sess = ort.InferenceSession(onnx_file, session_options)
providers = ['CPUExecutionProvider']
options = [{}]
is_cuda_available = ort.get_device() == 'GPU'
if is_cuda_available:
providers.insert(0, 'CUDAExecutionProvider')
options.insert(0, {'device_id': device_id})
sess.set_providers(providers, options)
self.sess = sess
self.io_binding = sess.io_binding()
self.output_names = [_.name for _ in sess.get_outputs()]
self.is_cuda_available = is_cuda_available
def forward_test(self, imgs, img_metas, **kwargs):
input_data = imgs[0]
# set io binding for inputs/outputs
device_type = 'cuda' if self.is_cuda_available else 'cpu'
if not self.is_cuda_available:
input_data = input_data.cpu()
self.io_binding.bind_input(
name='input',
device_type=device_type,
device_id=self.device_id,
element_type=np.float32,
shape=input_data.shape,
buffer_ptr=input_data.data_ptr())
for name in self.output_names:
self.io_binding.bind_output(name)
# run session to get outputs
self.sess.run_with_iobinding(self.io_binding)
ort_outputs = self.io_binding.copy_outputs_to_cpu()
return ort_outputs
class TensorRTDetector(DeployBaseDetector):
"""Wrapper for detector's inference with TensorRT."""
def __init__(self, engine_file, class_names, device_id, output_names=None):
super(TensorRTDetector, self).__init__(class_names, device_id)
warnings.warn('`output_names` is deprecated and will be removed in '
'future releases.')
from mmcv.tensorrt import TRTWraper, load_tensorrt_plugin
try:
load_tensorrt_plugin()
except (ImportError, ModuleNotFoundError):
warnings.warn('If input model has custom op from mmcv, \
you may have to build mmcv with TensorRT from source.')
output_names = ['dets', 'labels']
model = TRTWraper(engine_file, ['input'], output_names)
with_masks = False
# if TensorRT has totally 4 inputs/outputs, then
# the detector should have `mask` output.
if len(model.engine) == 4:
model.output_names = output_names + ['masks']
with_masks = True
self.model = model
self.with_masks = with_masks
def forward_test(self, imgs, img_metas, **kwargs):
input_data = imgs[0].contiguous()
with torch.cuda.device(self.device_id), torch.no_grad():
outputs = self.model({'input': input_data})
outputs = [outputs[name] for name in self.model.output_names]
outputs = [out.detach().cpu().numpy() for out in outputs]
return outputs