Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
import os.path as osp | |
import mmcv | |
import numpy as np | |
import pycocotools.mask as maskUtils | |
from mmdet.core import BitmapMasks, PolygonMasks | |
from ..builder import PIPELINES | |
try: | |
from panopticapi.utils import rgb2id | |
except ImportError: | |
rgb2id = None | |
class LoadImageFromFile:
    """Read one image from storage and record its basic meta information.

    The incoming result dict must provide ``img_prefix`` (may be ``None``)
    and ``img_info`` (a dict holding the key ``filename``). The keys written
    by this transform are ``filename``, ``ori_filename``, ``img``,
    ``img_shape``, ``ori_shape`` (identical to ``img_shape``) and
    ``img_fields`` (always ``['img']``).

    Args:
        to_float32 (bool): When True the decoded image is cast to
            ``np.float32``; otherwise the array is kept exactly as the
            decoder returned it. Defaults to False.
        color_type (str): Passed as ``flag`` to :func:`mmcv.imfrombytes`.
            Defaults to 'color'.
        channel_order (str): Passed as ``channel_order`` to
            :func:`mmcv.imfrombytes`. Defaults to 'bgr'.
        file_client_args (dict): Keyword arguments used to build the
            :class:`mmcv.FileClient`; a shallow copy is stored on the
            instance. Defaults to ``dict(backend='disk')``.
    """

    def __init__(self,
                 to_float32=False,
                 color_type='color',
                 channel_order='bgr',
                 file_client_args=dict(backend='disk')):
        # The client is created on the first ``__call__`` rather than here.
        self.file_client = None
        self.file_client_args = file_client_args.copy()
        self.channel_order = channel_order
        self.color_type = color_type
        self.to_float32 = to_float32

    def __call__(self, results):
        """Decode the image referenced by ``results`` and fill in meta keys.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The same dict, updated with the image and its meta keys.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)

        prefix = results['img_prefix']
        relative_name = results['img_info']['filename']
        if prefix is None:
            filename = relative_name
        else:
            filename = osp.join(prefix, relative_name)

        raw_bytes = self.file_client.get(filename)
        image = mmcv.imfrombytes(
            raw_bytes, flag=self.color_type, channel_order=self.channel_order)
        if self.to_float32:
            image = image.astype(np.float32)

        results.update(
            filename=filename,
            ori_filename=relative_name,
            img=image,
            img_shape=image.shape,
            ori_shape=image.shape,
            img_fields=['img'])
        return results

    def __repr__(self):
        parts = (
            f'to_float32={self.to_float32}',
            f"color_type='{self.color_type}'",
            f"channel_order='{self.channel_order}'",
            f'file_client_args={self.file_client_args}',
        )
        joined = ', '.join(parts)
        return f'{self.__class__.__name__}({joined})'
class LoadImageFromWebcam(LoadImageFromFile):
    """Register an already-captured webcam frame as the pipeline image.

    Works like :obj:`LoadImageFromFile`, except that nothing is read from
    storage: the frame is expected to be present in ``results['img']``
    already, and both filename keys are set to ``None``.
    """

    def __call__(self, results):
        """Attach image meta information for an in-memory frame.

        Args:
            results (dict): Result dict with the webcam frame stored in
                ``results['img']``.

        Returns:
            dict: The same dict, updated with the image and its meta keys.
        """
        frame = results['img']
        if self.to_float32:
            frame = frame.astype(np.float32)

        # There is no backing file for a webcam frame.
        results.update(
            filename=None,
            ori_filename=None,
            img=frame,
            img_shape=frame.shape,
            ori_shape=frame.shape,
            img_fields=['img'])
        return results
class LoadMultiChannelImageFromFiles:
    """Build a multi-channel image by stacking separately stored channels.

    The incoming result dict must provide ``img_prefix`` (may be ``None``)
    and ``img_info`` (a dict whose ``filename`` entry is a list of file
    names, one per channel). The keys written by this transform are
    ``filename``, ``ori_filename``, ``img``, ``img_shape``, ``ori_shape``
    and ``pad_shape`` (all three equal to the stacked image shape),
    ``scale_factor`` (1.0) and ``img_norm_cfg`` (zero mean, unit std,
    ``to_rgb=False``).

    Args:
        to_float32 (bool): When True the stacked image is cast to
            ``np.float32``; otherwise the dtype produced by decoding and
            stacking is kept. Defaults to False.
        color_type (str): Passed as ``flag`` to :func:`mmcv.imfrombytes`.
            Defaults to 'unchanged'.
        file_client_args (dict): Keyword arguments used to build the
            :class:`mmcv.FileClient`; a shallow copy is stored on the
            instance. Defaults to ``dict(backend='disk')``.
    """

    def __init__(self,
                 to_float32=False,
                 color_type='unchanged',
                 file_client_args=dict(backend='disk')):
        # The client is created on the first ``__call__`` rather than here.
        self.file_client = None
        self.file_client_args = file_client_args.copy()
        self.color_type = color_type
        self.to_float32 = to_float32

    def __call__(self, results):
        """Load every channel file, stack them and fill in meta keys.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The same dict, updated with the image and its meta keys.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)

        prefix = results['img_prefix']
        if prefix is None:
            filename = results['img_info']['filename']
        else:
            filename = [
                osp.join(prefix, channel_file)
                for channel_file in results['img_info']['filename']
            ]

        channels = [
            mmcv.imfrombytes(self.file_client.get(path), flag=self.color_type)
            for path in filename
        ]
        # Each decoded file becomes one slice along the last axis.
        img = np.stack(channels, axis=-1)
        if self.to_float32:
            img = img.astype(np.float32)

        if len(img.shape) < 3:
            num_channels = 1
        else:
            num_channels = img.shape[2]

        results.update(
            filename=filename,
            ori_filename=results['img_info']['filename'],
            img=img,
            img_shape=img.shape,
            ori_shape=img.shape,
            # Initial values for the default meta_keys.
            pad_shape=img.shape,
            scale_factor=1.0,
            img_norm_cfg=dict(
                mean=np.zeros(num_channels, dtype=np.float32),
                std=np.ones(num_channels, dtype=np.float32),
                to_rgb=False))
        return results

    def __repr__(self):
        parts = (
            f'to_float32={self.to_float32}',
            f"color_type='{self.color_type}'",
            f'file_client_args={self.file_client_args}',
        )
        joined = ', '.join(parts)
        return f'{self.__class__.__name__}({joined})'
