|
import os |
|
import subprocess |
|
|
|
import cv2 |
|
from imageio_ffmpeg import get_ffmpeg_exe |
|
from mmengine.logging import print_log |
|
from moviepy.editor import VideoFileClip |
|
from scenedetect import FrameTimecode |
|
|
|
|
|
def iterate_files(folder_path): |
|
for root, dirs, files in os.walk(folder_path): |
|
|
|
|
|
|
|
|
|
|
|
for file in files: |
|
file_path = os.path.join(root, file) |
|
|
|
yield file_path |
|
|
|
|
|
for subdir in dirs: |
|
subdir_path = os.path.join(root, subdir) |
|
|
|
iterate_files(subdir_path) |
|
|
|
|
|
def iterate_folders(folder_path): |
|
for root, dirs, files in os.walk(folder_path): |
|
for subdir in dirs: |
|
subdir_path = os.path.join(root, subdir) |
|
yield subdir_path |
|
|
|
iterate_folders(subdir_path) |
|
|
|
|
|
def clone_folder_structure(root_src, root_dst, verbose=False): |
|
src_path_list = iterate_folders(root_src) |
|
src_relpath_list = [os.path.relpath(x, root_src) for x in src_path_list] |
|
|
|
os.makedirs(root_dst, exist_ok=True) |
|
dst_path_list = [os.path.join(root_dst, x) for x in src_relpath_list] |
|
for folder_path in dst_path_list: |
|
os.makedirs(folder_path, exist_ok=True) |
|
if verbose: |
|
print(f"Create folder: '{folder_path}'") |
|
|
|
|
|
def count_files(root, suffix=".mp4"): |
|
files_list = iterate_files(root) |
|
cnt = len([x for x in files_list if x.endswith(suffix)]) |
|
return cnt |
|
|
|
|
|
def check_mp4_integrity(file_path, verbose=True, logger=None): |
|
try: |
|
VideoFileClip(file_path) |
|
if verbose: |
|
print_log(f"The MP4 file '{file_path}' is intact.", logger=logger) |
|
return True |
|
except Exception as e: |
|
if verbose: |
|
print_log(f"Error: {e}", logger=logger) |
|
print_log(f"The MP4 file '{file_path}' is not intact.", logger=logger) |
|
return False |
|
|
|
|
|
def count_frames(video_path): |
|
cap = cv2.VideoCapture(video_path) |
|
|
|
if not cap.isOpened(): |
|
print(f"Error: Could not open video file '{video_path}'") |
|
return |
|
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
print(f"Total frames in the video '{video_path}': {total_frames}") |
|
|
|
cap.release() |
|
|
|
|
|
def split_video( |
|
sample_path, |
|
scene_list, |
|
save_dir, |
|
target_fps=30, |
|
min_seconds=1, |
|
max_seconds=10, |
|
shorter_size=512, |
|
verbose=False, |
|
logger=None, |
|
): |
|
FFMPEG_PATH = get_ffmpeg_exe() |
|
|
|
save_path_list = [] |
|
for idx, scene in enumerate(scene_list): |
|
s, t = scene |
|
fps = s.framerate |
|
max_duration = FrameTimecode(timecode="00:00:00", fps=fps) |
|
max_duration.frame_num = round(fps * max_seconds) |
|
duration = min(max_duration, t - s) |
|
if duration.get_frames() < round(min_seconds * fps): |
|
continue |
|
|
|
|
|
fname = os.path.basename(sample_path) |
|
fname_wo_ext = os.path.splitext(fname)[0] |
|
|
|
save_path = os.path.join(save_dir, f"{fname_wo_ext}_scene-{idx}.mp4") |
|
|
|
|
|
cmd = [FFMPEG_PATH] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cmd += ["-i", sample_path] |
|
|
|
|
|
cmd += ["-nostdin", "-y", "-ss", str(s.get_seconds()), "-t", str(duration.get_seconds())] |
|
|
|
|
|
|
|
cmd += ["-r", f"{target_fps}"] |
|
|
|
|
|
cmd += ["-vf", f"scale='if(gt(iw,ih),-2,{shorter_size})':'if(gt(iw,ih),{shorter_size},-2)'"] |
|
|
|
|
|
cmd += ["-map", "0", save_path] |
|
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
|
stdout, stderr = proc.communicate() |
|
if verbose: |
|
stdout = stdout.decode("utf-8") |
|
print_log(stdout, logger=logger) |
|
|
|
save_path_list.append(sample_path) |
|
print_log(f"Video clip saved to '{save_path}'", logger=logger) |
|
|
|
return save_path_list |
|
|