File size: 6,996 Bytes
89cf463 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
import gradio as gr
import cv2
import os
import numpy as np
import numexpr as ne
from concurrent.futures import ThreadPoolExecutor
from face_feature.hifi_image_api import HifiImage
from HifiFaceAPI_parallel_trt_roi_realtime_sr_api import HifiFaceRealTime
from face_lib.face_swap import HifiFace
from face_restore.gfpgan_onnx_api import GFPGAN
from face_restore.xseg_onnx_api import XSEG
from face_detect.face_align_68 import face_alignment_landmark
from face_detect.face_detect import FaceDetect
from options.hifi_test_options import HifiTestOptions
from color_transfer import color_transfer
opt = HifiTestOptions().parse()
processor = None
def initialize_processor():
global processor
if processor is None:
processor = FaceSwapProcessor(crop_size=opt.input_size)
class FaceSwapProcessor:
def __init__(self, crop_size=256):
self.hi = HifiImage(crop_size=crop_size)
self.xseg = XSEG(model_type='xseg_0611', provider='gpu')
self.hf = HifiFace(model_name='er8_bs1', provider='gpu')
self.scrfd_detector = FaceDetect(mode='scrfd_500m', tracking_thres=0.15)
self.face_alignment = face_alignment_landmark(lm_type=68)
self.gfp = GFPGAN(model_type='GFPGANv1.4', provider='gpu')
self.crop_size = crop_size
def reverse2wholeimage_hifi_trt_roi(self, swaped_img, mat_rev, img_mask, frame, roi_img, roi_box):
target_image = cv2.warpAffine(swaped_img, mat_rev, roi_img.shape[:2][::-1], borderMode=cv2.BORDER_REPLICATE)[
...,
::-1]
local_dict = {
'img_mask': img_mask,
'target_image': target_image,
'roi_img': roi_img,
}
img = ne.evaluate('img_mask * (target_image * 255)+(1 - img_mask) * roi_img', local_dict=local_dict,
global_dict=None)
img = img.astype(np.uint8)
frame[roi_box[1]:roi_box[3], roi_box[0]:roi_box[2]] = img
return frame
def process_frame(self, frame, image_latent, use_gfpgan, sr_weight, use_color_trans, color_trans_mode):
_, bboxes, kpss = self.scrfd_detector.get_bboxes(frame, max_num=0)
rois, faces, Ms, masks = self.face_alignment.forward(
frame, bboxes, kpss, limit=5, min_face_size=30,
crop_size=(self.crop_size, self.crop_size), apply_roi=True
)
if len(faces) == 0:
return frame
elif len(faces) == 1:
face = np.array(faces[0])
mat = Ms[0]
roi_box = rois[0]
else:
max_index = np.argmax([roi[2] * roi[3] for roi in rois]) # Get the largest face
face = np.array(faces[max_index])
mat = Ms[max_index]
roi_box = rois[max_index]
roi_img = frame[roi_box[1]:roi_box[3], roi_box[0]:roi_box[2]]
face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
mask_out, swap_face_out = self.hf.forward(face, image_latent[0].reshape(1, -1))
mask_out = self.xseg.forward(swap_face_out)[None, None]
mask = cv2.warpAffine(mask_out[0][0].astype(np.float32), mat, roi_img.shape[:2][::-1])
mask[mask > 0.2] = 1
mask = mask[:, :, np.newaxis].astype(np.uint8)
swap_face = swap_face_out[0].transpose((1, 2, 0)).astype(np.float32)
target_face = (face.copy() / 255).astype(np.float32)
if use_gfpgan:
sr_face = self.gfp.forward(swap_face)
if sr_weight != 1.0:
sr_face = cv2.addWeighted(sr_face, sr_weight, swap_face, 1.0 - sr_weight, 0)
if use_color_trans:
transed_face = color_transfer(color_trans_mode, (sr_face + 1) / 2, target_face)
swap_face = (transed_face * 2) - 1
else:
swap_face = sr_face
elif use_color_trans:
transed_face = color_transfer(color_trans_mode, (swap_face + 1) / 2, target_face)
swap_face = (transed_face * 2) - 1
swap_face = ((swap_face + 1) / 2)
frame_out = self.reverse2wholeimage_hifi_trt_roi(
swap_face, mat, mask,
frame, roi_img, roi_box
)
return frame_out
def process_image_video(image, video_path, use_gfpgan, sr_weight, use_color_trans, color_trans_mode):
global processor
initialize_processor()
src_latent, _ = processor.hi.get_face_feature(image)
image_latent = [src_latent]
video = cv2.VideoCapture(video_path)
video_fps = video.get(cv2.CAP_PROP_FPS)
video_size = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
output_dir = 'data/output/'
if not os.path.exists(output_dir):
os.mkdir(output_dir)
swap_video_path = output_dir + 'temp.mp4'
videoWriter = cv2.VideoWriter(swap_video_path, cv2.VideoWriter_fourcc(*'mp4v'), video_fps, video_size)
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
futures = []
while True:
ret, frame = video.read()
if not ret:
break
future = executor.submit(processor.process_frame, frame, image_latent, use_gfpgan, sr_weight,
use_color_trans, color_trans_mode)
futures.append(future)
for future in futures:
processed_frame = future.result()
if processed_frame is not None:
videoWriter.write(processed_frame)
video.release()
videoWriter.release()
add_audio_to_video(video_path, swap_video_path)
return swap_video_path
def add_audio_to_video(original_video_path, swapped_video_path):
audio_file_path = original_video_path.split('.')[0] + '.wav'
if not os.path.exists(audio_file_path):
os.system(f'ffmpeg -y -hide_banner -loglevel error -i "{original_video_path}" -f wav -vn "{audio_file_path}"')
temp_output_path = swapped_video_path.replace('.mp4', '_with_audio.mp4')
os.system(
f'ffmpeg -y -hide_banner -loglevel error -i "{swapped_video_path}" -i "{audio_file_path}" -c:v copy -c:a aac "{temp_output_path}"')
os.remove(swapped_video_path)
os.rename(temp_output_path, swapped_video_path)
# Gradio interface setup
iface = gr.Interface(
fn=process_image_video,
inputs=[
gr.Image(type="pil", label="Source Image"),
gr.Video(label="Input Video"),
gr.Checkbox(label="Use GFPGAN [Super-Resolution]"),
gr.Slider(minimum=0.1, maximum=1.0, step=0.1, label="SR Weight [only support GFPGAN enabled]", value=1.0),
gr.Checkbox(label="Use Color Transfer"),
gr.Dropdown(choices=["rct", "lct", "mkl", "idt", "sot"],
label="Color Transfer Mode [only support Color-Transfer enabled]", value="rct")
],
outputs=gr.Video(label="Output Video"),
title="Video Generation",
description="Upload an image and a video, and the system will generate a new video based on the input."
)
if __name__ == "__main__":
iface.launch() |