class LoadAnnotations:
    """Load multiple types of annotations.

    Each annotation type is loaded only when its ``with_*`` switch is on.
    Annotations are read from ``results['ann_info']``; the semantic
    segmentation map is additionally read from storage through a
    :class:`mmcv.FileClient`.

    Args:
        with_bbox (bool): Whether to parse and load the bbox annotation.
            Default: True.
        with_label (bool): Whether to parse and load the label annotation.
            Default: True.
        with_mask (bool): Whether to parse and load the mask annotation.
            Default: False.
        with_seg (bool): Whether to parse and load the semantic segmentation
            annotation. Default: False.
        poly2mask (bool): Whether to convert the instance masks from polygons
            to bitmaps. Default: True.
        denorm_bbox (bool): Whether to convert bbox from relative value to
            absolute value. Only used in OpenImage Dataset.
            Default: False.
        file_client_args (dict): Arguments to instantiate a FileClient.
            See :class:`mmcv.fileio.FileClient` for details. A shallow copy
            is stored on the instance.
            Defaults to ``dict(backend='disk')``.
    """

    def __init__(self,
                 with_bbox=True,
                 with_label=True,
                 with_mask=False,
                 with_seg=False,
                 poly2mask=True,
                 denorm_bbox=False,
                 file_client_args=dict(backend='disk')):
        self.with_bbox = with_bbox
        self.with_label = with_label
        self.with_mask = with_mask
        self.with_seg = with_seg
        self.poly2mask = poly2mask
        self.denorm_bbox = denorm_bbox
        self.file_client_args = file_client_args.copy()
        # Created lazily the first time a file has to be read.
        self.file_client = None

    def _load_bboxes(self, results):
        """Private function to load bounding box annotations.

        Writes ``gt_bboxes`` (and, when present in the annotation,
        ``gt_bboxes_ignore`` and ``gt_is_group_ofs``) and registers the bbox
        keys in ``results['bbox_fields']``.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded bounding box annotations.
        """
        ann_info = results['ann_info']
        results['gt_bboxes'] = ann_info['bboxes'].copy()

        if self.denorm_bbox:
            bbox_num = results['gt_bboxes'].shape[0]
            if bbox_num != 0:
                # Columns 0 and 2 are scaled by the image width, columns 1
                # and 3 by the image height, taken from ``img_shape``.
                h, w = results['img_shape'][:2]
                results['gt_bboxes'][:, 0::2] *= w
                results['gt_bboxes'][:, 1::2] *= h

        gt_bboxes_ignore = ann_info.get('bboxes_ignore', None)
        if gt_bboxes_ignore is not None:
            results['gt_bboxes_ignore'] = gt_bboxes_ignore.copy()
            results['bbox_fields'].append('gt_bboxes_ignore')
        results['bbox_fields'].append('gt_bboxes')

        gt_is_group_ofs = ann_info.get('gt_is_group_ofs', None)
        if gt_is_group_ofs is not None:
            results['gt_is_group_ofs'] = gt_is_group_ofs.copy()

        return results

    def _load_labels(self, results):
        """Private function to load label annotations.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded label annotations.
        """
        results['gt_labels'] = results['ann_info']['labels'].copy()
        return results

    def _poly2mask(self, mask_ann, img_h, img_w):
        """Private function to convert masks represented with polygon to
        bitmaps.

        Args:
            mask_ann (list | dict): Polygon mask annotation input.
            img_h (int): The height of output mask.
            img_w (int): The width of output mask.

        Returns:
            numpy.ndarray: The decode bitmap mask of shape (img_h, img_w).
        """
        if isinstance(mask_ann, list):
            # polygon -- a single object might consist of multiple parts
            # we merge all parts into one mask rle code
            rles = maskUtils.frPyObjects(mask_ann, img_h, img_w)
            rle = maskUtils.merge(rles)
        elif isinstance(mask_ann['counts'], list):
            # uncompressed RLE
            rle = maskUtils.frPyObjects(mask_ann, img_h, img_w)
        else:
            # rle
            rle = mask_ann
        mask = maskUtils.decode(rle)
        return mask

    def process_polygons(self, polygons):
        """Convert polygons to list of ndarray and filter invalid polygons.

        A polygon is kept only when it has an even number of values and at
        least six of them.

        Args:
            polygons (list[list]): Polygons of one instance.

        Returns:
            list[numpy.ndarray]: Processed polygons.
        """
        polygons = [np.array(p) for p in polygons]
        return [
            polygon for polygon in polygons
            if len(polygon) % 2 == 0 and len(polygon) >= 6
        ]

    def _load_masks(self, results):
        """Private function to load mask annotations.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded mask annotations.
                If ``self.poly2mask`` is set ``True``, ``gt_masks`` will
                contain :obj:`BitmapMasks`. Otherwise, :obj:`PolygonMasks`
                is used.
        """
        h, w = results['img_info']['height'], results['img_info']['width']
        gt_masks = results['ann_info']['masks']
        if self.poly2mask:
            gt_masks = BitmapMasks(
                [self._poly2mask(mask, h, w) for mask in gt_masks], h, w)
        else:
            gt_masks = PolygonMasks(
                [self.process_polygons(polygons) for polygons in gt_masks], h,
                w)
        results['gt_masks'] = gt_masks
        results['mask_fields'].append('gt_masks')
        return results

    def _load_semantic_seg(self, results):
        """Private function to load semantic segmentation annotations.

        Args:
            results (dict): Result dict from :obj:`dataset`.

        Returns:
            dict: The dict contains loaded semantic segmentation annotations.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)

        filename = osp.join(results['seg_prefix'],
                            results['ann_info']['seg_map'])
        img_bytes = self.file_client.get(filename)
        results['gt_semantic_seg'] = mmcv.imfrombytes(
            img_bytes, flag='unchanged').squeeze()
        results['seg_fields'].append('gt_semantic_seg')
        return results

    def __call__(self, results):
        """Call function to load multiple types annotations.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded bounding box, label, mask and
                semantic segmentation annotations.
        """
        if self.with_bbox:
            results = self._load_bboxes(results)
            # Kept so that an overriding ``_load_bboxes`` may signal "skip
            # this sample" by returning None.
            if results is None:
                return None
        if self.with_label:
            results = self._load_labels(results)
        if self.with_mask:
            results = self._load_masks(results)
        if self.with_seg:
            results = self._load_semantic_seg(results)
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += f'(with_bbox={self.with_bbox}, '
        repr_str += f'with_label={self.with_label}, '
        repr_str += f'with_mask={self.with_mask}, '
        repr_str += f'with_seg={self.with_seg}, '
        repr_str += f'poly2mask={self.poly2mask}, '
        # Every constructor argument is reported, including ``denorm_bbox``.
        repr_str += f'denorm_bbox={self.denorm_bbox}, '
        repr_str += f'file_client_args={self.file_client_args})'
        return repr_str
class LoadPanopticAnnotations(LoadAnnotations):
    """Load bbox, label, mask and semantic-seg annotations for panoptic data.

    Instance masks and the semantic map are both derived from one panoptic
    PNG, so they are produced together by a single helper. Construction
    fails when ``panopticapi`` could not be imported.

    Args:
        with_bbox (bool): Whether to parse and load the bbox annotation.
            Default: True.
        with_label (bool): Whether to parse and load the label annotation.
            Default: True.
        with_mask (bool): Whether to parse and load the mask annotation.
            Default: True.
        with_seg (bool): Whether to parse and load the semantic segmentation
            annotation. Default: True.
        file_client_args (dict): Arguments to instantiate a FileClient.
            See :class:`mmcv.fileio.FileClient` for details.
            Defaults to ``dict(backend='disk')``.
    """

    def __init__(self,
                 with_bbox=True,
                 with_label=True,
                 with_mask=True,
                 with_seg=True,
                 file_client_args=dict(backend='disk')):
        if rgb2id is None:
            message = ('panopticapi is not installed, please install it by: '
                       'pip install git+https://github.com/cocodataset/'
                       'panopticapi.git.')
            raise RuntimeError(message)

        # ``poly2mask`` and ``denorm_bbox`` are fixed for panoptic loading.
        super().__init__(
            with_bbox=with_bbox,
            with_label=with_label,
            with_mask=with_mask,
            with_seg=with_seg,
            poly2mask=True,
            denorm_bbox=False,
            file_client_args=file_client_args)

    def _load_masks_and_semantic_segs(self, results):
        """Private function to load mask and semantic segmentation annotations.

        In gt_semantic_seg, the foreground label is from `0` to
        `num_things - 1`, the background label is from `num_things` to
        `num_things + num_stuff - 1`, 255 means the ignored label (`VOID`).

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded mask and semantic segmentation
                annotations. `BitmapMasks` is used for mask annotations.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)

        seg_path = osp.join(results['seg_prefix'],
                            results['ann_info']['seg_map'])
        raw_bytes = self.file_client.get(seg_path)
        rgb_png = mmcv.imfrombytes(
            raw_bytes, flag='color', channel_order='rgb').squeeze()
        pan_png = rgb2id(rgb_png)

        # Start with every pixel marked as ignored (255).
        gt_seg = np.zeros_like(pan_png) + 255
        thing_masks = []
        for segment in results['ann_info']['masks']:
            segment_mask = (pan_png == segment['id'])
            gt_seg = np.where(segment_mask, segment['category'], gt_seg)
            # Only segments flagged as things yield an instance mask.
            if segment.get('is_thing'):
                thing_masks.append(segment_mask.astype(np.uint8))

        if self.with_mask:
            img_info = results['img_info']
            height, width = img_info['height'], img_info['width']
            results['gt_masks'] = BitmapMasks(thing_masks, height, width)
            results['mask_fields'].append('gt_masks')

        if self.with_seg:
            results['gt_semantic_seg'] = gt_seg
            results['seg_fields'].append('gt_semantic_seg')
        return results

    def __call__(self, results):
        """Call function to load multiple types panoptic annotations.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The dict contains loaded bounding box, label, mask and
                semantic segmentation annotations.
        """
        if self.with_bbox:
            results = self._load_bboxes(results)
            if results is None:
                return None

        if self.with_label:
            results = self._load_labels(results)

        needs_panoptic_map = self.with_mask or self.with_seg
        if needs_panoptic_map:
            # Mask loading and semantic-seg loading of the parent class are
            # handled together by one function here.
            results = self._load_masks_and_semantic_segs(results)

        return results
