annotation / io_utils.py
MudeHui's picture
Add application file
1fb65ae
import os
import sys
import re
import numpy as np
import cv2
import json
import yaml
import vis_utils as v_uts
import struct
from cv_base import (
Faces, Aux, Obj, DEFAULT_MATERIAL
)
hasTorch = True
try:
import torch
except:
hasTorch = False
import functools
import pandas as pd
from tqdm import tqdm
from PIL import Image
try:
from plyfile import PlyData
except:
"no ply"
import pdb
b=pdb.set_trace
def default(x, val):
return val if x is None else x
class IOShop:
def __init__(self, name, **kwargs):
ioFuncs = {'depth': DepthIO,
'image': ImageIO,
'flow': FlowIO,
'segment': SegmentIO,
'prob': ProbIO,
'video': VideoIO}
self.io = ioFuncs[name](**kwargs)
def load(self, file_name, **kwargs):
return self.io.load(file_name, **kwargs)
def dump(self, file_name, file, **kwargs):
self.io.dump(file_name, file, **kwargs)
class BaseIO:
def __init__(self, appex='jpg'):
self.type = 'image'
self.appex = appex
def load(self, file_name):
file_name = '%s.%s' % (file_name, self.appex)
image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED)
assert not (image is None), '%s not exists' % file_name
return image
def dump(self, file_name, file):
v_uts.mkdir_if_need(os.path.dirname(file_name))
file_name = '%s.%s' % (file_name, self.appex)
cv2.imwrite(file_name, file)
class ImageIO(BaseIO):
def __init__(self, appex='jpg'):
super(ImageIO, self).__init__(appex=appex)
self.type = 'image'
def load(self, file_name):
if file_name.endswith('heic') or file_name.endswith('HEIC'):
byte = read2byte(file_name)
image = decodeImage(byte)
else:
image = super(ImageIO, self).load(file_name)
return image
@staticmethod
def imwrite(file_name, data, order='rgb'):
cv2.imwrite(file_name, data[:, :, ::-1])
class SegmentIO(BaseIO):
def __init__(self):
super(SegmentIO, self).__init__(appex='png')
self.type = 'segment'
class ProbIO(BaseIO):
def __init__(self):
super(ProbIO, self).__init__()
self.type = 'prob'
self.max_class = 4
def load(self, file_name, channels=None):
image = cv2.imread(file_name, cv2.IMREAD_UNCHANGED)
channels = default(channels, self.max_class)
output = np.zeros(image.shape[:2])
# for i in range(channels):
def dump(self, file_name, file):
"""
height, width, channel
"""
output = np.zeros((height, width), dtype=np.uint16)
h, w, c = file.shape
for i in range(c):
output = output + np.uint16(file[:, :, i] * 255) + i * 256
cv2.imwrite(file_name, output.astype('uint16'))
class MeshIO(BaseIO):
def __init__(self):
super().__init__(appex='obj')
self.type = 'mesh'
def dump_obj(self, filename, obj):
export_obj(filename, obj)
def load_obj(self, filename):
return load_obj(filename)
def normalize_normal(mat):
mat = (mat / 255.0 * 2.0 - 1.0).astype('float32')
l1 = np.linalg.norm(mat, axis=2)
for j in range(3):
mat[:,:,j] /= (l1 + 1e-9)
return mat
class NormalIO(BaseIO):
def __init__(self, xyz='rgb'):
"""
rgb: means the normal saved in the order of x: r ...
"""
self._xyz = xyz
def read(self, filename):
normal = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
if self._xyz == 'rgb':
normal = normal[:, :, ::-1]
normal = normalize_normal(normal)
return normal
class DepthIO(BaseIO):
def __init__(self, bit=8):
super(DepthIO, self).__init__(appex='pfm')
assert bit in [8, 16]
scale = {8: 1, 16: 2}
self.bits = scale[bit]
self.dump_vis = True
def load(self, path):
"""Read pfm file.
Args:
path (str): path to file
Returns:
tuple: (data, scale)
"""
path = '%s.%s' % (path, self.appex)
with open(path, "rb") as file:
color = None
width = None
height = None
scale = None
endian = None
header = file.readline().rstrip()
if header.decode("ascii") == "PF":
color = True
elif header.decode("ascii") == "Pf":
color = False
else:
raise Exception("Not a PFM file: " + path)
dim_match = re.match(r"^(\d+)\s(\d+)\s$", file.readline().decode("ascii"))
if dim_match:
width, height = list(map(int, dim_match.groups()))
else:
raise Exception("Malformed PFM header.")
scale = float(file.readline().decode("ascii").rstrip())
if scale < 0:
# little-endian
endian = "<"
scale = -scale
else:
# big-endian
endian = ">"
data = np.fromfile(file, endian + "f")
shape = (height, width, 3) if color else (height, width)
data = np.reshape(data, shape)
data = np.flipud(data)
return data, scale
def dump(self, path, image, scale=1):
"""Write pfm file.
Args:
path (str): pathto file
image (array): data
scale (int, optional): Scale. Defaults to 1.
"""
v_uts.mkdir_if_need(os.path.dirname(path))
path = path + '.pfm'
with open(path, "wb") as file:
color = None
if image.dtype.name != "float32":
raise Exception("Image dtype must be float32.")
image = np.flipud(image)
if len(image.shape) == 3 and image.shape[2] == 3: # color image
color = True
elif (
len(image.shape) == 2 or len(image.shape) == 3 and image.shape[2] == 1
): # greyscale
color = False
else:
raise Exception("Image must have H x W x 3, H x W x 1 or H x W dimensions.")
file.write("PF\n" if color else "Pf\n".encode())
file.write("%d %d\n".encode() % (image.shape[1], image.shape[0]))
endian = image.dtype.byteorder
if endian == "<" or endian == "=" and sys.byteorder == "little":
scale = -scale
file.write("%f\n".encode() % scale)
image.tofile(file)
if self.dump_vis:
self.dump_visualize(path[:-4], image, self.bits)
@staticmethod
def to8UC3(depth, scale=1000):
"""
Convert depth image to 8UC3 format.
"""
h, w = depth.shape
max_depth = (256.0 ** 3 - 1) / scale
# Clip depth values exceeding the maximum depth
depth = np.clip(depth, 0, max_depth)
# Scale the depth values
value = depth * scale
# Split the depth values into three channels
ch = np.zeros((h, w, 3), dtype=np.uint8)
ch[:, :, 0] = np.uint8(value / (256 ** 2))
ch[:, :, 1] = np.uint8((value % (256 ** 2)) / 256)
ch[:, :, 2] = np.uint8(value % 256)
return ch
@staticmethod
def read8UC3(depth, scale=1000):
"""
Convert 8UC3 image to scaled depth representation.
"""
if isinstance(depth, str):
depth = cv2.imread(depth, cv2.IMREAD_UNCHANGED)
# Merge the three channels into a single depth value
depth_uint16 = depth[:, :, 0] * (256 ** 2) + \
depth[:, :, 1] * 256 + depth[:, :, 2]
# Convert depth to the scaled representation
depth = depth_uint16.astype(np.float32) / scale
return depth
@staticmethod
def dump_visualize(path, depth, bits=1):
depth_min = depth.min()
depth_max = depth.max()
max_val = (2**(8*bits))-1
if depth_max - depth_min > np.finfo("float").eps:
out = max_val * (depth - depth_min) / (depth_max - depth_min)
else:
out = 0
if bits == 1:
cv2.imwrite(path + ".png", out.astype("uint8"))
elif bits == 2:
cv2.imwrite(path + ".png", out.astype("uint16"))
return
@staticmethod
def load_png(path):
depth = cv2.imread(path, cv2.IMREAD_UNCHANGED)
return depth
@staticmethod
def dump_png(path, depth, bits=2, max_depth=20.0):
assert (path.endswith(".png"))
max_val = (2**(8*bits))-1
depth = depth / max_depth * max_val
cv2.imwrite(path, depth.astype("uint16"))
@staticmethod
def read_depth(filename, scale=6000, sz=None, is_disparity=False):
if not hasTorch:
return None
depth = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
depth = np.float32(depth) / scale
if sz:
h, w = sz
depth = cv2.resize(depth, (w, h),
interpolation=cv2.INTER_NEAREST)
depth = torch.from_numpy(depth)
if is_disparity: # convert to depth
depth = 1.0 / torch.clamp(depth, min=1e-10)
return depth
def write_depth(path, depth, grayscale, bits=1):
"""Write depth map to png file.
Args:
path (str): filepath without extension
depth (array): depth
grayscale (bool): use a grayscale colormap?
"""
if not grayscale:
bits = 1
if not np.isfinite(depth).all():
depth=np.nan_to_num(depth, nan=0.0, posinf=0.0, neginf=0.0)
print("WARNING: Non-finite depth values present")
depth_min = depth.min()
depth_max = depth.max()
max_val = (2**(8*bits))-1
if depth_max - depth_min > np.finfo("float").eps:
out = max_val * (depth - depth_min) / (depth_max - depth_min)
else:
out = np.zeros(depth.shape, dtype=depth.dtype)
if not grayscale:
out = cv2.applyColorMap(np.uint8(out), cv2.COLORMAP_INFERNO)
if bits == 1:
cv2.imwrite(path + ".png", out.astype("uint8"))
elif bits == 2:
cv2.imwrite(path + ".png", out.astype("uint16"))
return
class NormalIO(BaseIO):
def __init__(self):
super(NormalIO, self).__init__(appex='npy')
self.dump_vis = False
@staticmethod
def read_normal(filename, sz=None, to_torch=False):
if not hasTorch:
return None
if not os.path.exists(filename):
h, w = sz
return torch.ones((h, w, 3)) * 0.3
image = cv2.imread(filename)[:, :, ::-1]
image = np.float32(image)
image = (image / 127.5 - 1)
if sz:
h, w = sz
image = cv2.resize(image, (w, h),
interpolation=cv2.INTER_NEAREST)
return torch.from_numpy(image)
def to8UC3(self, normal):
return np.uint8((normal + 1) * 127.5)
class FlowIO(BaseIO):
def __init__(self):
super(FlowIO, self).__init__(appex='npy')
self.dump_vis = False
def normalize(self, flow, shape=None):
if shape is None:
shape = flow.shape[:2]
flow[:, :, 0] /= shape[1]
flow[:, :, 1] /= shape[0]
return flow
def denormalize(self, flow, shape=None):
if shape is None:
shape = flow.shape[:2]
flow[:, :, 0] *= shape[1]
flow[:, :, 1] *= shape[0]
return flow
def visualization(self, flow):
pass
def load(self, path, shape=None):
path = path + '.npy'
flow = np.load(path)
flow = self.denormalize(flow, shape)
assert flow is not None
return flow
def dump(self, path, flow):
v_uts.mkdir_if_need(os.path.dirname(path))
path = path + '.npy'
flow = self.normalize(flow)
np.save(path, flow)
if self.dump_vis:
self.dump_visualize(path[:-4], flow)
def dump_visualize(self, path, flow):
_, flow_c = v_uts.flow2color(flow)
cv2.imwrite(path + '.png', flow_c)
class VideoIO(BaseIO):
def __init__(self, longside_len=None):
super(VideoIO, self).__init__()
self.longside_len = longside_len
def get_fps(self, path):
vidcap = cv2.VideoCapture(path)
return vidcap.get(cv2.CAP_PROP_FPS)
def load_first_frame(self, path):
import skvideo.io as vio
video = vio.vreader(path)
frame = next(video)
if self.longside_len is not None:
frame = v_uts.resize2maxsize(frame, self.longside_len)
return frame
def load(self, path, sample_rate=1, max_len=1e10,
load_to_dir=False,
dir_name=None,
pre_len=5,
save_transform=None):
import skvideo.io as vio
def default_transform(x):
if x.ndim == 2:
return x
if x.ndim == 3 and x.shape[2] == 3:
return x[:, :, ::-1]
return x
frames = []
reader = vio.vreader(path)
if load_to_dir:
v_uts.mkdir(dir_name)
if save_transform is None:
save_transform = lambda x : x
for count, frame in enumerate(reader):
if count == max_len:
break
if count % sample_rate == 0:
if self.longside_len is not None:
frame = v_uts.resize2maxsize(
frame, self.longside_len)
if load_to_dir:
img_file = f"{dir_name}/{count:05}.png"
frame = save_transform(frame)
cv2.imwrite(img_file, frame)
else:
frames.append(frame)
if not load_to_dir:
return frames
def load_till_end(self, path, sample_rate=1):
import skvideo.io as vio
frames = []
reader = vio.vreader(path)
count = 0
while True:
try:
frame = next(reader)
except:
break
if count % sample_rate == 0:
if self.longside_len is not None:
frame = v_uts.resize2maxsize(
frame, self.longside_len)
frames.append(frame)
count += 1
return frames
def load_w_cv(self, path, out_dir, sample_rate = 1, ext="jpg"):
v_uts.video_to_frame(path,
out_dir,
max_len=self.longside_len,
sample_rate=sample_rate,
ext=ext)
def dump_to_images(self, frames, image_path):
v_uts.mkdir_if_need(image_path)
for count, frame in tqdm(enumerate(frames)):
image_file = '%s/%04d.jpg' % (image_path, count)
cv2.imwrite(image_file, frame[:, :, ::-1])
def dump(self, path, frames, fps=30, lossless=False):
from moviepy.editor import ImageSequenceClip, VideoFileClip
if isinstance(frames[0], str):
frame_np = []
for frame in tqdm(frames):
cur_frame = cv2.imread(frame, cv2.IMREAD_UNCHANGED)[:, :, ::-1]
frame_np.append(cur_frame)
frames = frame_np
clip = ImageSequenceClip(frames, fps)
if lossless:
assert path.endswith('avi')
clip.write_videofile(path, codec='png')
else:
clip.write_videofile(path, fps=fps)
def dump_skv(self, path, frames, fps=30):
if frames[0].ndim == 2:
frames = [cv2.cvtColor(frame,cv2.COLOR_GRAY2RGB) for frame in frames]
else:
frames = [frame[:, :, ::-1] for frame in frames]
v_uts.frame_to_video_simple(frames, fps, video_name=path)
# import skvideo.io as vio
# fps = str(int(fps))
# vid_out = vio.FFmpegWriter(path,
# inputdict={'-r': fps},
# outputdict={
# '-vcodec': 'libx264',
# '-pix_fmt': 'yuv420p',
# '-r': fps,
# },
# verbosity=1)
# for idx, frame in enumerate(frames):
# vid_out.writeFrame(frame)
# vid_out.close()
def resave_video(self, video_file, start, end,
outvideo_file):
"""
:param start: sec start
:param end: sec end
:return:
"""
fps = self.get_fps(video_file)
frames = self.load(video_file)
start_frame = int(start * fps)
end_frame = int(end * fps)
frames = frames[start_frame:end_frame]
self.dump_skv(outvideo_file, frames, fps)
def frame2video(self, folder, output, ext=".jpg"):
image_files = v_uts.list_all_files(folder, exts=[ext])
frames = []
for name in tqdm(image_files):
frames.append(cv2.imread(name)[:, :, ::-1])
self.dump(output, frames)
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super(NpEncoder, self).default(obj)
def read2byte(filename):
with open(filename, 'rb') as f:
file_data = f.read()
return file_data
def decodeImage(bytesIo):
import whatimage
import pyheif
from PIL import Image
fmt = whatimage.identify_image(bytesIo)
if fmt in ['heic', 'avif']:
i = pyheif.read_heif(bytesIo)
# Convert to other file format like jpeg
pi = Image.frombytes(
mode=i.mode, size=i.size, data=i.data)
image = np.asarray(pi)
image = image[:, :, ::-1] # to BGR
return image
else:
return None
def image2Normal(imagePath):
from skimage import io
normal = io.imread(imagePath)
normal = ((np.float32(normal) / 255.0) * 2 - 1.0 )
return normal
def normal2Image(normal):
nm_pred_val = (normal + 1.) / 2.
nm_pred_val = np.uint8(nm_pred_val*255.)
return nm_pred_val
def dump_normal(filename, normal):
normal = normal2Image(normal)
cv2.imwrite(filename + '.png', array)
def dump_prob2image(filename, array):
"""
dump probility map to image when
array: [x, height, width] (x = 1, 3, 4)
"""
class_num = array.shape[0]
# assert class_num <= 4
if class_num >= 4 :
print('warning: only save the first 3 channels')
array = array[:3, :, :]
if class_num == 2:
raise ValueError('not implement')
array = np.transpose(np.uint8(array * 255), (1, 2, 0))
if filename.endswith('.png'):
cv2.imwrite(filename, array)
return
cv2.imwrite(filename + '.png', array)
assert os.path.exists(filename)
def load_image2prob(filename):
if not filename.endswith('.png'):
filename = filename + '.png'
array = cv2.imread(filename, cv2.IMREAD_UNCHANGED)
array = np.transpose(array, (2, 0, 1)) / 255
return array
def shape_match(images):
assert len(images) > 1
shape = images[0].shape[:2]
for image in images[1:]:
cur_shape = image.shape[:2]
if np.sum(np.abs(np.array(shape) - \
np.array(cur_shape))):
return False
return True
def append_apex(filename, appex):
filename = filename.split('.')
prefix = '.'.join(filename[:-1])
filetype = filename[-1]
return '%s_%s.%s' % (prefix, appex, filetype)
def load_json(json_file):
with open(json_file) as f:
res = json.load(f)
return res
def dump_numpy(filename, x: np.ndarray):
np.savetxt(filename, x, delimiter=' ', fmt='%1.6f')
def dump_json(filename, odgt, w_np=False):
with open(filename, 'w') as f:
if not w_np:
json.dump(odgt, f, indent=4)
else:
json.dump(odgt, f, indent=4, cls=NpEncoder)
def dump_jsonl(filename, odgt):
with open(filename, 'w') as file:
for entry in odgt:
json.dump(entry, file)
file.write('\n')
def dump_pair_data(image_list,
label_list,
outfile,
root='',
data_type='txt',
fields=None):
if fields is None:
fields = ["image", "segment"]
if data_type == 'txt':
fp = open(outfile, 'w')
for imagefile, labelfile in zip(image_list, label_list):
imagefile = imagefile.replace(root, '.')
labelfile = labelfile.replace(root, '.')
fp.write('%s %s\n' % (imagefile, labelfile))
fp.close()
elif data_type == "odgt":
odgt = []
for imagefile, labelfile in zip(image_list, label_list):
imagefile = imagefile.replace(root, '.')
labelfile = labelfile.replace(root, '.')
item = {fields[0]: imagefile,
fields[1]: labelfile}
odgt.append(item)
dump_json(outfile, odgt)
def save_xlsx(filename, dicts, sheets=None):
"""
Save a list of dicts to an xlsx file.
"""
with pd.ExcelWriter(filename, mode='w') as writer:
if sheets is None:
df1 = pd.DataFrame(dicts)
df1.to_excel(writer, index=False)
return
for sheet in sheets:
df1 = pd.DataFrame(dicts[sheet])
df1.to_excel(writer, sheet_name=sheet, index=False)
def load_xlsx(filename, sheets=None):
assert os.path.exists(filename) , f"File not found: {filename}"
if sheets is None:
df = pd.read_excel(filename)
dict = {}
for column in df.columns:
dict[column] = df[column].tolist()
else:
dict = {}
for sheet in sheets:
df = pd.read_excel(filename, sheet_name=sheet)
cur_dict = {}
for column in df.columns:
cur_dict[column] = df[column].tolist()
print(cur_dict.keys())
dict[sheet] = cur_dict
print(dict.keys())
return dict
def dump_lines(filename, file_list):
f = open(filename, 'w')
tbar = tqdm(file_list)
for i, elements in enumerate(tbar):
if isinstance(elements, (tuple, list)):
line = ' '.join(elements)
elif isinstance(elements, str):
line = elements
appex = '' if i == len(file_list) - 1 else '\n'
f.write('%s%s' % (line, appex))
f.close()
def load_lines(txt_file):
lines = [line.strip() for line in open(txt_file, 'r')]
return lines
def load_jsonl(jsonl_file):
# List to hold all JSON objects
data = []
# Open the file and read line by line
with open(jsonl_file, 'r') as file:
for line in file:
# Each line is a JSON object, parse it and append to the list
json_object = json.loads(line)
data.append(json_object)
return data
def load_yaml(yaml_file):
with open(yaml_file, "r") as f:
yaml_dict = yaml.safe_load(f)
return yaml_dict
def load_odgt(odgt):
try:
samples = [json.loads(x.rstrip()) \
for x in open(odgt, 'r')][0]
except:
samples = load_json(odgt)
print(samples[0].keys())
return samples
def fuse_odgt(odgt_files):
"""
odgt_files:
"""
odgt_full = []
for odgt_file in odgt_files:
odgt = load_odgt(odgt_file)
odgt_full = odgt_full + odgt
return odgt_full
def load_video_first_frame(video_name):
cap = cv2.VideoCapture(video_name)
if(cap.isOpened()):
ret, frame = cap.read()
else:
raise ValueError("can not read %s" % video_name)
return frame
def load_lines(txt_file):
lines = [line.strip() for line in open(txt_file, 'r')]
return lines
def load_csv(csv_file):
import csv
lines = []
with open(csv_file) as f:
reader = csv.reader(f, delimiter=',')
for row in reader:
lines.append(row)
return lines[1:]
# cat multi files in to a single file
def cat_files(files, output):
all_lines = []
for filename in files:
lines = load_lines(filename)
all_lines = all_lines + lines
dump_lines(output, all_lines)
class SkipExist:
def __init__(self,
processor,
ioType='image',
need_res=False,
rerun=False):
self.ioType = ioType
self.io = IOShop(self.ioType).io
self.processor = processor
self.rerun = rerun
self.need_res = need_res
def __call__(self, *args, **kwargs):
assert 'filename' in kwargs
true_file = '%s.%s' % (kwargs['filename'], self.io.appex)
if os.path.exists(true_file):
if self.need_res:
res = self.io.load(kwargs['filename'])
return res
else:
filename = kwargs['filename']
del kwargs['filename']
res = self.processor(*args, **kwargs)
self.io.dump(filename, res)
def dump_pkl(filename, data):
import pickle as pkl
with open(filename, "wb") as fl:
pkl.dump(data, fl)
def load_pkl(filename):
import pickle as pkl
with open(filename, 'rb') as fl:
res = pkl.load(fl)
return res
def write_pointcloud(filename, xyz_points, faces=None, rgb_points=None):
"""
creates a .pkl file of the point clouds generated
"""
assert xyz_points.shape[1] == 3,'Input XYZ points should be Nx3 float array'
if rgb_points is None:
rgb_points = np.ones(xyz_points.shape).astype(np.uint8) * 255
else:
rgb_points = rgb_points.astype(np.uint8)
assert xyz_points.shape == rgb_points.shape,\
f'Input RGB colors should be Nx3 {rgb_points.shape} float array \
and have same size as input XYZ points {xyz_points.shape}'
# Write header of .ply file
fid = open(filename,'wb')
fid.write(bytes('ply\n', 'utf-8'))
fid.write(bytes('format binary_little_endian 1.0\n', 'utf-8'))
fid.write(bytes('element vertex %d\n'%xyz_points.shape[0], 'utf-8'))
fid.write(bytes('property float x\n', 'utf-8'))
fid.write(bytes('property float y\n', 'utf-8'))
fid.write(bytes('property float z\n', 'utf-8'))
fid.write(bytes('property uchar red\n', 'utf-8'))
fid.write(bytes('property uchar green\n', 'utf-8'))
fid.write(bytes('property uchar blue\n', 'utf-8'))
fid.write(bytes('end_header\n', 'utf-8'))
# Write 3D points to .ply file
for i in range(xyz_points.shape[0]):
fid.write(bytearray(struct.pack("fffccc",xyz_points[i,0],xyz_points[i,1],xyz_points[i,2],
rgb_points[i,0].tostring(),rgb_points[i,1].tostring(),
rgb_points[i,2].tostring())))
if faces is not None:
for face in faces:
fid.write(struct.pack("<B", face[0]))
fid.write(struct.pack("<{}i".format(face[0]), *face[1]))
fid.close()
def read_ply(filename):
# Load the PLY file
ply_data = PlyData.read(filename)
# Access the vertex data
vertex_data = ply_data['vertex']
# Extract x, y, z coordinates as a numpy array
points = np.vstack((vertex_data['x'], vertex_data['y'], vertex_data['z'])).T
return points
def load_obj(file_path):
verts = []
normals = []
uvs = []
material_colors = []
texture_images = []
texture_atlas = []
faces_verts = []
faces_normals = []
faces_textures = []
faces_materials = []
with open(file_path, 'r') as file:
for line in file:
if line.startswith('v '):
vertex = [float(v) for v in line.split()[1:]]
verts.append(vertex)
elif line.startswith('vn '):
normal = [float(n) for n in line.split()[1:]]
normals.append(normal)
elif line.startswith('vt '):
uv = [float(u) for u in line.split()[1:]]
uvs.append(uv)
elif line.startswith("mtllib "):
mtl_name = line.split()[1]
elif line.startswith('vc '):
color = [float(c) for c in line.split()[1:]]
material_colors.append(color)
elif line.startswith('usemtl '):
material = line.split()[1]
texture_images.append(material)
elif line.startswith('f '):
face_data = line.split()[1:]
face_verts = []
face_normals = []
face_textures = []
for face in face_data:
res = face.split('/')
vert = res[0]
face_verts.append(int(vert))
if len(res) == 2:
texture = res[1]
face_textures.append(int(texture))
if len(res) == 3:
normal = res[2]
face_normals.append(int(normal))
faces_verts.append(face_verts)
faces_normals.append(face_normals)
faces_textures.append(face_textures)
faces_materials.append(len(texture_images) - 1)
mtl_file = f"{os.path.dirname(file_path)}/{mtl_name}"
with open(mtl_file, 'r') as file:
for line in file:
if line.startswith("map_Kd"):
image_name = line.split()[1]
break
assert len(texture_images) == 1
texture_name = texture_images[0]
image = cv2.imread(f"{os.path.dirname(file_path)}/{image_name}")
properties = Aux(
normals=np.array(normals),
verts_uvs=np.array(uvs),
material_colors=DEFAULT_MATERIAL,
texture_images={texture_name: np.float32(image)/ 255.0},
texture_atlas=None)
faces_verts=np.array(faces_verts)
num_faces = faces_verts.shape[0]
faces = Faces(
verts_idx=faces_verts,
normals_idx=np.ones(faces_verts.shape) * -1,
textures_idx=np.array(faces_textures),
materials_idx=np.zeros(num_faces))
obj = Obj(np.array(verts), faces, properties)
return obj
def export_obj(filename, obj,
include_normals=False,
include_textures=True):
"""
Export the given object to an .obj file with optional normals and textures.
Args:
filename (str): Path to the output .obj file (without the extension).
obj (namedtuple): Object containing vertices, faces, and properties.
include_normals (bool): Flag to include normals in the .obj file.
include_textures (bool): Flag to include textures in the .obj file.
"""
material_name = list(obj.properties.texture_images.keys())[0]
# Write obj file
name = os.path.basename(filename)
with open(filename + ".obj", "w") as f:
f.write("\n")
if include_textures:
f.write(f"mtllib {name}.mtl\n")
f.write("\n")
for vert in obj.verts:
x, y, z = vert
f.write(f"v {x} {y} {z}\n")
if include_textures:
for uv in obj.properties.verts_uvs:
x, y = uv
f.write(f"vt {x} {y}\n")
f.write(f"usemtl {material_name}\n")
num_faces = obj.faces.verts_idx.shape[0]
for i in range(num_faces):
f0, f1, f2 = obj.faces.verts_idx[i]
if include_textures:
t0, t1, t2 = obj.faces.textures_idx[i]
if t0 == -1:
f.write(f"f {f0} {f1} {f2}\n")
continue
f.write(f"f {f0}/{t0} {f1}/{t1} {f2}/{t2}\n")
else:
f.write(f"f {f0} {f1} {f2}\n")
# Write mtl file
if include_textures:
output_dir = os.path.dirname(filename)
with open(f"{output_dir}/{name}.mtl", "w") as f:
f.write(f"newmtl {material_name}\n")
f.write(f"map_Kd {name}.png\n")
material_colors = obj.properties.material_colors[material_name]
r, g, b = material_colors["ambient_color"]
f.write(f"Ka {r} {g} {b}\n")
r, g, b = material_colors["diffuse_color"]
f.write(f"Kd {r} {g} {b}\n")
r, g, b = material_colors["specular_color"]
f.write(f"Ks {r} {g} {b}\n")
s = material_colors["shininess"]
f.write(f"Ns {s}\n")
# Save texture image
image = obj.properties.texture_images[material_name] * 255
texture_img = f"{output_dir}/{name}.png"
cv2.imwrite(texture_img, image)
return
def resave_to_video():
folder = "/Users/peng/Downloads/DenseAR/Mesh/"
vname = "0037438511"
image_num = 125
frames = []
d_frames = []
crop = [0, 650, 1080, 1270]
for i in tqdm(range(image_num)):
name = f"{folder}/{vname}/{i}.jpg"
d_name = f"{folder}/{vname}/{i}.tiff"
img = np.array(Image.open(name))
depth = np.array(Image.open(d_name))
if img is None:
continue
img = img[crop[0]:crop[2], crop[1]:crop[3]]
depth = depth[crop[0]:crop[2], crop[1]:crop[3]]
depth = 1.0 / np.maximum(depth, 1e-10)
depth = p_uts.depth2color(depth, max_d=50)
frames.append(img)
d_frames.append(depth)
vio = io_uts.VideoIO()
video_file = f"{folder}/{vname}.mp4"
d_video_file = f"{folder}/{vname}_d.mp4"
vio.dump_skv(video_file, frames, fps=24)
vio.dump_skv(d_video_file, d_frames, fps=24)
def test_depth_8uc3_encode():
depth = np.random.rand(480, 640) * 200
dio = DepthIO()
depth_encode = dio.to8UC3(depth)
depth_decode = dio.read8UC3(depth_encode)
print(depth, depth_decode)
assert np.sum(np.abs(depth - depth_decode)) / (480 * 640) < 1e-3
########### copy from gta code ################
@functools.lru_cache()
def build_mesh(w, h):
w = np.linspace(-1.0, 1.0, num=w, dtype=np.float32)
h = np.linspace(1.0, -1.0, num=h, dtype=np.float32)
return np.stack(np.meshgrid(w, h), axis=0)
def build_proj_matrix(fov, aspect):
proj = np.zeros((4, 4))
proj[0, 0] = 1.0 / np.tan(np.radians(fov / 2)) / aspect
proj[1, 1] = 1.0 / np.tan(np.radians(fov / 2))
proj[2, 2] = 0.00001502 # reverse-engineered get from shader
proj[2, 3] = 0.15000225 # reverse-engineered get from shader
proj[3, 2] = -1.0
return proj
def zbuffer_to_depth(zbuffer, fov):
height, width = zbuffer.shape[:2]
aspect = width / height
mesh = build_mesh(width, height)
if len(zbuffer.shape) != 3:
zbuffer = np.expand_dims(zbuffer, 0)
pcloud = np.concatenate((mesh, zbuffer, np.ones_like(zbuffer)), 0)
pcloud = pcloud.reshape(4, height * width)
proj_matrix = build_proj_matrix(fov, aspect)
pcloud = np.linalg.inv(proj_matrix) @ pcloud
depth = -pcloud[2] / pcloud[3]
focal_cv = proj_matrix[0, 0] * width / 2.0
return depth.reshape(height, width), focal_cv
def test_zbuffer_to_depth():
# root = "E:/Dataset/GTA/Stereo_0/"
# name = root + "1-130423915874"
name = "E:/depth_video/0036696165/1"
config = load_json(name + ".json")
fov = config["fov"]
zbuffer = cv2.imread(name + ".tiff", cv2.IMREAD_UNCHANGED)
depth, focal = zbuffer_to_depth(zbuffer, fov)
print(depth)
def fuse_frames_of_depth_video():
"""
frames: list of images or video
"""
def frame_to_video(video_dir, video_name):
frames = v_uts.list_all_files(video_dir, exts=['jpg'])
rgb_video = f"{video_name}.mp4"
depth_video = f"{video_name}_d.avi"
cam_file = f"{video_name}.json"
dio = DepthIO()
imgs = []
depths = []
cams = []
print("seq len:", len(frames))
for i, frame in tqdm(enumerate(frames)):
name = f"{video_dir}/{i}.jpg"
d_name = f"{video_dir}/{i}.tiff"
c_name = f"{video_dir}/{i}.json"
img = np.array(Image.open(name))
depth = np.array(Image.open(d_name))
cam = load_json(c_name)
depth, focal = zbuffer_to_depth(depth, cam['fov'])
depth = dio.to8UC3(depth)
imgs.append(img)
depths.append(depth)
cam['focal'] = focal
cams.append(cam)
# if i > 30:
# break
vio = VideoIO()
vio.dump(rgb_video, imgs)
vio.dump(depth_video, depths, lossless=True)
dump_json(cam_file, cams)
folder = "E:/depth_video/"
output = "E:/depth_video_resave/"
v_uts.mkdir_if_need(output)
folder_names = v_uts.list_all_folders(folder)
for folder_name in tqdm(folder_names[1:]):
folder_name = folder_name.replace('\\', '/')
vid_name = folder_name.split('/')[-2]
print(folder_name, vid_name)
output_video = f"{output}/{vid_name}"
frame_to_video(folder_name, video_name=output_video)
# break
def save_xlsx(filename, dicts, sheets=None):
with pd.ExcelWriter(filename, mode='w') as writer:
if sheets is None:
df1 = pd.DataFrame(dicts)
df1.to_excel(writer, index=False)
return
for sheet in sheets:
df1 = pd.DataFrame(dicts[sheet])
df1.to_excel(writer, sheet_name=sheet, index=False)
def load_xlsx(filename, sheets=None):
assert os.path.exists(filename) , f"File not found: {filename}"
if sheets is None:
df = pd.read_excel(filename)
dict = {}
for column in df.columns:
dict[column] = df[column].tolist()
else:
dict = {}
for sheet in sheets:
df = pd.read_excel(filename, sheet_name=sheet)
cur_dict = {}
for column in df.columns:
cur_dict[column] = df[column].tolist()
print(cur_dict.keys())
dict[sheet] = cur_dict
print(dict.keys())
return dict
def get_sheet_list(dict, sheets=None, key="url"):
images_list = [dict[key]] if sheets is None else [dict[sheet_name][key] for sheet_name in sheets]
images_full = []
for images, sheet in zip(images_list, sheets):
print(f"{sheet}: {len(images)}")
images_full = images_full + images
return images_full
def test_load_save_obj():
image_name = "000000243355_zebra"
obj = f"./unit_test/{image_name}.obj"
obj = load_obj(obj)
export_obj(f"./unit_test/{image_name}_resave", obj)
if __name__ == '__main__':
# test = [(1,2), (3,4)]
# dump_pkl('test.pkl', test)
# print(load_pkl('test.pkl'))
# xyz = np.random.rand(1000, 3)
# write_pointcloud("test.ply", xyz)
# xyz = np.random.rand(1000, 3)
# write_pointcloud("test.ply", xyz)
# pass
# test_depth_8uc3_encode()
# test_zbuffer_to_depth()
# fuse_frames_of_depth_video()
test_load_save_obj()