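"""Gradio demo: swap a source face onto each frame of a target video.

Per-frame pipeline: SCRFD face detection -> 68-point landmark alignment ->
HifiFace swap -> XSEG mask -> optional GFPGAN restoration and color transfer
-> blend back into the frame. The original audio track is preserved.
"""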
import gradio as gr
import cv2
import os
import numpy as np
import numexpr as ne
from concurrent.futures import ThreadPoolExecutor

from face_feature.hifi_image_api import HifiImage
from HifiFaceAPI_parallel_trt_roi_realtime_sr_api import HifiFaceRealTime
from face_lib.face_swap import HifiFace
from face_restore.gfpgan_onnx_api import GFPGAN
from face_restore.xseg_onnx_api import XSEG
from face_detect.face_align_68 import face_alignment_landmark
from face_detect.face_detect import FaceDetect
from options.hifi_test_options import HifiTestOptions
from color_transfer import color_transfer

opt = HifiTestOptions().parse()
processor = None

def initialize_processor():
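    """Lazily construct the global FaceSwapProcessor on first use."""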
    global processor
    if processor is None:
        processor = FaceSwapProcessor(crop_size=opt.input_size)

class FaceSwapProcessor:
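    """Bundles the face detection, alignment, swap, segmentation and restoration models."""
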
    def __init__(self, crop_size=256):
        self.hi = HifiImage(crop_size=crop_size)  # source-identity feature extractor
        self.xseg = XSEG(model_type='xseg_0611', provider='gpu')  # face segmentation mask
        self.hf = HifiFace(model_name='er8_bs1', provider='gpu')  # face-swap generator
        self.scrfd_detector = FaceDetect(mode='scrfd_500m', tracking_thres=0.15)  # face detector
        self.face_alignment = face_alignment_landmark(lm_type=68)  # 68-point landmark alignment
        self.gfp = GFPGAN(model_type='GFPGANv1.4', provider='gpu')  # face restoration / super-resolution
        self.crop_size = crop_size

    def reverse2wholeimage_hifi_trt_roi(self, swaped_img, mat_rev, img_mask, frame, roi_img, roi_box):
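        """Warp the swapped face back into the ROI and blend it into the full frame."""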
        # Warp the swap back onto the ROI; [..., ::-1] flips RGB -> BGR for OpenCV.
        target_image = cv2.warpAffine(
            swaped_img, mat_rev, roi_img.shape[:2][::-1], borderMode=cv2.BORDER_REPLICATE
        )[..., ::-1]
        local_dict = {
            'img_mask': img_mask,
            'target_image': target_image,
            'roi_img': roi_img,
        }
        # Composite: masked swapped pixels (scaled to 0-255) over the original ROI.
        img = ne.evaluate('img_mask * (target_image * 255) + (1 - img_mask) * roi_img',
                          local_dict=local_dict, global_dict=None)
        img = img.astype(np.uint8)
        frame[roi_box[1]:roi_box[3], roi_box[0]:roi_box[2]] = img
        return frame

    def process_frame(self, frame, image_latent, use_gfpgan, sr_weight, use_color_trans, color_trans_mode):
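        """Swap the source identity onto the largest detected face in `frame`."""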
        # Detect faces, then align them to crop-sized patches together with their ROIs.
        _, bboxes, kpss = self.scrfd_detector.get_bboxes(frame, max_num=0)
        rois, faces, Ms, masks = self.face_alignment.forward(
            frame, bboxes, kpss, limit=5, min_face_size=30,
            crop_size=(self.crop_size, self.crop_size), apply_roi=True
        )

        if len(faces) == 0:
            return frame
        elif len(faces) == 1:
            face = np.array(faces[0])
            mat = Ms[0]
            roi_box = rois[0]
        else:
            # Pick the face with the largest ROI area (rois are [x1, y1, x2, y2];
            # the original roi[2] * roi[3] multiplied corner coordinates, not width x height).
            max_index = np.argmax([(roi[2] - roi[0]) * (roi[3] - roi[1]) for roi in rois])
            face = np.array(faces[max_index])
            mat = Ms[max_index]
            roi_box = rois[max_index]

        roi_img = frame[roi_box[1]:roi_box[3], roi_box[0]:roi_box[2]]
        face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)  # aligned crop: BGR -> RGB for the swap model

        # Run the face swap; the generator's own mask is discarded in favor of XSEG.
        _, swap_face_out = self.hf.forward(face, image_latent[0].reshape(1, -1))
        mask_out = self.xseg.forward(swap_face_out)[None, None]

        # Warp the mask back onto the ROI and binarize it.
        mask = cv2.warpAffine(mask_out[0][0].astype(np.float32), mat, roi_img.shape[:2][::-1])
        mask[mask > 0.2] = 1
        mask = mask[:, :, np.newaxis].astype(np.uint8)
        # Generator output is CHW in [-1, 1]; keep the aligned target face for color transfer.
        swap_face = swap_face_out[0].transpose((1, 2, 0)).astype(np.float32)
        target_face = (face.copy() / 255).astype(np.float32)

        if use_gfpgan:
            # Restore the swapped face, optionally blending it with the raw swap.
            sr_face = self.gfp.forward(swap_face)
            if sr_weight != 1.0:
                sr_face = cv2.addWeighted(sr_face, sr_weight, swap_face, 1.0 - sr_weight, 0)
            if use_color_trans:
                # color_transfer operates in [0, 1]; map back to [-1, 1] afterwards.
                transed_face = color_transfer(color_trans_mode, (sr_face + 1) / 2, target_face)
                swap_face = (transed_face * 2) - 1
            else:
                swap_face = sr_face
        elif use_color_trans:
            transed_face = color_transfer(color_trans_mode, (swap_face + 1) / 2, target_face)
            swap_face = (transed_face * 2) - 1

        swap_face = (swap_face + 1) / 2  # map from [-1, 1] to [0, 1] for blending

        frame_out = self.reverse2wholeimage_hifi_trt_roi(
            swap_face, mat, mask,
            frame, roi_img, roi_box
        )

        return frame_out

def process_image_video(image, video_path, use_gfpgan, sr_weight, use_color_trans, color_trans_mode):
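    """Swap the face from `image` onto every frame of the video at `video_path`."""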
    global processor
    initialize_processor()

    # Extract the source identity latent once; it is reused for every frame.
    src_latent, _ = processor.hi.get_face_feature(image)
    image_latent = [src_latent]

    video = cv2.VideoCapture(video_path)
    video_fps = video.get(cv2.CAP_PROP_FPS)
    video_size = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),
                  int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    output_dir = 'data/output/'
    os.makedirs(output_dir, exist_ok=True)  # os.mkdir would fail if 'data/' does not exist
    swap_video_path = output_dir + 'temp.mp4'
    videoWriter = cv2.VideoWriter(swap_video_path, cv2.VideoWriter_fourcc(*'mp4v'), video_fps, video_size)

    # Frames are submitted to a thread pool and results consumed in submission
    # order, so the output preserves frame order. Note that every pending frame
    # stays in memory until the pool drains, which can be costly for long clips.
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = []
        while True:
            ret, frame = video.read()
            if not ret:
                break
            future = executor.submit(processor.process_frame, frame, image_latent, use_gfpgan, sr_weight,
                                     use_color_trans, color_trans_mode)
            futures.append(future)

        for future in futures:
            processed_frame = future.result()
            if processed_frame is not None:
                videoWriter.write(processed_frame)

    video.release()
    videoWriter.release()

    add_audio_to_video(video_path, swap_video_path)

    return swap_video_path


def add_audio_to_video(original_video_path, swapped_video_path):
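    """Extract the source video's audio track and mux it into the swapped video."""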
    audio_file_path = os.path.splitext(original_video_path)[0] + '.wav'
    if not os.path.exists(audio_file_path):
        os.system(f'ffmpeg -y -hide_banner -loglevel error -i "{original_video_path}" -f wav -vn "{audio_file_path}"')
    if not os.path.exists(audio_file_path):
        # The source video has no audio track; keep the silent swapped video.
        return

    temp_output_path = swapped_video_path.replace('.mp4', '_with_audio.mp4')
    os.system(
        f'ffmpeg -y -hide_banner -loglevel error -i "{swapped_video_path}" -i "{audio_file_path}" '
        f'-c:v copy -c:a aac "{temp_output_path}"')

    os.remove(swapped_video_path)
    os.rename(temp_output_path, swapped_video_path)


# Gradio interface setup
iface = gr.Interface(
    fn=process_image_video,
    inputs=[
        gr.Image(type="pil", label="Source Image"),
        gr.Video(label="Input Video"),
        gr.Checkbox(label="Use GFPGAN [Super-Resolution]"),
        gr.Slider(minimum=0.1, maximum=1.0, step=0.1, label="SR Weight [applies only when GFPGAN is enabled]", value=1.0),
        gr.Checkbox(label="Use Color Transfer"),
        gr.Dropdown(choices=["rct", "lct", "mkl", "idt", "sot"],
                    label="Color Transfer Mode [applies only when Color Transfer is enabled]", value="rct")
    ],
    outputs=gr.Video(label="Output Video"),
    title="Video Generation",
    description="Upload an image and a video, and the system will generate a new video based on the input."
)

if __name__ == "__main__":
    iface.launch()