class LoadProposals:
    """Prepare precomputed proposals for the pipeline.

    Reads ``results['proposals']``, keeps the first four columns of each
    row, optionally truncates the number of rows, and registers
    ``'proposals'`` in ``results['bbox_fields']``.

    Args:
        num_max_proposals (int, optional): Maximum number of proposals to
            keep. When left as None, all proposals are kept.
    """

    def __init__(self, num_max_proposals=None):
        self.num_max_proposals = num_max_proposals

    def __call__(self, results):
        """Validate, trim and store the proposals.

        Args:
            results (dict): Result dict from :obj:`mmdet.CustomDataset`.

        Returns:
            dict: The same dict with ``proposals`` replaced by an
                ``(n, 4)`` array.

        Raises:
            AssertionError: If the second dimension of the proposals is
                neither 4 nor 5.
        """
        boxes = results['proposals']
        num_columns = boxes.shape[1]
        if num_columns != 4 and num_columns != 5:
            raise AssertionError(
                'proposals should have shapes (n, 4) or (n, 5), '
                f'but found {boxes.shape}')

        # Anything beyond the first four columns is dropped.
        boxes = boxes[:, :4]

        limit = self.num_max_proposals
        if limit is not None:
            boxes = boxes[:limit]

        # An empty set is replaced by a single all-zero placeholder box.
        if not len(boxes):
            boxes = np.array([[0, 0, 0, 0]], dtype=np.float32)

        results['proposals'] = boxes
        results['bbox_fields'].append('proposals')
        return results

    def __repr__(self):
        name = self.__class__.__name__
        return f'{name}(num_max_proposals={self.num_max_proposals})'
