Spaces:
Runtime error
Runtime error
import os | |
import sys | |
import re | |
import numpy as np | |
import cv2 | |
import json | |
import yaml | |
import vis_utils as v_uts | |
import struct | |
from cv_base import ( | |
Faces, Aux, Obj, DEFAULT_MATERIAL | |
) | |
hasTorch = True | |
try: | |
import torch | |
except: | |
hasTorch = False | |
import functools | |
import pandas as pd | |
from tqdm import tqdm | |
from PIL import Image | |
try: | |
from plyfile import PlyData | |
except: | |
"no ply" | |
import pdb | |
b=pdb.set_trace | |
def default(x, val): | |
return val if x is None else x | |
class IOShop: | |
def __init__(self, name, **kwargs): | |
ioFuncs = {'depth': DepthIO, | |
'image': ImageIO, | |
'flow': FlowIO, | |
'segment': SegmentIO, | |
'prob': ProbIO, | |
'video': VideoIO} | |
self.io = ioFuncs[name](**kwargs) | |
def load(self, file_name, **kwargs): | |
return self.io.load(file_name, **kwargs) | |
def dump(self, file_name, file, **kwargs): | |
self.io.dump(file_name, file, **kwargs) | |
class BaseIO: | |
def __init__(self, appex='jpg'): | |
self.type = 'image' | |
self.appex = appex | |
def load(self, file_name): | |
file_name = '%s.%s' % (file_name, self.appex) | |
image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED) | |
assert not (image is None), '%s not exists' % file_name | |
return image | |
def dump(self, file_name, file): | |
v_uts.mkdir_if_need(os.path.dirname(file_name)) | |
file_name = '%s.%s' % (file_name, self.appex) | |
cv2.imwrite(file_name, file) | |
class ImageIO(BaseIO): | |
def __init__(self, appex='jpg'): | |
super(ImageIO, self).__init__(appex=appex) | |
self.type = 'image' | |
def load(self, file_name): | |
if file_name.endswith('heic') or file_name.endswith('HEIC'): | |
byte = read2byte(file_name) | |
image = decodeImage(byte) | |
else: | |
image = super(ImageIO, self).load(file_name) | |
return image | |
def imwrite(file_name, data, order='rgb'): | |
cv2.imwrite(file_name, data[:, :, ::-1]) | |
class SegmentIO(BaseIO): | |
def __init__(self): | |
super(SegmentIO, self).__init__(appex='png') | |
self.type = 'segment' | |
class ProbIO(BaseIO): | |
def __init__(self): | |
super(ProbIO, self).__init__() | |
self.type = 'prob' | |
self.max_class = 4 | |
def load(self, file_name, channels=None): | |
image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED) | |
channels = default(channels, self.max_class) | |
output = np.zeros(image.shape[:2]) | |
# for i in range(channels): | |
def dump(self, file_name, file): | |
""" | |
height, width, channel | |
""" | |
output = np.zeros((height, width), dtype=np.uint16) | |
h, w, c = file.shape | |
for i in range(c): | |
output = output + np.uint16(file[:, :, i] * 255) + i * 256 | |
cv2.imwrite(file_name, output.astype('uint16')) | |
class MeshIO(BaseIO): | |
def __init__(self): | |
super().__init__(appex='obj') | |
self.type = 'mesh' | |
def dump_obj(self, filename, obj): | |
export_obj(filename, obj) | |
def load_obj(self, filename): | |
return load_obj(filename) | |
def normalize_normal(mat): | |
mat = (mat / 255.0 * 2.0 - 1.0).astype('float32') | |
l1 = np.linalg.norm(mat, axis=2) | |
for j in range(3): | |
mat[:,:,j] /= (l1 + 1e-9) | |
return mat | |
class NormalIO(BaseIO): | |
def __init__(self, xyz='rgb'): | |
""" | |
rgb: means the normal saved in the order of x: r ... | |
""" | |
self._xyz = xyz | |
def read(self, filename): | |
normal = cv2.imread(filename, cv2.IMREAD_UNCHANGED) | |
if self._xyz == 'rgb': | |
normal = normal[:, :, ::-1] | |
normal = normalize_normal(normal) | |
return normal | |
class DepthIO(BaseIO): | |
def __init__(self, bit=8): | |
super(DepthIO, self).__init__(appex='pfm') | |
assert bit in [8, 16] | |
scale = {8: 1, 16: 2} | |
self.bits = scale[bit] | |
self.dump_vis = True | |
def load(self, path): | |
"""Read pfm file. | |
Args: | |
path (str): path to file | |
Returns: | |
tuple: (data, scale) | |
""" | |
path = '%s.%s' % (path, self.appex) | |
with open(path, "rb") as file: | |
color = None | |
width = None | |
height = None | |
scale = None | |
endian = None | |
header = file.readline().rstrip() | |
if header.decode("ascii") == "PF": | |
color = True | |
elif header.decode("ascii") == "Pf": | |
color = False | |
else: | |
raise Exception("Not a PFM file: " + path) | |
dim_match = re.match(r"^(\d+)\s(\d+)\s$", file.readline().decode("ascii")) | |
if dim_match: | |
width, height = list(map(int, dim_match.groups())) | |
else: | |
raise Exception("Malformed PFM header.") | |
scale = float(file.readline().decode("ascii").rstrip()) | |
if scale < 0: | |
# little-endian | |
endian = "<" | |
scale = -scale | |
else: | |
# big-endian | |
endian = ">" | |
data = np.fromfile(file, endian + "f") | |
shape = (height, width, 3) if color else (height, width) | |
data = np.reshape(data, shape) | |
data = np.flipud(data) | |
return data, scale | |
def dump(self, path, image, scale=1): | |
"""Write pfm file. | |
Args: | |
path (str): pathto file | |
image (array): data | |
scale (int, optional): Scale. Defaults to 1. | |
""" | |
v_uts.mkdir_if_need(os.path.dirname(path)) | |
path = path + '.pfm' | |
with open(path, "wb") as file: | |
color = None | |
if image.dtype.name != "float32": | |
raise Exception("Image dtype must be float32.") | |
image = np.flipud(image) | |
if len(image.shape) == 3 and image.shape[2] == 3: # color image | |
color = True | |
elif ( | |
len(image.shape) == 2 or len(image.shape) == 3 and image.shape[2] == 1 | |
): # greyscale | |
color = False | |
else: | |
raise Exception("Image must have H x W x 3, H x W x 1 or H x W dimensions.") | |
file.write("PF\n" if color else "Pf\n".encode()) | |
file.write("%d %d\n".encode() % (image.shape[1], image.shape[0])) | |
endian = image.dtype.byteorder | |
if endian == "<" or endian == "=" and sys.byteorder == "little": | |
scale = -scale | |
file.write("%f\n".encode() % scale) | |
image.tofile(file) | |
if self.dump_vis: | |
self.dump_visualize(path[:-4], image, self.bits) | |
def to8UC3(depth, scale=1000): | |
""" | |
Convert depth image to 8UC3 format. | |
""" | |
h, w = depth.shape | |
max_depth = (256.0 ** 3 - 1) / scale | |
# Clip depth values exceeding the maximum depth | |
depth = np.clip(depth, 0, max_depth) | |
# Scale the depth values | |
value = depth * scale | |
# Split the depth values into three channels | |
ch = np.zeros((h, w, 3), dtype=np.uint8) | |
ch[:, :, 0] = np.uint8(value / (256 ** 2)) | |
ch[:, :, 1] = np.uint8((value % (256 ** 2)) / 256) | |
ch[:, :, 2] = np.uint8(value % 256) | |
return ch | |
def read8UC3(depth, scale=1000): | |
""" | |
Convert 8UC3 image to scaled depth representation. | |
""" | |
if isinstance(depth, str): | |
depth = cv2.imread(depth, cv2.IMREAD_UNCHANGED) | |
# Merge the three channels into a single depth value | |
depth_uint16 = depth[:, :, 0] * (256 ** 2) + \ | |
depth[:, :, 1] * 256 + depth[:, :, 2] | |
# Convert depth to the scaled representation | |
depth = depth_uint16.astype(np.float32) / scale | |
return depth | |
def dump_visualize(path, depth, bits=1): | |
depth_min = depth.min() | |
depth_max = depth.max() | |
max_val = (2**(8*bits))-1 | |
if depth_max - depth_min > np.finfo("float").eps: | |
out = max_val * (depth - depth_min) / (depth_max - depth_min) | |
else: | |
out = 0 | |
if bits == 1: | |
cv2.imwrite(path + ".png", out.astype("uint8")) | |
elif bits == 2: | |
cv2.imwrite(path + ".png", out.astype("uint16")) | |
return | |
def load_png(path): | |
depth = cv2.imread(path, cv2.IMREAD_UNCHANGED) | |
return depth | |
def dump_png(path, depth, bits=2, max_depth=20.0): | |
assert (path.endswith(".png")) | |
max_val = (2**(8*bits))-1 | |
depth = depth / max_depth * max_val | |
cv2.imwrite(path, depth.astype("uint16")) | |
def read_depth(filename, scale=6000, sz=None, is_disparity=False): | |
if not hasTorch: | |
return None | |
depth = cv2.imread(filename, cv2.IMREAD_UNCHANGED) | |
depth = np.float32(depth) / scale | |
if sz: | |
h, w = sz | |
depth = cv2.resize(depth, (w, h), | |
interpolation=cv2.INTER_NEAREST) | |
depth = torch.from_numpy(depth) | |
if is_disparity: # convert to depth | |
depth = 1.0 / torch.clamp(depth, min=1e-10) | |
return depth | |
def write_depth(path, depth, grayscale, bits=1): | |
"""Write depth map to png file. | |
Args: | |
path (str): filepath without extension | |
depth (array): depth | |
grayscale (bool): use a grayscale colormap? | |
""" | |
if not grayscale: | |
bits = 1 | |
if not np.isfinite(depth).all(): | |
depth=np.nan_to_num(depth, nan=0.0, posinf=0.0, neginf=0.0) | |
print("WARNING: Non-finite depth values present") | |
depth_min = depth.min() | |
depth_max = depth.max() | |
max_val = (2**(8*bits))-1 | |
if depth_max - depth_min > np.finfo("float").eps: | |
out = max_val * (depth - depth_min) / (depth_max - depth_min) | |
else: | |
out = np.zeros(depth.shape, dtype=depth.dtype) | |
if not grayscale: | |
out = cv2.applyColorMap(np.uint8(out), cv2.COLORMAP_INFERNO) | |
if bits == 1: | |
cv2.imwrite(path + ".png", out.astype("uint8")) | |
elif bits == 2: | |
cv2.imwrite(path + ".png", out.astype("uint16")) | |
return | |
class NormalIO(BaseIO): | |
def __init__(self): | |
super(NormalIO, self).__init__(appex='npy') | |
self.dump_vis = False | |
def read_normal(filename, sz=None, to_torch=False): | |
if not hasTorch: | |
return None | |
if not os.path.exists(filename): | |
h, w = sz | |
return torch.ones((h, w, 3)) * 0.3 | |
image = cv2.imread(filename)[:, :, ::-1] | |
image = np.float32(image) | |
image = (image / 127.5 - 1) | |
if sz: | |
h, w = sz | |
image = cv2.resize(image, (w, h), | |
interpolation=cv2.INTER_NEAREST) | |
return torch.from_numpy(image) | |
def to8UC3(self, normal): | |
return np.uint8((normal + 1) * 127.5) | |
class FlowIO(BaseIO): | |
def __init__(self): | |
super(FlowIO, self).__init__(appex='npy') | |
self.dump_vis = False | |
def normalize(self, flow, shape=None): | |
if shape is None: | |
shape = flow.shape[:2] | |
flow[:, :, 0] /= shape[1] | |
flow[:, :, 1] /= shape[0] | |
return flow | |
def denormalize(self, flow, shape=None): | |
if shape is None: | |
shape = flow.shape[:2] | |
flow[:, :, 0] *= shape[1] | |
flow[:, :, 1] *= shape[0] | |
return flow | |
def visualization(self, flow): | |
pass | |
def load(self, path, shape=None): | |
path = path + '.npy' | |
flow = np.load(path) | |
flow = self.denormalize(flow, shape) | |
assert flow is not None | |
return flow | |
def dump(self, path, flow): | |
v_uts.mkdir_if_need(os.path.dirname(path)) | |
path = path + '.npy' | |
flow = self.normalize(flow) | |
np.save(path, flow) | |
if self.dump_vis: | |
self.dump_visualize(path[:-4], flow) | |
def dump_visualize(self, path, flow): | |
_, flow_c = v_uts.flow2color(flow) | |
cv2.imwrite(path + '.png', flow_c) | |
class VideoIO(BaseIO): | |
def __init__(self, longside_len=None): | |
super(VideoIO, self).__init__() | |
self.longside_len = longside_len | |
def get_fps(self, path): | |
vidcap = cv2.VideoCapture(path) | |
return vidcap.get(cv2.CAP_PROP_FPS) | |
def load_first_frame(self, path): | |
import skvideo.io as vio | |
video = vio.vreader(path) | |
frame = next(video) | |
if self.longside_len is not None: | |
frame = v_uts.resize2maxsize(frame, self.longside_len) | |
return frame | |
def load(self, path, sample_rate=1, max_len=1e10, | |
load_to_dir=False, | |
dir_name=None, | |
pre_len=5, | |
save_transform=None): | |
import skvideo.io as vio | |
def default_transform(x): | |
if x.ndim == 2: | |
return x | |
if x.ndim == 3 and x.shape[2] == 3: | |
return x[:, :, ::-1] | |
return x | |
frames = [] | |
reader = vio.vreader(path) | |
if load_to_dir: | |
v_uts.mkdir(dir_name) | |
if save_transform is None: | |
save_transform = lambda x : x | |
for count, frame in enumerate(reader): | |
if count == max_len: | |
break | |
if count % sample_rate == 0: | |
if self.longside_len is not None: | |
frame = v_uts.resize2maxsize( | |
frame, self.longside_len) | |
if load_to_dir: | |
img_file = f"{dir_name}/{count:05}.png" | |
frame = save_transform(frame) | |
cv2.imwrite(img_file, frame) | |
else: | |
frames.append(frame) | |
if not load_to_dir: | |
return frames | |
def load_till_end(self, path, sample_rate=1): | |
import skvideo.io as vio | |
frames = [] | |
reader = vio.vreader(path) | |
count = 0 | |
while True: | |
try: | |
frame = next(reader) | |
except: | |
break | |
if count % sample_rate == 0: | |
if self.longside_len is not None: | |
frame = v_uts.resize2maxsize( | |
frame, self.longside_len) | |
frames.append(frame) | |
count += 1 | |
return frames | |
def load_w_cv(self, path, out_dir, sample_rate = 1, ext="jpg"): | |
v_uts.video_to_frame(path, | |
out_dir, | |
max_len=self.longside_len, | |
sample_rate=sample_rate, | |
ext=ext) | |
def dump_to_images(self, frames, image_path): | |
v_uts.mkdir_if_need(image_path) | |
for count, frame in tqdm(enumerate(frames)): | |
image_file = '%s/%04d.jpg' % (image_path, count) | |
cv2.imwrite(image_file, frame[:, :, ::-1]) | |
def dump(self, path, frames, fps=30, lossless=False): | |
from moviepy.editor import ImageSequenceClip, VideoFileClip | |
if isinstance(frames[0], str): | |
frame_np = [] | |
for frame in tqdm(frames): | |
cur_frame = cv2.imread(frame, cv2.IMREAD_UNCHANGED)[:, :, ::-1] | |
frame_np.append(cur_frame) | |
frames = frame_np | |
clip = ImageSequenceClip(frames, fps) | |
if lossless: | |
assert path.endswith('avi') | |
clip.write_videofile(path, codec='png') | |
else: | |
clip.write_videofile(path, fps=fps) | |
def dump_skv(self, path, frames, fps=30): | |
if frames[0].ndim == 2: | |
frames = [cv2.cvtColor(frame,cv2.COLOR_GRAY2RGB) for frame in frames] | |
else: | |
frames = [frame[:, :, ::-1] for frame in frames] | |
v_uts.frame_to_video_simple(frames, fps, video_name=path) | |
# import skvideo.io as vio | |
# fps = str(int(fps)) | |
# vid_out = vio.FFmpegWriter(path, | |
# inputdict={'-r': fps}, | |
# outputdict={ | |
# '-vcodec': 'libx264', | |
# '-pix_fmt': 'yuv420p', | |
# '-r': fps, | |
# }, | |
# verbosity=1) | |
# for idx, frame in enumerate(frames): | |
# vid_out.writeFrame(frame) | |
# vid_out.close() | |
def resave_video(self, video_file, start, end, | |
outvideo_file): | |
""" | |
:param start: sec start | |
:param end: sec end | |
:return: | |
""" | |
fps = self.get_fps(video_file) | |
frames = self.load(video_file) | |
start_frame = int(start * fps) | |
end_frame = int(end * fps) | |
frames = frames[start_frame:end_frame] | |
self.dump_skv(outvideo_file, frames, fps) | |
def frame2video(self, folder, output, ext=".jpg"): | |
image_files = v_uts.list_all_files(folder, exts=[ext]) | |
frames = [] | |
for name in tqdm(image_files): | |
frames.append(cv2.imread(name)[:, :, ::-1]) | |
self.dump(output, frames) | |
class NpEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, np.integer): | |
return int(obj) | |
if isinstance(obj, np.floating): | |
return float(obj) | |
if isinstance(obj, np.ndarray): | |
return obj.tolist() | |
return super(NpEncoder, self).default(obj) | |
def read2byte(filename): | |
with open(filename, 'rb') as f: | |
file_data = f.read() | |
return file_data | |
def decodeImage(bytesIo): | |
import whatimage | |
import pyheif | |
from PIL import Image | |
fmt = whatimage.identify_image(bytesIo) | |
if fmt in ['heic', 'avif']: | |
i = pyheif.read_heif(bytesIo) | |
# Convert to other file format like jpeg | |
pi = Image.frombytes( | |
mode=i.mode, size=i.size, data=i.data) | |
image = np.asarray(pi) | |
image = image[:, :, ::-1] # to BGR | |
return image | |
else: | |
return None | |
def image2Normal(imagePath): | |
from skimage import io | |
normal = io.imread(imagePath) | |
normal = ((np.float32(normal) / 255.0) * 2 - 1.0 ) | |
return normal | |
def normal2Image(normal): | |
nm_pred_val = (normal + 1.) / 2. | |
nm_pred_val = np.uint8(nm_pred_val*255.) | |
return nm_pred_val | |
def dump_normal(filename, normal): | |
normal = normal2Image(normal) | |
cv2.imwrite(filename + '.png', array) | |
def dump_prob2image(filename, array): | |
""" | |
dump probility map to image when | |
array: [x, height, width] (x = 1, 3, 4) | |
""" | |
class_num = array.shape[0] | |
# assert class_num <= 4 | |
if class_num >= 4 : | |
print('warning: only save the first 3 channels') | |
array = array[:3, :, :] | |
if class_num == 2: | |
raise ValueError('not implement') | |
array = np.transpose(np.uint8(array * 255), (1, 2, 0)) | |
if filename.endswith('.png'): | |
cv2.imwrite(filename, array) | |
return | |
cv2.imwrite(filename + '.png', array) | |
assert os.path.exists(filename) | |
def load_image2prob(filename): | |
if not filename.endswith('.png'): | |
filename = filename + '.png' | |
array = cv2.imread(filename, cv2.IMREAD_UNCHANGED) | |
array = np.transpose(array, (2, 0, 1)) / 255 | |
return array | |
def shape_match(images): | |
assert len(images) > 1 | |
shape = images[0].shape[:2] | |
for image in images[1:]: | |
cur_shape = image.shape[:2] | |
if np.sum(np.abs(np.array(shape) - \ | |
np.array(cur_shape))): | |
return False | |
return True | |
def append_apex(filename, appex): | |
filename = filename.split('.') | |
prefix = '.'.join(filename[:-1]) | |
filetype = filename[-1] | |
return '%s_%s.%s' % (prefix, appex, filetype) | |
def load_json(json_file): | |
with open(json_file) as f: | |
res = json.load(f) | |
return res | |
def dump_numpy(filename, x: np.ndarray): | |
np.savetxt(filename, x, delimiter=' ', fmt='%1.6f') | |
def dump_json(filename, odgt, w_np=False): | |
with open(filename, 'w') as f: | |
if not w_np: | |
json.dump(odgt, f, indent=4) | |
else: | |
json.dump(odgt, f, indent=4, cls=NpEncoder) | |
def dump_jsonl(filename, odgt): | |
with open(filename, 'w') as file: | |
for entry in odgt: | |
json.dump(entry, file) | |
file.write('\n') | |
def dump_pair_data(image_list, | |
label_list, | |
outfile, | |
root='', | |
data_type='txt', | |
fields=None): | |
if fields is None: | |
fields = ["image", "segment"] | |
if data_type == 'txt': | |
fp = open(outfile, 'w') | |
for imagefile, labelfile in zip(image_list, label_list): | |
imagefile = imagefile.replace(root, '.') | |
labelfile = labelfile.replace(root, '.') | |
fp.write('%s %s\n' % (imagefile, labelfile)) | |
fp.close() | |
elif data_type == "odgt": | |
odgt = [] | |
for imagefile, labelfile in zip(image_list, label_list): | |
imagefile = imagefile.replace(root, '.') | |
labelfile = labelfile.replace(root, '.') | |
item = {fields[0]: imagefile, | |
fields[1]: labelfile} | |
odgt.append(item) | |
dump_json(outfile, odgt) | |
def save_xlsx(filename, dicts, sheets=None): | |
""" | |
Save a list of dicts to an xlsx file. | |
""" | |
with pd.ExcelWriter(filename, mode='w') as writer: | |
if sheets is None: | |
df1 = pd.DataFrame(dicts) | |
df1.to_excel(writer, index=False) | |
return | |
for sheet in sheets: | |
df1 = pd.DataFrame(dicts[sheet]) | |
df1.to_excel(writer, sheet_name=sheet, index=False) | |
def load_xlsx(filename, sheets=None): | |
assert os.path.exists(filename) , f"File not found: {filename}" | |
if sheets is None: | |
df = pd.read_excel(filename) | |
dict = {} | |
for column in df.columns: | |
dict[column] = df[column].tolist() | |
else: | |
dict = {} | |
for sheet in sheets: | |
df = pd.read_excel(filename, sheet_name=sheet) | |
cur_dict = {} | |
for column in df.columns: | |
cur_dict[column] = df[column].tolist() | |
print(cur_dict.keys()) | |
dict[sheet] = cur_dict | |
print(dict.keys()) | |
return dict | |
def dump_lines(filename, file_list): | |
f = open(filename, 'w') | |
tbar = tqdm(file_list) | |
for i, elements in enumerate(tbar): | |
if isinstance(elements, (tuple, list)): | |
line = ' '.join(elements) | |
elif isinstance(elements, str): | |
line = elements | |
appex = '' if i == len(file_list) - 1 else '\n' | |
f.write('%s%s' % (line, appex)) | |
f.close() | |
def load_lines(txt_file): | |
lines = [line.strip() for line in open(txt_file, 'r')] | |
return lines | |
def load_jsonl(jsonl_file): | |
# List to hold all JSON objects | |
data = [] | |
# Open the file and read line by line | |
with open(jsonl_file, 'r') as file: | |
for line in file: | |
# Each line is a JSON object, parse it and append to the list | |
json_object = json.loads(line) | |
data.append(json_object) | |
return data | |
def load_yaml(yaml_file): | |
with open(yaml_file, "r") as f: | |
yaml_dict = yaml.safe_load(f) | |
return yaml_dict | |
def load_odgt(odgt): | |
try: | |
samples = [json.loads(x.rstrip()) \ | |
for x in open(odgt, 'r')][0] | |
except: | |
samples = load_json(odgt) | |
print(samples[0].keys()) | |
return samples | |
def fuse_odgt(odgt_files): | |
""" | |
odgt_files: | |
""" | |
odgt_full = [] | |
for odgt_file in odgt_files: | |
odgt = load_odgt(odgt_file) | |
odgt_full = odgt_full + odgt | |
return odgt_full | |
def load_video_first_frame(video_name): | |
cap = cv2.VideoCapture(video_name) | |
if(cap.isOpened()): | |
ret, frame = cap.read() | |
else: | |
raise ValueError("can not read %s" % video_name) | |
return frame | |
def load_lines(txt_file): | |
lines = [line.strip() for line in open(txt_file, 'r')] | |
return lines | |
def load_csv(csv_file): | |
import csv | |
lines = [] | |
with open(csv_file) as f: | |
reader = csv.reader(f, delimiter=',') | |
for row in reader: | |
lines.append(row) | |
return lines[1:] | |
# cat multi files in to a single file | |
def cat_files(files, output): | |
all_lines = [] | |
for filename in files: | |
lines = load_lines(filename) | |
all_lines = all_lines + lines | |
dump_lines(output, all_lines) | |
class SkipExist: | |
def __init__(self, | |
processor, | |
ioType='image', | |
need_res=False, | |
rerun=False): | |
self.ioType = ioType | |
self.io = IOShop(self.ioType).io | |
self.processor = processor | |
self.rerun = rerun | |
self.need_res = need_res | |
def __call__(self, *args, **kwargs): | |
assert 'filename' in kwargs | |
true_file = '%s.%s' % (kwargs['filename'], self.io.appex) | |
if os.path.exists(true_file): | |
if self.need_res: | |
res = self.io.load(kwargs['filename']) | |
return res | |
else: | |
filename = kwargs['filename'] | |
del kwargs['filename'] | |
res = self.processor(*args, **kwargs) | |
self.io.dump(filename, res) | |
def dump_pkl(filename, data): | |
import pickle as pkl | |
with open(filename, "wb") as fl: | |
pkl.dump(data, fl) | |
def load_pkl(filename): | |
import pickle as pkl | |
with open(filename, 'rb') as fl: | |
res = pkl.load(fl) | |
return res | |
def write_pointcloud(filename, xyz_points, faces=None, rgb_points=None): | |
""" | |
creates a .pkl file of the point clouds generated | |
""" | |
assert xyz_points.shape[1] == 3,'Input XYZ points should be Nx3 float array' | |
if rgb_points is None: | |
rgb_points = np.ones(xyz_points.shape).astype(np.uint8) * 255 | |
else: | |
rgb_points = rgb_points.astype(np.uint8) | |
assert xyz_points.shape == rgb_points.shape,\ | |
f'Input RGB colors should be Nx3 {rgb_points.shape} float array \ | |
and have same size as input XYZ points {xyz_points.shape}' | |
# Write header of .ply file | |
fid = open(filename,'wb') | |
fid.write(bytes('ply\n', 'utf-8')) | |
fid.write(bytes('format binary_little_endian 1.0\n', 'utf-8')) | |
fid.write(bytes('element vertex %d\n'%xyz_points.shape[0], 'utf-8')) | |
fid.write(bytes('property float x\n', 'utf-8')) | |
fid.write(bytes('property float y\n', 'utf-8')) | |
fid.write(bytes('property float z\n', 'utf-8')) | |
fid.write(bytes('property uchar red\n', 'utf-8')) | |
fid.write(bytes('property uchar green\n', 'utf-8')) | |
fid.write(bytes('property uchar blue\n', 'utf-8')) | |
fid.write(bytes('end_header\n', 'utf-8')) | |
# Write 3D points to .ply file | |
for i in range(xyz_points.shape[0]): | |
fid.write(bytearray(struct.pack("fffccc",xyz_points[i,0],xyz_points[i,1],xyz_points[i,2], | |
rgb_points[i,0].tostring(),rgb_points[i,1].tostring(), | |
rgb_points[i,2].tostring()))) | |
if faces is not None: | |
for face in faces: | |
fid.write(struct.pack("<B", face[0])) | |
fid.write(struct.pack("<{}i".format(face[0]), *face[1])) | |
fid.close() | |
def read_ply(filename): | |
# Load the PLY file | |
ply_data = PlyData.read(filename) | |
# Access the vertex data | |
vertex_data = ply_data['vertex'] | |
# Extract x, y, z coordinates as a numpy array | |
points = np.vstack((vertex_data['x'], vertex_data['y'], vertex_data['z'])).T | |
return points | |
def load_obj(file_path): | |
verts = [] | |
normals = [] | |
uvs = [] | |
material_colors = [] | |
texture_images = [] | |
texture_atlas = [] | |
faces_verts = [] | |
faces_normals = [] | |
faces_textures = [] | |
faces_materials = [] | |
with open(file_path, 'r') as file: | |
for line in file: | |
if line.startswith('v '): | |
vertex = [float(v) for v in line.split()[1:]] | |
verts.append(vertex) | |
elif line.startswith('vn '): | |
normal = [float(n) for n in line.split()[1:]] | |
normals.append(normal) | |
elif line.startswith('vt '): | |
uv = [float(u) for u in line.split()[1:]] | |
uvs.append(uv) | |
elif line.startswith("mtllib "): | |
mtl_name = line.split()[1] | |
elif line.startswith('vc '): | |
color = [float(c) for c in line.split()[1:]] | |
material_colors.append(color) | |
elif line.startswith('usemtl '): | |
material = line.split()[1] | |
texture_images.append(material) | |
elif line.startswith('f '): | |
face_data = line.split()[1:] | |
face_verts = [] | |
face_normals = [] | |
face_textures = [] | |
for face in face_data: | |
res = face.split('/') | |
vert = res[0] | |
face_verts.append(int(vert)) | |
if len(res) == 2: | |
texture = res[1] | |
face_textures.append(int(texture)) | |
if len(res) == 3: | |
normal = res[2] | |
face_normals.append(int(normal)) | |
faces_verts.append(face_verts) | |
faces_normals.append(face_normals) | |
faces_textures.append(face_textures) | |
faces_materials.append(len(texture_images) - 1) | |
mtl_file = f"{os.path.dirname(file_path)}/{mtl_name}" | |
with open(mtl_file, 'r') as file: | |
for line in file: | |
if line.startswith("map_Kd"): | |
image_name = line.split()[1] | |
break | |
assert len(texture_images) == 1 | |
texture_name = texture_images[0] | |
image = cv2.imread(f"{os.path.dirname(file_path)}/{image_name}") | |
properties = Aux( | |
normals=np.array(normals), | |
verts_uvs=np.array(uvs), | |
material_colors=DEFAULT_MATERIAL, | |
texture_images={texture_name: np.float32(image)/ 255.0}, | |
texture_atlas=None) | |
faces_verts=np.array(faces_verts) | |
num_faces = faces_verts.shape[0] | |
faces = Faces( | |
verts_idx=faces_verts, | |
normals_idx=np.ones(faces_verts.shape) * -1, | |
textures_idx=np.array(faces_textures), | |
materials_idx=np.zeros(num_faces)) | |
obj = Obj(np.array(verts), faces, properties) | |
return obj | |
def export_obj(filename, obj, | |
include_normals=False, | |
include_textures=True): | |
""" | |
Export the given object to an .obj file with optional normals and textures. | |
Args: | |
filename (str): Path to the output .obj file (without the extension). | |
obj (namedtuple): Object containing vertices, faces, and properties. | |
include_normals (bool): Flag to include normals in the .obj file. | |
include_textures (bool): Flag to include textures in the .obj file. | |
""" | |
material_name = list(obj.properties.texture_images.keys())[0] | |
# Write obj file | |
name = os.path.basename(filename) | |
with open(filename + ".obj", "w") as f: | |
f.write("\n") | |
if include_textures: | |
f.write(f"mtllib {name}.mtl\n") | |
f.write("\n") | |
for vert in obj.verts: | |
x, y, z = vert | |
f.write(f"v {x} {y} {z}\n") | |
if include_textures: | |
for uv in obj.properties.verts_uvs: | |
x, y = uv | |
f.write(f"vt {x} {y}\n") | |
f.write(f"usemtl {material_name}\n") | |
num_faces = obj.faces.verts_idx.shape[0] | |
for i in range(num_faces): | |
f0, f1, f2 = obj.faces.verts_idx[i] | |
if include_textures: | |
t0, t1, t2 = obj.faces.textures_idx[i] | |
if t0 == -1: | |
f.write(f"f {f0} {f1} {f2}\n") | |
continue | |
f.write(f"f {f0}/{t0} {f1}/{t1} {f2}/{t2}\n") | |
else: | |
f.write(f"f {f0} {f1} {f2}\n") | |
# Write mtl file | |
if include_textures: | |
output_dir = os.path.dirname(filename) | |
with open(f"{output_dir}/{name}.mtl", "w") as f: | |
f.write(f"newmtl {material_name}\n") | |
f.write(f"map_Kd {name}.png\n") | |
material_colors = obj.properties.material_colors[material_name] | |
r, g, b = material_colors["ambient_color"] | |
f.write(f"Ka {r} {g} {b}\n") | |
r, g, b = material_colors["diffuse_color"] | |
f.write(f"Kd {r} {g} {b}\n") | |
r, g, b = material_colors["specular_color"] | |
f.write(f"Ks {r} {g} {b}\n") | |
s = material_colors["shininess"] | |
f.write(f"Ns {s}\n") | |
# Save texture image | |
image = obj.properties.texture_images[material_name] * 255 | |
texture_img = f"{output_dir}/{name}.png" | |
cv2.imwrite(texture_img, image) | |
return | |
def resave_to_video(): | |
folder = "/Users/peng/Downloads/DenseAR/Mesh/" | |
vname = "0037438511" | |
image_num = 125 | |
frames = [] | |
d_frames = [] | |
crop = [0, 650, 1080, 1270] | |
for i in tqdm(range(image_num)): | |
name = f"{folder}/{vname}/{i}.jpg" | |
d_name = f"{folder}/{vname}/{i}.tiff" | |
img = np.array(Image.open(name)) | |
depth = np.array(Image.open(d_name)) | |
if img is None: | |
continue | |
img = img[crop[0]:crop[2], crop[1]:crop[3]] | |
depth = depth[crop[0]:crop[2], crop[1]:crop[3]] | |
depth = 1.0 / np.maximum(depth, 1e-10) | |
depth = p_uts.depth2color(depth, max_d=50) | |
frames.append(img) | |
d_frames.append(depth) | |
vio = io_uts.VideoIO() | |
video_file = f"{folder}/{vname}.mp4" | |
d_video_file = f"{folder}/{vname}_d.mp4" | |
vio.dump_skv(video_file, frames, fps=24) | |
vio.dump_skv(d_video_file, d_frames, fps=24) | |
def test_depth_8uc3_encode(): | |
depth = np.random.rand(480, 640) * 200 | |
dio = DepthIO() | |
depth_encode = dio.to8UC3(depth) | |
depth_decode = dio.read8UC3(depth_encode) | |
print(depth, depth_decode) | |
assert np.sum(np.abs(depth - depth_decode)) / (480 * 640) < 1e-3 | |
########### copy from gta code ################ | |
def build_mesh(w, h): | |
w = np.linspace(-1.0, 1.0, num=w, dtype=np.float32) | |
h = np.linspace(1.0, -1.0, num=h, dtype=np.float32) | |
return np.stack(np.meshgrid(w, h), axis=0) | |
def build_proj_matrix(fov, aspect): | |
proj = np.zeros((4, 4)) | |
proj[0, 0] = 1.0 / np.tan(np.radians(fov / 2)) / aspect | |
proj[1, 1] = 1.0 / np.tan(np.radians(fov / 2)) | |
proj[2, 2] = 0.00001502 # reverse-engineered get from shader | |
proj[2, 3] = 0.15000225 # reverse-engineered get from shader | |
proj[3, 2] = -1.0 | |
return proj | |
def zbuffer_to_depth(zbuffer, fov): | |
height, width = zbuffer.shape[:2] | |
aspect = width / height | |
mesh = build_mesh(width, height) | |
if len(zbuffer.shape) != 3: | |
zbuffer = np.expand_dims(zbuffer, 0) | |
pcloud = np.concatenate((mesh, zbuffer, np.ones_like(zbuffer)), 0) | |
pcloud = pcloud.reshape(4, height * width) | |
proj_matrix = build_proj_matrix(fov, aspect) | |
pcloud = np.linalg.inv(proj_matrix) @ pcloud | |
depth = -pcloud[2] / pcloud[3] | |
focal_cv = proj_matrix[0, 0] * width / 2.0 | |
return depth.reshape(height, width), focal_cv | |
def test_zbuffer_to_depth(): | |
# root = "E:/Dataset/GTA/Stereo_0/" | |
# name = root + "1-130423915874" | |
name = "E:/depth_video/0036696165/1" | |
config = load_json(name + ".json") | |
fov = config["fov"] | |
zbuffer = cv2.imread(name + ".tiff", cv2.IMREAD_UNCHANGED) | |
depth, focal = zbuffer_to_depth(zbuffer, fov) | |
print(depth) | |
def fuse_frames_of_depth_video(): | |
""" | |
frames: list of images or video | |
""" | |
def frame_to_video(video_dir, video_name): | |
frames = v_uts.list_all_files(video_dir, exts=['jpg']) | |
rgb_video = f"{video_name}.mp4" | |
depth_video = f"{video_name}_d.avi" | |
cam_file = f"{video_name}.json" | |
dio = DepthIO() | |
imgs = [] | |
depths = [] | |
cams = [] | |
print("seq len:", len(frames)) | |
for i, frame in tqdm(enumerate(frames)): | |
name = f"{video_dir}/{i}.jpg" | |
d_name = f"{video_dir}/{i}.tiff" | |
c_name = f"{video_dir}/{i}.json" | |
img = np.array(Image.open(name)) | |
depth = np.array(Image.open(d_name)) | |
cam = load_json(c_name) | |
depth, focal = zbuffer_to_depth(depth, cam['fov']) | |
depth = dio.to8UC3(depth) | |
imgs.append(img) | |
depths.append(depth) | |
cam['focal'] = focal | |
cams.append(cam) | |
# if i > 30: | |
# break | |
vio = VideoIO() | |
vio.dump(rgb_video, imgs) | |
vio.dump(depth_video, depths, lossless=True) | |
dump_json(cam_file, cams) | |
folder = "E:/depth_video/" | |
output = "E:/depth_video_resave/" | |
v_uts.mkdir_if_need(output) | |
folder_names = v_uts.list_all_folders(folder) | |
for folder_name in tqdm(folder_names[1:]): | |
folder_name = folder_name.replace('\\', '/') | |
vid_name = folder_name.split('/')[-2] | |
print(folder_name, vid_name) | |
output_video = f"{output}/{vid_name}" | |
frame_to_video(folder_name, video_name=output_video) | |
# break | |
def save_xlsx(filename, dicts, sheets=None): | |
with pd.ExcelWriter(filename, mode='w') as writer: | |
if sheets is None: | |
df1 = pd.DataFrame(dicts) | |
df1.to_excel(writer, index=False) | |
return | |
for sheet in sheets: | |
df1 = pd.DataFrame(dicts[sheet]) | |
df1.to_excel(writer, sheet_name=sheet, index=False) | |
def load_xlsx(filename, sheets=None): | |
assert os.path.exists(filename) , f"File not found: {filename}" | |
if sheets is None: | |
df = pd.read_excel(filename) | |
dict = {} | |
for column in df.columns: | |
dict[column] = df[column].tolist() | |
else: | |
dict = {} | |
for sheet in sheets: | |
df = pd.read_excel(filename, sheet_name=sheet) | |
cur_dict = {} | |
for column in df.columns: | |
cur_dict[column] = df[column].tolist() | |
print(cur_dict.keys()) | |
dict[sheet] = cur_dict | |
print(dict.keys()) | |
return dict | |
def get_sheet_list(dict, sheets=None, key="url"): | |
images_list = [dict[key]] if sheets is None else [dict[sheet_name][key] for sheet_name in sheets] | |
images_full = [] | |
for images, sheet in zip(images_list, sheets): | |
print(f"{sheet}: {len(images)}") | |
images_full = images_full + images | |
return images_full | |
def test_load_save_obj(): | |
image_name = "000000243355_zebra" | |
obj = f"./unit_test/{image_name}.obj" | |
obj = load_obj(obj) | |
export_obj(f"./unit_test/{image_name}_resave", obj) | |
if __name__ == '__main__': | |
# test = [(1,2), (3,4)] | |
# dump_pkl('test.pkl', test) | |
# print(load_pkl('test.pkl')) | |
# xyz = np.random.rand(1000, 3) | |
# write_pointcloud("test.ply", xyz) | |
# xyz = np.random.rand(1000, 3) | |
# write_pointcloud("test.ply", xyz) | |
# pass | |
# test_depth_8uc3_encode() | |
# test_zbuffer_to_depth() | |
# fuse_frames_of_depth_video() | |
test_load_save_obj() | |