import os import cv2 import numpy as np import time from tqdm import tqdm import random # from shapely.geometry import Point, Polygon from numpy.linalg import svd from collections import namedtuple from vis_common import get_logger from typing import Any, Dict, List, Optional, Type, Union logger = get_logger('v_utils') import pdb b = pdb.set_trace IMAGE_EXTS = ['jpg', 'png', 'jpeg', 'JPG', 'PNG', 'JPEG'] PALETTE = [ (0.12156862745098039, 0.4666666666666667, 0.7058823529411765), (0.6823529411764706, 0.7803921568627451, 0.9098039215686274), (1.0, 0.4980392156862745, 0.054901960784313725), (1.0, 0.7333333333333333, 0.47058823529411764), (0.17254901960784313, 0.6274509803921569, 0.17254901960784313), (0.596078431372549, 0.8745098039215686, 0.5411764705882353), (0.8392156862745098, 0.15294117647058825, 0.1568627450980392), (1.0, 0.596078431372549, 0.5882352941176471), (0.5803921568627451, 0.403921568627451, 0.7411764705882353), (0.7725490196078432, 0.6901960784313725, 0.8352941176470589), (0.5490196078431373, 0.33725490196078434, 0.29411764705882354), (0.7686274509803922, 0.611764705882353, 0.5803921568627451), (0.8901960784313725, 0.4666666666666667, 0.7607843137254902), (0.9686274509803922, 0.7137254901960784, 0.8235294117647058), (0.4980392156862745, 0.4980392156862745, 0.4980392156862745), (0.7803921568627451, 0.7803921568627451, 0.7803921568627451), (0.7372549019607844, 0.7411764705882353, 0.13333333333333333), (0.8588235294117647, 0.8588235294117647, 0.5529411764705883), (0.09019607843137255, 0.7450980392156863, 0.8117647058823529), (0.6196078431372549, 0.8549019607843137, 0.8980392156862745), ] def check_file_in_paths(paths, filename): for path in paths: file = os.path.join(path, filename) print(file) if os.path.exists(file): print(file) return True return False def clean_backslash(dir): while dir[-1] == '/': dir = dir[:-1] return dir def odgt2txt(odgt_file, txt_file, image_key='image', segment_key='segment'): import io_utils as io_uts odgt = io_uts.load_odgt(odgt_file) f = open(txt_file, 'w') for item in odgt: string = f"{item[image_key]} {item[segment_key]}\n" f.write(string) f.close() print("done") def single_thresh(args, mark_ignore=True): """ threshold 255, 128, 0 type of label for a binary label """ image_name, label_name, out_label_name = args image = cv2.imread(image_name, cv2.IMREAD_UNCHANGED) mask_org = cv2.imread(label_name, cv2.IMREAD_UNCHANGED) if not (image.shape[0] / image.shape[1] == mask_org.shape[0] / mask_org.shape[1]): # rotate match if mask_org.shape[1] / mask_org.shape[0] == image.shape[0] / image.shape[1]: mask_org = cv2.rotate(mask_org, cv2.cv2.ROTATE_90_CLOCKWISE) print(image_name, label_name, f"shape not match {mask_org.shape} vs {image.shape}") else: print(image_name, label_name, "shape not match even rotation") assert False name = basename(label_name) if mask_org.ndim == 3: mask_org = mask_org[:, :, 0] mask = np.zeros_like(mask_org) mask[mask_org > 172] = 1 if mark_ignore: ignore_region = np.logical_and( mask_org <= 172, mask_org >= 70) mask[ignore_region] = 255 cv2.imwrite(out_label_name, np.uint8(mask)) def find_file_w_exts(filename, exts, w_dot=False): appex = '.' if w_dot else '' for ext in exts: if os.path.exists(f"{filename}{appex}{ext}"): return True, f"{filename}{appex}{ext}" return False, None def seg_folder_to_txt(image_folder, label_folder, root, output_file): exts = ['jpg', 'png', 'jpeg'] image_files = list_all_files(image_folder, exts) f = open(output_file, 'w') for image_file in tqdm(image_files): image_name = basename(image_file) label_file = f"{label_folder}/{image_name}.png" assert os.path.exists(label_file), f"{image_file} {label_file}" image_file = image_file.replace(root, '.') label_file = label_file.replace(root, '.') string = f"{image_file} {label_file}\n" f.write(string) f.close() print("done") def wait_for_file(filename, step=5.0): count = 0.0 while not os.path.exists(): time.sleep(step) count += step time.sleep(step) print(f"found {filename} after {count}s") def get_trimap_by_binary(img, eradius=20, dradius=20): kernel = np.ones((radius, radius),np.uint8) erosion = cv2.erode(img, kernel, iterations = 1) dilation = cv2.dilate(img, kernel, iterations = 1) trimap = img.copy() mask = np.logical_and(dilation > 0, erosion == 0) trimap[mask] = 128 return trimap def get_matting_trimap(segment, eradius = 30, dradius = 30): # find the highest box, dilate segment dilate_ker = np.ones((dradius, dradius), np.uint8) shrink_ker = np.ones((eradius, eradius), np.uint8) segment_out = cv2.dilate(segment, dilate_ker, iterations=1) segment_in = cv2.erode(segment, shrink_ker, iterations=1) segment_image = np.zeros_like(segment, dtype=np.uint8) segment_image[segment_out > 0] = 128 segment_image[segment_in > 0] = 255 return segment_image def get_trimap_by_thresh(): pass def Mat2EulerImage(mat: np.ndarray, Image): channel = 1 if mat.ndim == 2 else mat.shape[-1] return Image( data=mat.tobytes(), rows=mat.shape[0], cols=mat.shape[1], channel=channel ) def EulerImagetoMat(res, channel=1): """ for euler thrift, usually a image is set as struct Image { 1: binary data, // cv::imencode(".png", image), should be bgr image 2: i32 rows, 3: i32 cols, 4: i32 channel } here we transform back """ data = res.data if channel > 1: return np.fromstring(data, dtype=np.uint8).reshape( (res.rows, res.cols, channel)) return np.fromstring(data, dtype=np.uint8).reshape( (res.rows, res.cols)) """ encode the name of an image with chinese """ class NameCoder(): def __init__(self, root_dir): self.root_dir = root_dir def __call__(self, name): import pinyin as py return py.get(name.replace( self.root_dir, '').replace('/', '_').replace(' ', '_'), format='strip') def basename(path): return os.path.splitext(os.path.basename(path))[0] def ext(path): return os.path.splitext(os.path.basename(path))[1][1:] def get_cur_abs_path(some_file): return os.path.dirname(os.path.abspath(some_file)) def list_all_files(directory, exts=None, recursive=True): import glob all_files = [] if exts is None: exts = IMAGE_EXTS for ext in exts: if not recursive: files = glob.glob("%s/*%s" % (directory, ext), recursive=recursive) else: files = glob.glob("%s/**/*%s" % (directory, ext), recursive=recursive) all_files = all_files + files all_files = sorted(all_files) return all_files def list_all_folders(directory): import glob folders = glob.glob(f"{directory}/*/") return folders def list_all(folder, exts=None, recur=False): if exts is None: return list_all_folders(folder) else: return list_all_files(folder, exts, recur) def split_path(folder): blocks = folder.split('/') return [name for name in blocks if name != ''] def dump_image(pred, res_file, score=True, dim='CHW'): if score: dump_prob2image(res_file, pred, dim=dim) else: res_file = res_file + '.png' cv2.imwrite(res_file, np.uint8(pred)) def dump_prob2image(filename, array, dim='CHW'): """ dump probility map to image when array: [x, height, width] (x = 1, 3, 4) """ if dim == 'CHW': array = np.transpose(np.uint8(array * 255), (1, 2, 0)) class_num = array.shape[2] # assert class_num <= 4 if class_num >= 4 : print('warning: only save the first 3 channels') array = array[:, :, :3] if class_num == 2: array = array[:, :, 1] cv2.imwrite(filename + '.png', array) def load_image2prob(filename): if not filename.endswith('.png'): filename = filename + '.png' array = cv2.imread(filename, cv2.IMREAD_UNCHANGED) array = np.transpose(array, (2, 0, 1)) / 255 return array def mask2box(mask): """ t, l, b, r y0, x0, y1, x1 """ y, x = np.where(mask > 0) return [np.min(y), np.min(x), np.max(y), np.max(x)] def dilate_mask(mask, kernel=20): mask = np.uint8(mask) kernel = np.ones((kernel, kernel), np.uint8) mask_out = cv2.dilate(mask, kernel, iterations=1) return mask_out def erode_mask(mask, kernel=20): kernel = np.ones((kernel, kernel), np.uint8) mask_out = cv2.erode(mask, kernel, iterations=1) return mask_out def pack_argument(args, arg_names): """ args: object of all arguments arg_names: list of string name for needed arguments """ kwargs = {} for arg_name in arg_names: cur_args = getattr(args, arg_name) if hasattr(args, arg_name) else None if cur_args: kwargs[arg_name] = cur_args return kwargs def line_segment_cross(seg1, seg2): """ :param seg1: [start, end] :param seg2: [start, end] :return: True if cross, false otherwise """ def ccw(A, B, C): return (C.y - A.y) * (B.x - A.x) > (B.y - A.y) * (C.x - A.x) # Return true if line segments AB and CD intersect def intersect(A, B, C, D): return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D) Point = namedtuple('Point', 'x y') A = Point(seg1[0][0], seg1[0][1]) B = Point(seg1[1][0], seg1[1][1]) C = Point(seg2[0][0], seg2[0][1]) D = Point(seg2[1][0], seg2[1][1]) return intersect(A, B, C, D) def pts_in_line(pts, lines, th=10): """ pts: [x, y] lines: [[x0, y0, x1, y1]] """ count = 0 for line in lines: x, y = pts x0, y0, x1, y1 = line dir0 = np.array([x - x0, y - y0]) dir1 = np.array([x1 - x0, y1 - y0]) diff = min(angle_diff(dir0, dir1), angle_diff(-1 * dir0, dir1)) if diff < th: count += 1 return count def out_of_bound(pt, sz): x, y = pt h, w = sz return x < 0 or y < 0 or x >= w or y >= h def pts_in_mask(pts, mask, allow_out=True): """ pts: n x 2 x, y location return len n mask """ idx = np.zeros(pts.shape[0]) > 0 for i, pt in enumerate(pts): x, y = pt if out_of_bound(pt, mask.shape): continue if mask[y, x] > 0: idx[i] = True return idx def pts_in_poly(pts, poly, sz): """ pts: n x 2 x, y location return len n mask """ mask = np.ones(sz) cv2.fillPoly(mask, pts=[np.int0(poly)], color=(1,)) return pts_in_mask(pts, mask) def line_intersect_pt(lines: np.array, randsac=True): """ lines: n x 4, [s, e] of line return: intersect_pt, is_parallel """ if lines.shape[0] < 2: raise ValueError('not enough line') num = lines.shape[0] line_id0 = 0 max_correct = 2 best_vp = None for line_id0 in range(num): for i in range(num): if i == line_id0: continue lines_cur = lines[[line_id0, i], :] N = 2 p1 = np.column_stack((lines_cur[:, :2], np.ones(N, dtype=np.float32))) p2 = np.column_stack((lines_cur[:, 2:], np.ones(N, dtype=np.float32))) cross_p = np.cross(p1, p2) vp1 = np.cross(cross_p[0], cross_p[1]) if vp1[2] < 1e-5: continue vp1 /= vp1[2] correct = pts_in_line(vp1[:2], lines) if max_correct <= correct: best_vp = vp1[:2] max_correct = correct if best_vp is not None: return best_vp, False return None, True def angle_diff(ba, bc, axis=None): norma = np.linalg.norm(ba, axis=axis) normb = np.linalg.norm(bc, axis=axis) dot_prod = np.sum(ba * bc, axis=axis) cosine_angle = dot_prod / (norma * normb) angle = np.arccos(cosine_angle) * 180.0 / np.pi return angle def on_right_side(rect, sz): # judge whether rect side h, w = sz cx = w // 2 return all([pt[0] >= cx for pt in rect]) def pts_angle(pts): """ pts [3 x 2] """ ba = pts[0] - pts[1] bc = pts[2] - pts[1] angle = angle_diff(ba, bc) return angle def sample_points(mask, num_points=100): # Get the indices where mask values are greater than 0 indices = np.argwhere(mask > 0) # Randomly select num_points indices selected_indices = np.random.choice(indices.shape[0], size=num_points, replace=False) # Get the selected points selected_points = indices[selected_indices] return selected_points def valid_para_ratio(pts, th=5): """ pts: [4 x 2] """ def valid_ratio(ratio): return 1.0 / th < ratio < th ratio0 = line_len(pts[0], pts[1]) / line_len(pts[2], pts[3]) if not valid_ratio(ratio0): return False ratio1 = line_len(pts[1], pts[2]) / line_len(pts[3], pts[0]) if not valid_ratio(ratio1): return False return True def line_len(pt0, pt1): """ pt0, 1: [1x2] """ return np.linalg.norm(pt0 - pt1) def split_list(seq, part): """ split a list to sub lists """ size = len(seq) / part + 1 if part > 0 else 1 size = int(size) return [seq[i:i+size] for i in range(0, len(seq), size)] def find_portion(mask, portion_x, portion_y, th=0): if mask.ndim > 2: raise ValueError(f"mask must be 2 dim, now {mask.ndim}") y, x = np.where(mask > th) x = np.percentile(x, portion_x) y = np.percentile(y, portion_y) return int(x), int(y) def random_split(num, portion=0.1, max_num=1000): """ num: length of list max_num is val num return: train, val list """ val_num = min(portion * num, max_num) val_num = int(val_num) idx = [i for i in range(num)] random.shuffle(idx) return idx[val_num:], idx[:val_num] def shuffle_list(list_in): return random.shuffle(list_in) def pick(lst, idx): return [lst[i] for i in idx] def mkdir_if_need(folder): if not os.path.exists(folder): os.makedirs(folder) def mkdir_if_exists(path, image_name): target_path = os.path.join(path, os.path.dirname(image_name)) if not os.path.exists(target_path): os.makedirs(target_path) def mkdir(folder, image_name=None): if image_name is not None: mkdir_if_exists(folder, image_name) return mkdir_if_need(folder) return folder return folder def save_image_w_pallete(segment, file_name): import PIL.Image as Image pallete = get_pallete(256) segmentation_result = np.uint8(segment) segmentation_result = Image.fromarray(segmentation_result) segmentation_result.putpalette(pallete) segmentation_result.save(file_name) def get_max_size(out_size, max_len): height, width = out_size scale = max(height, width) / max_len if scale > 1: height, width = np.uint32( np.array(out_size) / scale) return height ,width def get_pallete(num_cls): """ this function is to get the colormap for visualizing the segmentation mask :param num_cls: the number of visulized class :return: the pallete """ n = num_cls pallete = [0]*(n*3) for j in range(0,n): lab = j pallete[j*3+0] = 0 pallete[j*3+1] = 0 pallete[j*3+2] = 0 i = 0 while (lab > 0): pallete[j*3+0] |= (((lab >> 0) & 1) << (7-i)) pallete[j*3+1] |= (((lab >> 1) & 1) << (7-i)) pallete[j*3+2] |= (((lab >> 2) & 1) << (7-i)) i = i + 1 lab >>= 3 return pallete def color2label(label_color, color_map=None): """ Convert color image to semantic id based on color_map color_map = {$rgb: $label_id} if color map is None. Then we treat 0 as background and all none zero ids as label id """ # default bkg 255 label_color = np.int32(label_color) height, width = label_color.shape[0:2] label = label_color[:, :, 0] * (255 ** 2) + \ label_color[:, :, 1] * 255 + \ label_color[:, :, 2] label_id = np.unique(label) if color_map is None: for i, id in enumerate(label_id): if id == 0: continue mask = label == id label[mask] = i return label for rgb, i in color_map.items(): cur_num = rgb[0] * (255 ** 2) + rgb[1] * 255 + rgb[2] if cur_num in label_id: mask = (label - cur_num) != 0 label = label * mask + i * (1 - mask) return label def flow2color(flow): assert flow.shape[2] == 2 hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.float32) hsv[...,1] = 255 mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1]) hsv[...,0] = ang * 180 / np.pi / 2 hsv[...,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX) rgb = cv2.cvtColor(np.uint8(hsv), cv2.COLOR_HSV2BGR) return hsv, rgb def colorEncode(labelmap, colors, mode='RGB'): labelmap = labelmap.astype('int') labelmap_rgb = np.zeros((labelmap.shape[0], labelmap.shape[1], 3), dtype=np.uint8) for label in np.unique(labelmap): if label < 0: continue labelmap_rgb += (labelmap == label)[:, :, np.newaxis] * \ np.tile(colors[label], (labelmap.shape[0], labelmap.shape[1], 1)) if mode == 'BGR': return labelmap_rgb[:, :, ::-1] else: return labelmap_rgb def drawBoundingbox(image, boxes, colors=None): """ boxes: t, l, b r """ if colors is None: colors = [[255, 255, 0]] * len(boxes) for color, box in zip(colors, boxes): box = box.astype(np.uint32) t, l, b, r = box[0], box[1], box[2], box[3] cv2.rectangle(image, (l, t), (r, b), color, 2) return image def round2stride(length, stride): return (length // stride) * stride def resize_rect(rect, sz_src, sz_tgt): """ :param rect: n x 4 x 2 rectangles :param sz_src: (height, width) :param sz_tgt: :return: """ if len(rect) == 0: return rect height, width = sz_src height_tgt, width_tgt = sz_tgt rect[:, :, 0] = np.int64(rect[:, :, 0] * width_tgt / width) rect[:, :, 1] = np.int64(rect[:, :, 1] * height_tgt / height) return rect def resize_lines(lines, sz_src, sz_tgt): """ :param lines: [n x 4 ] each line [start (x, y), end (x, y)] :param sz_src: :param sz_tgt: :return: """ assert lines.shape[1] == 2 lines = lines.reshape([-1, 2, 2]) lines = resize_rect(lines, sz_src, sz_tgt) lines = lines.reshape([-1, 4]) return lines def resize_LShape(lShapes, sz_src, sz_tgt): """ :param lShapes: [n x 6] :param sz_src: :param sz_tgt: :return: """ assert lShapes.shape[1] == 3 lShapes = lShapes.reshape([-1, 3, 2]) lShapes = resize_rect(lShapes, sz_src, sz_tgt) lShapes = lShapes.reshape([-1, 6]) return lShapes def resize_to_fix_side(image, size=960, fix_type='height'): if fix_type == "height": scale = size / image.shape[0] height, width = size, int(scale * image.shape[1]) elif fix_type == "width": scale = size / image.shape[1] height, width = int(scale * image.shape[0]), size else: raise ValueError("fix type must in [height, widht]") image = cv2.resize(image, (width, height)) return image def resize_like(image, src, side="all", interpolation=None): """ resize image like src """ shape = src.shape[:2] if interpolation is None: interpolation = cv2.INTER_CUBIC if side != "all": size = shape[0] if side == "height" else shape[1] image = resize_to_fix_side(image, size, fix_type=side) return image image = cv2.resize(image, (shape[1], shape[0]), interpolation=interpolation) return image def getmaxsize(shape, size=720, fixSide=False): """ input: [h, w, c] output: [w, h] """ height, width = shape[:2] scale = max(height, width) / size height, width = np.uint32(np.array(shape[:2]) / scale) if fixSide: return (width, height) else: if scale > 1: return (width, height) else: return (shape[1], shape[0]) def resize2size(images, size, interpolations=None): """ :param images: :param size: width height :param interpolations: :return: """ if interpolations is None: interpolations = [cv2.INTER_LINEAR for _ in range(len(images))] for i, (image, interpolation) in enumerate(zip(images, interpolations)): if interpolation is None: interpolation = cv2.INTER_LINEAR if image is None: print(f"{i}_th image is None") image = cv2.resize(image, tuple(size), interpolation=interpolation) images[i] = image return images def resize2maxsize(image, size=720, interpolation=None, fixSide=False): """ Constraint the maximum length of an image Args: fixSide: set image side must be the same as size """ if interpolation is None: interpolation = cv2.INTER_CUBIC image_out = image.copy() height, width = image.shape[:2] scale = max(height, width) / size if image_out.dtype == 'bool': image_out = np.uint8(image_out) height, width = np.uint32(np.array(image.shape[:2]) / scale) if fixSide: image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) else: if scale > 1: image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) if image.dtype == bool: image_out = image_out > 0 return image_out def resize2minsize(image, size=256, interpolation=None): """ Constraint the minimum length of an image """ if size is None: return image if interpolation is None: interpolation = cv2.INTER_CUBIC height, width = image.shape[:2] scale = min(height, width) / size image_out = image.copy() if image_out.dtype == 'bool': image_out = np.uint8(image_out) if scale > 1: height, width = np.uint32(np.array(image.shape[:2]) / scale) image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) if image.dtype == bool: image_out = image_out > 0 return image_out def resize2minsize(image, size=256, interpolation=None): """ Constraint the minimum length of an image """ if interpolation is None: interpolation = cv2.INTER_CUBIC height, width = image.shape[:2] scale = min(height, width) / size image_out = image.copy() if image_out.dtype == 'bool': image_out = np.uint8(image_out) if scale > 1: height, width = np.uint32(np.array(image.shape[:2]) / scale) image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) if image.dtype == bool: image_out = image_out > 0 return image_out def getimgsizeby(sz, size=960, fix_type='max', stride=1): height, width = sz if fix_type == 'min': scale = min(height, width) / size elif fix_type == "max": scale = max(height, width) / size elif fix_type == 'height': scale = height / size elif fix_type == 'width': scale = width / size height, width = np.uint32(np.float32(sz) / scale) if stride > 1: height = round2stride(height, stride) width = round2stride(width, stride) return height, width def resize2fixSize(image, size=960, fix_type='max', interpolation=None): if interpolation is None: interpolation = cv2.INTER_CUBIC height, width = getimgsizeby(image.shape[:2], size, fix_type) image_out = image.copy() if image_out.dtype == 'bool': image_out = np.uint8(image_out) image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) if image.dtype == bool: image_out = image_out > 0 return image_out def resize2range(image, max_size=720, min_size=480, interpolation=None, stride=None): """ Constraint the maximum length of an image and min size of an image if conf """ if interpolation is None: interpolation = cv2.INTER_LINEAR height, width = image.shape[:2] scale_to_max = max_size / max(height, width) scale_to_min = min(min_size / min(height, width), max_size / max(height, width)) image_out = image.copy() if scale_to_max < 1: height, width = np.uint32(np.array(image.shape[:2]) * scale_to_max) if stride is not None: height = round2stride(height, stride) width = round2stride(width, stride) image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) return image_out else: if scale_to_min > 1: height, width = np.uint32(np.array(image.shape[:2]) * scale_to_min) image_out = cv2.resize(image_out, (width, height), interpolation=interpolation) return image_out return image_out def resize2maxshape(image, shape, interpolation=None, with_scale=False, mean_value=0): """ shape is the target video shape resize an image to target shape by padding zeros when ratio is not match """ def get_start_end(scale_id, height_new, width_new): if scale_id == 0: s_v, e_v = 0, height_new s_h = int((shape[1] - width_new) / 2) e_h = s_h + width_new else: s_v = int((shape[0] - height_new) / 2) e_v = s_v + height_new s_h, e_h = 0, width_new return s_v, e_v, s_h, e_h if interpolation is None: interpolation = cv2.INTER_CUBIC shape = list(shape) image_shape = shape if image.ndim == 2 else shape + [image.shape[-1]] image_out = np.zeros(image_shape) + mean_value height, width = image.shape[:2] scale_rate = np.array([shape[0] / height, shape[1] / width]) scale_id = np.argmin(scale_rate) scale = scale_rate[scale_id] image = cv2.resize(image, (int(width * scale), int(height * scale)), interpolation=interpolation) height_new, width_new = image.shape[:2] s_v, e_v, s_h, e_h = get_start_end(scale_id, height_new, width_new) image_out[s_v:e_v, s_h:e_h] = image crop = [s_v, s_h, e_v, e_h] # top, left, bottom, right if not with_scale: return image_out else: return image_out, scale, crop def bilinear_interpolation(x, y, points): '''Interpolate (x,y) from values associated with four points. The four points are a list of four triplets: (x, y, value). The four points can be in any order. They should form a rectangle. >>> bilinear_interpolation(12, 5.5, ... [(10, 4, 100), ... (20, 4, 200), ... (10, 6, 150), ... (20, 6, 300)]) 165.0 ''' # See formula at: http://en.wikipedia.org/wiki/Bilinear_interpolation points = sorted(points) # order points by x, then by y (x1, y1, q11), (_x1, y2, q12), (x2, _y1, q21), (_x2, _y2, q22) = points if x1 != _x1 or x2 != _x2 or y1 != _y1 or y2 != _y2: raise ValueError('points do not form a rectangle') if not x1 <= x <= x2 or not y1 <= y <= y2: raise ValueError('(x, y) not within the rectangle') return (q11 * (x2 - x) * (y2 - y) + q21 * (x - x1) * (y2 - y) + q12 * (x2 - x) * (y - y1) + q22 * (x - x1) * (y - y1) ) / ((x2 - x1) * (y2 - y1) + 0.0) def dump_to_npy(arrays, file_path=None): """ dump set of images to array for local visualization arrays: the input arrays file_path: saving path """ assert isinstance(arrays, dict) for k, v in arrays.items(): np.save(os.path.join(file_path, k + '.npy'), v) def crop(image, box): """ box: t, l, b, r """ t, l, b, r = box return image[t:b, l:r] def padding_image(image_in, image_size, crop=None, interpolation=cv2.INTER_NEAREST, pad_val=0.): """Pad image to target image_size based on a given crop """ assert isinstance(pad_val, float) | isinstance(pad_val, list) if image_size[0] <= image_in.shape[0] and \ image_size[1] <= image_in.shape[1]: return image_in image = image_in.copy() in_dim = np.ndim(image) if in_dim == 2: image = image[:, :, None] if isinstance(pad_val, float): pad_val = [pad_val] * image.shape[-1] assert len(pad_val) == image.shape[-1] dim = image.shape[2] image_pad = np.ones(image_size + [dim], dtype=image_in.dtype) * \ np.array(pad_val) if not (crop is None): h, w = image_size crop_cur = np.uint32([crop[0] * h, crop[1] * w, crop[2] * h, crop[3] * w]) image = cv2.resize( image, (crop_cur[3] - crop_cur[1], crop_cur[2] - crop_cur[0]), interpolation=interpolation) else: h, w = image_in.shape[:2] # default crop is padding center hp, wp = image_pad.shape[:2] t, l = int((hp - h) / 2), int((wp - w) / 2) crop_cur = [t, l, t + h, l + w] image_pad[crop_cur[0]:crop_cur[2], crop_cur[1]:crop_cur[3], :] = image if in_dim == 2: image_pad = np.squeeze(image_pad) return image_pad def enlighting_v2(image, value=30): hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) h, s, v = cv2.split(hsv) value = (255 - np.mean(v)) * 0.6 value = int(value) lim = 255 - value v[v > lim] = 255 v[v <= lim] += value final_hsv = cv2.merge((h, s, v)) img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR) return img def enlighting(image): hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) h, s, v = cv2.split(hsv) # clahe = cv2.createCLAHE(clipLimit=30, tileGridSize=(8,8)) # v = clahe.apply(v) v = cv2.equalizeHist(v) # v = cv2.add(v, value) # v[v > 255] = 255 # v[v < 0] = 0 final_hsv = cv2.merge((h, s, v)) img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR) return img def white_balance(img): result = cv2.cvtColor(img, cv2.COLOR_BGR2LAB) avg_a = np.average(result[:, :, 1]) avg_b = np.average(result[:, :, 2]) result[:, :, 1] = result[:, :, 1] - ((avg_a - 128) * (result[:, :, 0] / 255.0) * 1.1) result[:, :, 2] = result[:, :, 2] - ((avg_b - 128) * (result[:, :, 0] / 255.0) * 1.1) result = cv2.cvtColor(result, cv2.COLOR_LAB2BGR) return result def one_hot(label_map, class_num): shape = np.array(label_map.shape) length = np.prod(shape) label_one_hot = np.zeros((length, class_num)) label_flat = label_map.flatten() label_one_hot[range(length), label_flat] = 1 label_one_hot = label_one_hot.reshape(shape.tolist() + [class_num]) return label_one_hot def prob2label(label_prob): """Convert probability to a descrete label map """ assert label_prob.ndim == 3 return np.argmax(label_prob, axis=2) """ label_prob: [0, 1] probability map """ def prob2color(label_prob, color_map, bkg_color=[0,0,0]): """ color_map: 0-255 [[x, x, x], ...] python list """ assert isinstance(color_map, list) height, width, dim = label_prob.shape color_map = color_map[:(dim - 1)] color_map_mat = np.matrix([bkg_color] + color_map) label_prob_mat = np.matrix(label_prob.reshape((height * width, dim))) label_color = np.array(label_prob_mat * color_map_mat) label_color = label_color.reshape((height, width, -1)) return np.uint8(label_color) def mix_probimage(prob, image, alpha=0.7): """ prob: [h, w, dim] or [h, w] uint8 """ if prob.ndim == 2: prob = prob[:, :, None] if prob.dtype == 'uint8': prob = np.float32(prob) / 255.0 color_map = get_pallete(256) color_map = np.array(color_map).reshape([-1, 3])[1:, :] color_map = color_map.tolist() prob_color = prob2color(prob, color_map) image = resize_like(image, prob) mix_image = (1 - alpha) * image + alpha * prob_color return mix_image def label2color(label, color_map=None, bkg_color=[0, 0, 0]): if color_map is None: color_map = np.uint8(np.array(PALETTE) * 255) color_map = color_map.tolist() height, width = label.shape[0:2] class_num = len(color_map) + 1 label_one_hot = one_hot(label, class_num) label_color = prob2color(label_one_hot, color_map, bkg_color) return label_color def gif_to_frames(in_path, out_path, max_frame=10000): import imageio gif = imageio.get_reader(in_path, '.gif') # Here's the number you're looking for for frame_id, frame in tqdm(enumerate(gif)): filename = '%s/%04d.png'% (out_path, frame_id) frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) cv2.imwrite(filename, frame) if frame_id > max_frame: break print('finished') def speedx_video(video_in, video_out, speed): import moviepy.editor as me import moviepy clip = me.VideoFileClip(video_in) clip = moviepy.video.fx.all.speedx(clip, factor=speedx) clip.write_videofile(video_out) def resize_boxes(boxes, image_shape): """ boxes: n x 4 [t, l, b, r] image_shape: height, width """ if len(boxes) == 0: return boxes boxes = np.array(boxes) boxes[:, [0, 2]] *= image_shape[0] boxes[:, [1, 3]] *= image_shape[1] return boxes def lens_blur(img, depth_in, fg_depth, fg_mask=None, NUM_LAYERS = 20): def layer_mask(dm, s, e): # copy image dimensions, but fill with zeros m = np.zeros(dm.shape) # set values above start threshold to white m[dm >= s] = 1 # set values above end threshold to black m[dm > e] = 0 return m def to_multi_mask(mask, ch=3): return np.tile(mask[:, :, None] > 0, (1, 1, ch)) depth = depth_in.copy() out = np.zeros(img.shape) min_depth = np.min(np.unique(depth)) max_depth = np.max(np.unique(depth)) min_depth = int(min_depth / max_depth * 255) fg_depth = int(fg_depth / max_depth * 255) depth = np.uint8(depth * 255 / max_depth) s = (255 - min_depth) // NUM_LAYERS layers = np.array(range(min_depth, 255, s)) for i, a in enumerate(layers[:-1]): if layers[i] < fg_depth and layers[i+1] > fg_depth: fg_depth = layers[i] break for a in layers: l_mask = layer_mask(depth, a, a+s) l_mask = to_multi_mask(l_mask) res = blur_filter(img, np.abs(a - fg_depth)) out[l_mask] = res[l_mask] if fg_mask is not None: fg_mask = np.tile(fg_mask[:, :, None] > 0, (1, 1, 3)) out[fg_mask] = img[fg_mask] return out ############################################### ### Filters ############################################### # Change blur by epsilon value (a) def blur_filter(img, a): # increase kernel effect slowly, must be odd k = (a // 10) + 1 if (a // 10) % 2 == 0 else (a // 10) + 2 # can't exceed 255 k = k if k < 255 else 255 kernel = (k, k) # blur filter o = cv2.GaussianBlur(img, kernel, 9) return o def box_center(box): """ boxes: n x 4 [t, l, b, r] """ return (box[1] + box[3]) // 2, (box[0] + box[2]) // 2 def mean_value(value, mask): """ mean value inside mat """ if value.ndim == 2: value = value[:, :, None] h, w, dim = value.shape test = value.reshape([-1, dim]) mean = np.mean(test[mask.flatten(), :], axis=0) return mean def is_neighbor_mask(mask0, mask1, min_len=200, kernel=10): # at least 200 pixel connecting edge mask = dilate_mask(mask1, kernel=kernel) intern = np.sum(np.logical_and(mask0 > 0, mask > 0)) return intern > min_len * kernel def get_salient_components(segment_in, th=0.1, min_th=25): """ :param segment_in: 0, 1 mask :param th: :return: """ segment = segment_in.copy() area_org = np.sum(segment) segment = np.uint8(segment_in * 255) ret, labels = cv2.connectedComponents(segment) if ret == 2: return [segment_in] masks = [] for i in range(1, ret): mask = labels == i area = np.sum(mask) if area < area_org * th : continue if area < min_th: continue masks.append(mask) return masks def get_component(segment, criteria='max'): """ find the largest connected component mask """ ret, labels = cv2.connectedComponents(segment) if ret == 2: return segment max_area = 0 idx = 1 for i in range(1, ret): area = np.sum(labels == i) if area > max_area: max_area = area idx = i return np.uint8(255 * (labels == idx)) def find_largest_mask(segment, ignore_ids=None): """ find the largest mask inside component """ if ignore_ids is None: ignore_ids = [] ids = np.unique(segment) max_area = 0 idx = 1 for i in ids: if i in ignore_ids: continue area = np.sum(segment == i) if area > max_area: max_area = area idx = i return idx, segment == idx def find_center_mask(segment, ignore_ids, box = None): h, w = segment.shape if box is None: box = [int(h / 4), int(w / 4), int(h * 3 / 4), int(w * 3 / 4)] idx, _ = find_largest_mask( segment[box[0]:box[2], box[1]:box[3]], ignore_ids) return idx, segment == idx def get_largest_component(segment_in, criteria='max'): segment = segment_in.copy() thresh = 0.3 segment = np.uint8(255 * (np.float32(segment) / 255.0 > thresh)) ret, labels = cv2.connectedComponents(segment) if ret == 2: return segment_in max_area = 0 idx = 1 for i in range(1, ret): area = np.sum(labels == i) if area > max_area: max_area = area idx = i mask = dilate_mask(np.uint8(labels == idx)) segment = segment_in * mask return np.uint8(segment) def fillholes(mask): """ binary mask """ des = np.uint8(mask > 0) * 255 contour, hier = cv2.findContours(des,cv2.RETR_CCOMP,cv2.CHAIN_APPROX_SIMPLE) # des = cv2.merge([des, des, des]) # cv2.drawContours(des, contour, -1, (0, 255, 0), 3) for i, cnt in enumerate(contour): cv2.drawContours(des, [cnt], -1, 255, -1) # mask = des == 0 return des > 0 def video_to_frames(in_path, out_path, max_frame=100000): """separate video to frames """ print("saving videos to frames at {}".format(out_path)) cap = cv2.VideoCapture(in_path) frame_id = 0 mkdir_if_need(out_path) # cv2.namedWindow("video") while(cap.isOpened()): ret, frame = cap.read() if not ret: break filename = out_path + '/%04d.jpg' % frame_id cv2.imwrite(filename, frame) frame_id += 1 if frame_id > max_frame: break cap.release() print("finished") def resize_video(in_path, out_path, sz, max_frame=10000): """separate video to frames Args: sz: height, width of new video """ from moviepy.editor import ImageSequenceClip, VideoFileClip print("resize videos to vidoe at {}".format(out_path)) new_height, new_width = sz assert os.path.exists(in_path), f"must exist {in_path}" cap = cv2.VideoCapture(in_path) fps = cap.get(cv2.CAP_PROP_FPS) progress_bar = tqdm(total=max_frame) progress_bar.set_description('Progress') frame_id = 0 frames = [] while(cap.isOpened()): ret, frame = cap.read() if not ret: break frame = cv2.resize(frame, (new_width, new_height)) frames.append(frame[:, :, ::-1]) frame_id += 1 progress_bar.update(frame_id) if frame_id > max_frame: break clip = ImageSequenceClip(frames, fps) clip.write_videofile(out_path, fps=fps) cap.release() print("finished") def frame_to_video_simple(frames, fps=10, video_name='video.avi', reader=cv2.IMREAD_UNCHANGED): """ Combine frames to video image_path: path of images """ import sys if video_name.endswith('.avi'): fourcc = cv2.VideoWriter_fourcc(*'XVID') elif video_name.endswith('.mp4'): fourcc = cv2.VideoWriter_fourcc(*'MP4V') is_str = False if isinstance(frames[0], str): frame = cv2.imread(frames[0], cv2.IMREAD_UNCHANGED) is_str = True else: frame = frames[0] sz = frame.shape[:2] video = cv2.VideoWriter(video_name, fourcc, fps, (sz[1], sz[0])) for i, frame in enumerate(tqdm(frames)): sys.stdout.write('\r>>process %04d / %04d' % (i, len(frames))) sys.stdout.flush() if is_str: frame = cv2.imread(frame, reader) video.write(frame) cv2.destroyAllWindows() video.release() print('save to %s' % video_name) def frame_to_video(image_path, label_path, frame_list, label_ext='', label_map_is_color=False, color_map=None, sz=None, fps=10, alpha=0.5, video_name='video.avi', exts=["jpg", "png"], is_probability=False): """ Combine frames to video to visualize image & label image image_path: path of images exts: 1st is """ def to_color_map(label): assert color_map is not None bkg = [255, 255, 255] if is_probability: if label.ndim == 2: label = np.float32(label) / 255 label = np.concatenate( [1 - label[:, :, None], label[:, :, None]], axis=2) label = prob2color(label, color_map, bkg_color=bkg) else: label[label > len(color_map)] = 0 label = label2color(label, color_map, bkg) return label[:, :, ::-1] import sys ext_image, ext_label = exts if sz is None: label = cv2.imread(f"{label_path}/{frame_list[0]}.{ext_label}", cv2.IMREAD_UNCHANGED) sz = label.shape[:2] if video_name.endswith('.avi'): fourcc = cv2.VideoWriter_fourcc(*'XVID') elif video_name.endswith('.mp4'): fourcc = cv2.VideoWriter_fourcc(*'MP4V') video = cv2.VideoWriter(video_name, fourcc, fps, (sz[1], sz[0])) for i, image_name in enumerate(frame_list): sys.stdout.write('\r>>process %04d / %04d' % (i, len(frame_list))) sys.stdout.flush() image = cv2.resize( cv2.imread(f"{image_path}/{image_name}.jpg", cv2.IMREAD_COLOR), (sz[1], sz[0])) label_name = image_name + label_ext label = cv2.resize(cv2.imread(f"{label_path}/{label_name}.{ext_label}", cv2.IMREAD_UNCHANGED), (sz[1], sz[0]), interpolation=cv2.INTER_NEAREST) if not label_map_is_color: label = to_color_map(label) frame = np.uint8(image * alpha + label * (1 - alpha)) video.write(frame) cv2.destroyAllWindows() video.release() print('save to %s' % video_name) def video_to_frame(video_path, image_folder_path=None, sample_rate=1, max_len=None, holder=None, ext="jpg"): """ holder: the holder of image list """ if image_folder_path is not None: mkdir_if_need(image_folder_path) if video_path.split('.')[-1] == 'gif': gif_to_frames(video_path, image_folder_path) return vidcap = cv2.VideoCapture(video_path) success, image = vidcap.read() assert success, video_path sz = image.shape[:2] count = 0 while success: if count % sample_rate == 0: image_path = f'{image_folder_path}/{count:04}.{ext}' if max_len is not None: image = resize2maxsize(image, max_len) # height, width = image.shape[:2] # length = int(height / 2) # image = image[:length, :, :] if image_folder_path is not None: cv2.imwrite(image_path, image) # save frame as JPEG file if holder is not None: holder.append(image) success, image = vidcap.read() count += 1 print('success split %s' % video_path) fps = vidcap.get(cv2.CAP_PROP_FPS) return fps, sz def box_intersect(box0, box1): # top, left, bottom, right box = [max(box0[0], box1[0]), max(box0[1], box1[1]), min(box0[2], box1[2]), min(box0[3], box1[3])] return box def timefunc(f): def f_timer(*args, **kwargs): start = time.time() result = f(*args, **kwargs) end = time.time() logger.debug(f.__name__, 'took', end - start, 'second') return result return f_timer def test_one_hot(): label = np.array([[1, 2], [3, 4]]) label_one_hot = one_hot(label, 5) print(label_one_hot) def test_resize2range(): test = np.ones([100, 200]) test2 = resize2range(test, 200, 50) print(test2.shape) def test_prob2image(): test = np.random.random_sample((3, 10, 10)) dump_prob2image('test', test) res = load_image2prob('test') np.testing.assert_allclose(test, res, rtol=0.5, atol=1e-02) def shape_match(images): assert len(images) > 1 shape = images[0].shape[:2] for image in images[1:]: cur_shape = image.shape[:2] if np.sum(np.abs(np.array(shape) - \ np.array(cur_shape))): return False return True def append_apex(filename, appex): filename = filename.split('.') prefix = '.'.join(filename[:-1]) filetype = filename[-1] return '%s_%s.%s' % (prefix, appex, filetype) def get_obj_center(mask, th=0): """ mask: 0 """ y, x = np.where(mask > th) if len(y) == 0: return -1 , -1 x, y = np.mean(x), np.mean(y) return int(x), int(y) def poly_area(poly): """ Args: poly: [n x 2] np.array [x, y] """ return PolyArea(poly[:, 0], poly[:, 1]) def PolyArea(x, y): return 0.5*np.abs(np.dot(x, np.roll(y, 1))-np.dot(y, np.roll(x,1))) def rect_size(rect): return np.linalg.norm(rect[0, :] - rect[2, :]) def avg_size(rects, option='median'): sizes = np.zeros(len(rects)) for i, rect in enumerate(rects): sizes[i] = rect_size(rect) if option == 'median': return np.median(sizes) if option == 'mean': return np.mean(sizes) return None def poly_ratio(rect, type='min'): if type == 'avg': l1 = np.linalg.norm(rect[0, :] - rect[1, :]) l2 = np.linalg.norm(rect[1, :] - rect[2, :]) l3 = np.linalg.norm(rect[2, :] - rect[3, :]) l4 = np.linalg.norm(rect[3, :] - rect[0, :]) return (l1 + l3) / (l2 + l4) ratio = 0 for i in range(4): s = i t = (i + 1) % 4 e = (i + 2) % 4 l1 = np.linalg.norm(rect[s, :] - rect[t, :]) l2 = np.linalg.norm(rect[t, :] - rect[e, :]) cur_ratio = max(l1 / (l2 + 1e-10), l2 / (l1 + 1e-10)) if cur_ratio > ratio: ratio = cur_ratio return ratio def rect_ratio(rect): """ x / y :param rect: :return: """ x_diff = np.max(rect[:, 0]) - np.min(rect[:, 0]) y_diff = np.max(rect[:, 1]) - np.min(rect[:, 1]) return max(x_diff / y_diff, y_diff / x_diff) def rect_in_size(rect, image_sz, num_th=4): """rectangle inside image """ h, w = image_sz def pt_in_size(pt): return 0 <= pt[0] < w and 0 <= pt[1] < h valid = [False for i in range(rect.shape[0])] for i, pt in enumerate(rect): if pt_in_size(pt): valid[i] = True return np.sum(valid) >= num_th def valid_rect(rect): l, r, t, b = rect return l < r and t < b def compute_normal_deg_absvar(normal, mask): normal_cur = normal * mask[:, :, None] mean_normal = np.sum(normal_cur, axis=(0, 1)) / np.sum(mask) inner = np.sum(mean_normal[None, None, :] * normal_cur, axis=2) s = np.clip(np.abs(inner), 0, 1) diff = np.rad2deg(np.arccos(s)) var = np.sum(diff * mask) / np.sum(mask) return var def compute_ignore_mask(x, ignore_value=None): mask = 1 if ignore_value is None: return mask dim = x.ndim if x.ndim == 2: x = x[:, :, None] if not isinstance(ignore_value, list): ignore_value = [ignore_value] * x.shape[-1] for i, value in enumerate(ignore_value): cur_mask = x[:, :, i] == value mask = mask * cur_mask if dim == 2: x = x.squeeze(-1) return mask def weight_reduce(res, weights): """ """ dim = res[0].ndim result = 0 weight_all = 0 for i, x in enumerate(res): if dim == 2: x = x[:, :, None] weight = weights[i] result = result + (x * weight[:, :, None]) weight_all = weight_all + weight if dim == 2: result = result.squeeze(-1) return result / np.maximum(weight_all[:, :, None], 1e-6) def mask_assign(x, mask, target): dim = x.ndim if dim == 2: x = x[:, :, None] for i in range(x.shape[-1]): cache = x[:, :, i] cache_tgt = target[:, :, i] cache[mask] = cache_tgt[mask] x[:, :, i] = cache if dim == 2: x = x.squeeze(-1) return x def overlap_poly(poly0, poly1, mask=None): sz = None if mask is None: h = max(np.max(poly0[:, 1]), np.max(poly1[:, 1])) w = max(np.max(poly0[:, 0]), np.max(poly1[:, 0])) sz = [h + 1, w + 1] else: sz = mask.shape[:2] vis_map0 = np.zeros(sz) cv2.fillPoly(vis_map0, pts=[np.int0(poly0)], color=(1,)) vis_map1 = np.zeros(sz) cv2.fillPoly(vis_map1, pts=[np.int0(poly1)], color=(1,)) inter_area = np.sum(vis_map0 * vis_map1), return inter_area, inter_area / np.sum(vis_map0), inter_area / np.sum(vis_map1) def overlap_rect_mask(rect, mask): """ ratio that mask is in rectangle """ vis_map = np.zeros(mask.shape) cv2.fillPoly(vis_map, pts=[np.int0(rect)], color=(1,)) overlap = np.sum(np.int32(mask > 0) * np.int32(vis_map > 0)) ratio = overlap / np.sum(vis_map > 0) return ratio def pt_in_poly(pt, poly): """ poly: list of pt """ from shapely.geometry import Point from shapely.geometry.polygon import Polygon point = Point(pt[0], pt[1]) polygon = Polygon(poly) return polygon.contains(point) def pt_in_poly_w_mask(pt, poly, sz, margin=None): """ margin: ratio of area for expand """ mask = np.zeros(np.int0(sz)) cv2.fillPoly(mask, pts=[np.int0(poly)], color=(255,)) if margin is not None: rectArea = PolyArea(poly[:, 0], poly[:, 1]) pixel = np.int0(margin * np.sqrt(rectArea)) mask = dilate_mask(mask, pixel) pt = np.int0(pt) return mask[pt[1], pt[0]] > 0 def is_overlap(r_cur, r_over, ths=None): """ whether two rects are overlapping r_cur: [l, r, t, b] """ if ths is None: ths = [0, 0] w_th, h_th = ths l, r, t, b = r_cur l0, r0, t0, b0 = r_over if l >= (r0 + w_th) or r <= (l0 - w_th): return False if b <= (t0 - h_th) or t >= (b0 + h_th): return False return True def rect_from_poly(poly): min_x, max_x = np.min(poly[:, 0]), np.max(poly[:, 0]) min_y, max_y = np.min(poly[:, 1]), np.max(poly[:, 1]) return min_x, max_x, min_y, max_y def rotate_image_if_needed(image): from PIL import Image, ExifTags if hasattr(image, '_getexif'): # only present in JPEGs for orientation in ExifTags.TAGS.keys(): if ExifTags.TAGS[orientation]=='Orientation': break e = image._getexif() # returns None if no EXIF data if e is not None: exif=dict(e.items()) if orientation in exif: orientation = exif[orientation] if orientation == 3: image = image.transpose(Image.ROTATE_180) elif orientation == 6: image = image.transpose(Image.ROTATE_270) elif orientation == 8: image = image.transpose(Image.ROTATE_90) return image def is_night_scene(image, prob_map, sky_prob_threshold=200, brightness_threshold=100): """ Return True if it's a night scene image image: original image prob_map: the probability map of image segmentation (red: sky; green: building; blue: background, value from 0 to 255) sky_prob_threshold: pixel val > sky_prob_threshold will be segmented as sky brightness_threshold: val < brightness_threshold will be considered as night scene """ rotate_image_if_needed(image) image = np.array(image.convert('L')) sky, building, background = prob_map.split() # calculate average brightness of the sky: sky_mask = np.array(sky) sky_brightness = (sky_mask > sky_prob_threshold) * image if (np.count_nonzero(sky_brightness) == 0): return False else: avg_sky_brightness = sky_brightness[np.nonzero(sky_brightness)].mean() return avg_sky_brightness < brightness_threshold def detect_lines(img, fg_mask=None, length_thresh=None): """ Detects lines using OpenCV LSD Detector Return: n x 4 line start, line end """ # Convert to grayscale if required if len(img.shape) == 3: img_copy = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) else: img_copy = img h, w = img.shape[:2] if length_thresh is None: length_thresh = int(max(h, w) * 0.04) # Create LSD detector with default parameters lsd = cv2.createLineSegmentDetector(0) # Detect lines in the image # Returns a NumPy array of type N x 1 x 4 of float32 # such that the 4 numbers in the last dimension are (x1, y1, x2, y2) # These denote the start and end positions of a line lines = lsd.detect(img_copy)[0] # Remove singleton dimension lines = lines[:, 0] # Filter out the lines whose length is lower than the threshold dx = lines[:, 2] - lines[:, 0] dy = lines[:, 3] - lines[:, 1] lengths = np.sqrt(dx * dx + dy * dy) mask = lengths >= length_thresh lines = lines[mask] # todo remove lines at boundary if fg_mask: fg_mask = cv2.distanceTransform(fg_mask, distanceType=cv2.DIST_C, maskSize=5).astype(np.float32) select_id = np.ones((len(lines),)) for ind, l in enumerate(lines): ll = np.int0(l) dist = (fg_mask[ll[1], ll[0]] + fg_mask[ll[3], ll[2]]) * 0.5 if dist < 8: select_id[ind] = 0 lines = lines[select_id > 0] return lines def get_a_key(dict_data: Dict[str, Any]): """ Get first iterated key value from a dictionary. Args: dict_data (Dict[str, Any]): dict with string keys. Returns: Optional[str]: str key if non-empty, else None. """ if dict_data: key = next(iter(dict_data)) return key else: return None def shift_to_center(image, mask, shape=None): """ shift image object to center at mask center """ if shape is None: shape = image.shape[:2] assert mask.shape[0] == shape[0] cy, cx = shape[0] // 2, shape[1] // 2 positions = np.nonzero(mask) top = positions[0].min() bottom = positions[0].max() left = positions[1].min() right = positions[1].max() new_l = cx - (right - left) // 2 new_r = new_l + right - left new_top = cy - (bottom - top) // 2 new_bottom = new_top + bottom - top new_im = np.zeros(image.shape) new_im[new_top:new_bottom, new_l:new_r, :] = \ image[top:bottom, left:right, :] return new_im def ndarray_to_list(in_dict: dict): for key, item in in_dict.items(): if isinstance(item, np.ndarray): in_dict[key] = item.tolist() if isinstance(item, dict): in_dict[key] = ndarray_to_list(item) return in_dict """ encode image to string and decode it back """ def encode_b64(mat, format='.png'): mat = cv2.imencode(format, mat)[1] return base64.b64encode(mat).decode('utf-8') def decode64(string): jpg_original = base64.b64decode(string) jpg_as_np = np.frombuffer(jpg_original, dtype=np.uint8) img = cv2.imdecode(jpg_as_np, cv2.IMREAD_UNCHANGED) return img def remap_texture(triangle1, triangle2, texture): import numpy as np import cv2 # Convert input triangles to numpy arrays tri1 = np.array(triangle1, dtype=np.float32) tri2 = np.array(triangle2, dtype=np.float32) # Find the bounding rectangle of each triangle rect1 = cv2.boundingRect(tri1) rect2 = cv2.boundingRect(tri2) # Offset points by left top corner of the respective rectangles tri1_rect = np.float32(tri1 - rect1[:2]) tri2_rect = np.float32(tri2 - rect2[:2]) # Apply the affine transformation to map the texture from triangle1 to triangle2 warp_mat = cv2.getAffineTransform(tri1_rect, tri2_rect) warped_texture = cv2.warpAffine(texture, warp_mat, (rect2[2], rect2[3])) # Create a mask for the destination triangle mask = np.zeros((rect2[3], rect2[2], 3), dtype=np.uint8) cv2.fillConvexPoly(mask, np.int32(tri2_rect), (1.0, 1.0, 1.0), 16, 0) # Apply the mask to the warped texture remapped_texture = warped_texture * mask return remapped_texture, mask def fuse_rgb_mask(image, mask): """ image: h, w, [3,4] rgb or rgba image mask: h, w, [1,3] mask """ if isinstance(image, str): image = cv2.imread(image, cv2.IMREAD_UNCHANGED) if isinstance(mask, str): mask = cv2.imread(mask, cv2.IMREAD_UNCHANGED) if not shape_match([image, mask]): image = cv2.resize(image, (mask.shape[1], mask.shape[0])) if image.shape[-1] == 4: image = image[:, :, :3] if mask.shape[-1] == 3: mask = mask[:, :, 0] mask = mask[:, :, None] if mask.max() == 1: mask = mask * 255 return np.concatenate([image, mask], axis=2) def test_remap_texture(): # Define test input values triangle1 = [(0, 0), (50, 0), (0, 50)] triangle2 = [(0, 0), (100, 0), (0, 100)] texture = np.ones((50, 50, 3), dtype=np.uint8) * 255 # Call the remap_texture function with the test input values remapped_texture = remap_texture(triangle1, triangle2, texture) # Check if the output is as expected assert remapped_texture.shape == (100, 100, 3), "Remapped texture shape is incorrect" assert np.all(remapped_texture[:50, :50] == texture), "Texture not correctly remapped in the destination triangle" # Print a success message if the test passes print("Test passed: remap_texture function works as expected") def test_line_seg_cross(): seg1 = np.array([[0, 0], [1, 1]]) seg2 = np.array([[1, 0], [0, 1]]) print(line_segment_cross(seg1, seg2)) seg1 = np.array([[0, 0], [1, 1]]) seg2 = np.array([[1, 0], [1.5, 2]]) print(line_segment_cross(seg1, seg2)) if __name__ == '__main__': # test_one_hot() # test_resize2range() # test_prob2image() # test_line_seg_cross() # test = np.array([[0, 2], [1, 1], [1, 0], [0, 0]]) # area = PolyArea(test[:, 0], test[:, 1]) # print(area) # test_remap_texture() # pt = np.array([0.5, 0.5]) # rect = np.array([[0, 1], [1, 1], [1, 0], [0, 0]]) # print(pt_in_poly(pt, rect)) # test_file = "/opt/tiger/mzy-project/temp/BuildingAR/facader/test.png" # test_out = "/opt/tiger/mzy-project/temp/BuildingAR/facader/test2.png" # image = cv2.imread(test_file, cv2.IMREAD_UNCHANGED) # image = fillholes(image) # print(np.unique(image)) # cv2.imwrite(test_out, image * 255) # test = np.array([[0, 2], [1, 1], [1, 0], [0, 0]]) # print(overlap_poly(test, test)) # area = PolyArea(test[:, 0], test[s:, 1]) # print(area) # import plot_utils as p_uts # image = np.zeros((480, 640, 3)) # lines = np.array([[500.5 , 299.6 , 409.375, 235.375], # [504.575, 309.325, 415.625, 244.575]]) # pt, _ = line_intersect_pt(lines) # print(pt) # cv2.circle(image, np.int32(pt), 1, (255, 0, 0), 2) # image = p_uts.drawLines(image, lines.reshape([-1, 2, 2])) # cv2.imwrite('test.png', image) paths = "/opt/tiger/spark_deploy/spark-3.0/spark-stable/bin:/opt/mlx_deploy/miniconda3/envs/mlx/bin:/opt/tiger/mlx_deploy:/opt/tiger/tce/tce_tools/bin:/home/tiger/.local/bin:/opt/common_tools:/usr/local/go/bin:/opt/tiger/mlx_deploy/vscode/code-server-4.7.1-linux-amd64/lib/vscode/bin/remote-cli:/opt/tiger/spark_deploy/spark-3.0/spark-stable/bin:/opt/mlx_deploy/miniconda3/envs/mlx/bin:/opt/tiger/mlx_deploy:/opt/tiger/spark_deploy/spark-3.0/spark-stable/bin:/opt/mlx_deploy/miniconda3/envs/mlx/bin:/opt/tiger/mlx_deploy:/opt/tiger/spark_deploy/spark-3.0/spark-stable/bin:/opt/mlx_deploy/miniconda3/envs/mlx/bin:/opt/tiger/mlx_deploy:/workspace:/opt/tiger/consul_deploy/bin/go:/root/miniconda3/bin:/root/miniconda3/condabin:/usr/local/cuda/bin:/workspace:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/tiger/ss_bin:/usr/local/jdk/bin:/usr/sbin:/opt/tiger/ss_lib/bin:/opt/tiger/ss_lib/python_package/lib/python2.7/site-packages/django/bin:/opt/tiger/yarn_deploy/hadoop/bin:/opt/tiger/yarn_deploy/hive/bin:/opt/tiger/yarn_deploy/jdk/bin:/opt/tiger/hadoop_deploy/jython-2.5.2/bin:/usr/local/bvc/bin:/opt/tiger/arnold/bin:/workspace/bernard/bin:/workspace://bin:/opt/tiger/ss_bin:/opt/tiger/ss_lib/bin:/opt/common_tools:/opt/tiger/yarn_deploy/hadoop/bin:/opt/tiger/yarn_deploy/hive/bin:/workspace:/workspace://bin:/opt/tiger/ss_bin:/opt/tiger/ss_lib/bin:/opt/common_tools:/opt/tiger/yarn_deploy/hadoop/bin:/opt/tiger/yarn_deploy/hive/bin:/workspace://bin:/opt/tiger/ss_bin:/opt/tiger/ss_lib/bin:/opt/common_tools:/opt/tiger/yarn_deploy/hadoop/bin:/opt/tiger/yarn_deploy/hive/bin:/opt/tiger/nastk/bin:/workspace://bin:/opt/tiger/ss_bin:/opt/tiger/ss_lib/bin:/opt/common_tools:/opt/tiger/yarn_deploy/hadoop/bin:/opt/tiger/yarn_deploy/hive/bin" paths = paths.split(":") check_file_in_paths(paths, "docker")