class FilterAnnotations:
    """Filter invalid annotations.

    Instances are dropped from ``gt_bboxes``, ``gt_labels`` and ``gt_masks``
    (whichever of them are present) when they fail the enabled size tests.

    Args:
        min_gt_bbox_wh (tuple[float]): Minimum width and height of ground truth
            boxes. A box is kept only if its width and height are both
            strictly greater than these values. Default: (1., 1.)
        min_gt_mask_area (int): Minimum foreground area of ground truth masks.
            A mask is kept if its area is greater than or equal to this
            value. Default: 1
        by_box (bool): Filter instances with bounding boxes not meeting the
            min_gt_bbox_wh threshold. Default: True
        by_mask (bool): Filter instances with masks not meeting
            min_gt_mask_area threshold. Default: False
        keep_empty (bool): Whether to return None when no instance is left
            after filtering. When False, the result dict with empty
            annotations is returned instead. Default: True

    Raises:
        AssertionError: If both ``by_box`` and ``by_mask`` are False.
    """

    def __init__(self,
                 min_gt_bbox_wh=(1., 1.),
                 min_gt_mask_area=1,
                 by_box=True,
                 by_mask=False,
                 keep_empty=True):
        # TODO: add more filter options
        # Raised explicitly so the check is not stripped under ``-O``.
        if not (by_box or by_mask):
            raise AssertionError(
                'at least one of by_box and by_mask must be True')
        self.min_gt_bbox_wh = min_gt_bbox_wh
        self.min_gt_mask_area = min_gt_mask_area
        self.by_box = by_box
        self.by_mask = by_mask
        self.keep_empty = keep_empty

    def __call__(self, results):
        """Drop the instances that fail the enabled size tests.

        Args:
            results (dict): Result dict holding ``gt_bboxes`` (needed when
                ``by_box`` is set) and/or ``gt_masks`` (needed when
                ``by_mask`` is set).

        Returns:
            dict | None: ``results`` with the annotation keys filtered in
                place. If there were no instances to begin with, ``results``
                is returned untouched. If every instance was removed and
                ``keep_empty`` is True, None is returned.

        Raises:
            AssertionError: If a key required by an enabled test is missing.
        """
        if self.by_box:
            if 'gt_bboxes' not in results:
                raise AssertionError(
                    "'gt_bboxes' is required in results when by_box=True")
            gt_bboxes = results['gt_bboxes']
            instance_num = gt_bboxes.shape[0]
        if self.by_mask:
            if 'gt_masks' not in results:
                raise AssertionError(
                    "'gt_masks' is required in results when by_mask=True")
            gt_masks = results['gt_masks']
            instance_num = len(gt_masks)

        if instance_num == 0:
            return results

        tests = []
        if self.by_box:
            w = gt_bboxes[:, 2] - gt_bboxes[:, 0]
            h = gt_bboxes[:, 3] - gt_bboxes[:, 1]
            tests.append((w > self.min_gt_bbox_wh[0])
                         & (h > self.min_gt_bbox_wh[1]))
        if self.by_mask:
            tests.append(gt_masks.areas >= self.min_gt_mask_area)

        # An instance survives only if it passes every enabled test.
        keep = tests[0]
        for t in tests[1:]:
            keep = keep & t
        keep = keep.nonzero()[0]

        keys = ('gt_bboxes', 'gt_labels', 'gt_masks')
        for key in keys:
            if key in results:
                results[key] = results[key][keep]

        if keep.size == 0 and self.keep_empty:
            return None
        return results

    def __repr__(self):
        # Reports ``keep_empty``, the attribute set in ``__init__``.
        return self.__class__.__name__ + \
            f'(min_gt_bbox_wh={self.min_gt_bbox_wh},' \
            f'min_gt_mask_area={self.min_gt_mask_area},' \
            f'by_box={self.by_box},' \
            f'by_mask={self.by_mask},' \
            f'keep_empty={self.keep_empty})'