|
|
|
import os |
|
import random |
|
|
|
|
|
|
|
|
|
|
|
import requests |
|
|
|
import tempfile |
|
import time |
|
import re |
|
from moviepy.audio.AudioClip import AudioArrayClip |
|
from moviepy.editor import * |
|
import cv2 |
|
import numpy as np |
|
import io |
|
import base64 |
|
import json |
|
from io import BytesIO |
|
import pandas as pd |
|
from PIL import Image |
|
import os |
|
from mutagen.mp3 import MP3 |
|
|
|
|
|
# Service credentials and region are read from environment variables; each
# value is None when the corresponding variable is not set.
azure_speech_key = os.getenv('azure_speech_key')
azure_service_region = os.getenv('azure_service_region')
my_openai_key = os.getenv('my_openai_key')

# Voice name inserted into the SSML request built by get_audio_data.
speech_synthesis_voice_name = "zh-CN-YunhaoNeural"

# Report only whether each secret is configured. Printing the key values
# themselves would leak credentials into console output and log files.
print("azure key是", "已设置" if azure_speech_key else "未设置")
print("azure_service_region是", azure_service_region)
print("my_openai_key", "已设置" if my_openai_key else "未设置")
|
|
|
|
|
def get_image_copywrite(image_path: str, dialog_cut_path: str):
    """Collect the recognised dialogue text of every bubble tied to one panel.

    Args:
        image_path: Path of the panel image, including its extension.
        dialog_cut_path: Directory holding the cropped speech-bubble images.

    Returns:
        The concatenated text of all associated bubbles, reduced to newline
        characters and characters in the U+4E00-U+9FA5 range, or ``None``
        when no bubble image is associated with the panel. Bubbles whose OCR
        call returned ``None`` contribute nothing (a notice is printed).
    """
    han_char = re.compile("[\u4e00-\u9fa5]+")

    def extract_chinese(text: str) -> str:
        """Drop every character that is neither a newline nor matched by ``han_char``."""
        return "".join(ch for ch in text if ch == '\n' or han_char.match(ch))

    bubble_paths = get_associate_dialog(image_path=image_path, dialog_cut_path=dialog_cut_path)
    if len(bubble_paths) == 0:
        return None

    pieces = []
    for dialog_img_path in bubble_paths:
        recognised = get_sorted_dialog_text(dialog_img_path)
        if recognised is None:
            # OCR failed for this bubble; report it and move on.
            print(dialog_img_path+"识别是空-可能是有问题")
            continue
        pieces.extend(extract_chinese(line) for line in recognised)
    return "".join(pieces)
|
|
|
|
|
def get_associate_dialog(image_path: str, dialog_cut_path: str) -> list:
    """List the cropped speech-bubble images that belong to one panel image.

    The panel's file stem is parsed as an integer and zero-padded to three
    digits; every ``.jpg`` file under ``dialog_cut_path`` (searched
    recursively) whose name starts with that prefix is considered associated.

    Args:
        image_path: Path of the panel image, including its extension.
        dialog_cut_path: Root directory of the cropped bubble images.

    Returns:
        Paths of the matching bubble images, ordered by directory and file
        name so the result does not depend on filesystem listing order.
        Empty when nothing matches or when the panel's stem is not an integer.
    """
    image_name = os.path.splitext(os.path.basename(image_path))[0]
    try:
        image_name_format = '{:03d}'.format(int(image_name))
    except ValueError:
        # A non-numeric stem (e.g. "cover.jpg") cannot have numbered bubbles.
        return []

    associated_dialogs = []
    for root, dirs, files in os.walk(dialog_cut_path):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for file in sorted(files):
            if file.startswith(image_name_format) and file.endswith('.jpg'):
                associated_dialogs.append(os.path.join(root, file))
    return associated_dialogs
|
|
|
|
|
def merge_sublists(lists):
    """Merge sublists that share at least one element, in a single pass.

    Each sublist is compared against the groups built so far, in order. It is
    folded into the first group it overlaps with (only elements not already
    present are appended); otherwise it starts a new group. Because this is a
    single pass, a sublist that overlaps two existing groups joins only the
    first one and the two groups are not merged with each other.

    Args:
        lists: Iterable of lists to merge.

    Returns:
        A new list of merged groups. The input sublists are never modified.
    """
    merged = []
    for sublist in lists:
        for group in merged:
            if any(elem in sublist for elem in group):
                group.extend(elem for elem in sublist if elem not in group)
                break
        else:
            # Copy so that later extend() calls do not mutate the caller's list.
            merged.append(list(sublist))
    return merged
|
|
|
|
|
|
|
|
|
|
|
def find_associate_text(sorted_indices, centers, sorted_coordinates, boxInfo):
    """Group text boxes that lie close to each other and gather their texts.

    Two boxes at sorted positions ``i < j`` are related when the vertical
    distance of their centers is below one third of box ``i``'s height and
    the horizontal distance is below 1.5 times box ``i``'s width. Width and
    height are taken from corners ``[0]`` and ``[2]`` of the box
    (presumably top-left and bottom-right; confirm against the OCR service).

    Args:
        sorted_indices: Original box indices in sorted order.
        centers: Box centers, indexed by original box index.
        sorted_coordinates: Box corner lists, indexed by sorted position.
        boxInfo: Mapping whose ``'Text'`` entry maps ``str(original index)``
            to the recognised text.

    Returns:
        A tuple ``(merged_texts, related_groups)``: the per-group texts after
        ``merge_sublists``, and the groups themselves as sets of sorted
        positions.
    """
    related_groups = []
    box_count = len(sorted_indices)

    for i in range(box_count - 1):
        anchor_box = sorted_coordinates[i]
        anchor_center = centers[sorted_indices[i]]
        max_dy = abs(anchor_box[2][1] - anchor_box[0][1]) / 3
        max_dx = abs(anchor_box[2][0] - anchor_box[0][0]) * 1.5

        for j in range(i + 1, box_count):
            other_center = centers[sorted_indices[j]]
            close_in_y = abs(anchor_center[1] - other_center[1]) < max_dy
            if not (close_in_y and abs(anchor_center[0] - other_center[0]) < max_dx):
                continue

            # Join the first group that already holds either position,
            # otherwise open a new group for the pair.
            for group in related_groups:
                if i in group or j in group:
                    group.add(i)
                    group.add(j)
                    break
            else:
                related_groups.append({i, j})

    associate_text_list = [
        [boxInfo['Text'][str(sorted_indices[idx])] for idx in group]
        for group in related_groups
    ]
    return merge_sublists(associate_text_list), related_groups
|
|
|
|
|
|
|
|
|
|
|
|
|
def sort_associate_text_list(sorted_indices: list, related_groups: list, boxCoordinates, centers) -> list:
    """Order the members of each related group by reading direction.

    A member counts as horizontal when its box is wider than tall (measured
    between corners ``[0]`` and ``[2]``), otherwise as vertical. If a group
    holds any vertical member it is sorted by center x, largest first; if it
    holds any horizontal member it is then sorted by center y, smallest
    first. A mixed group receives both sorts in that sequence.

    Args:
        sorted_indices: Original box indices in sorted order.
        related_groups: Groups of sorted positions (any iterable per group).
        boxCoordinates: Box corner lists, indexed by original box index.
        centers: Box centers, indexed by original box index.

    Returns:
        A list with one ordered list of sorted positions per input group.
    """
    def is_wider_than_tall(pos):
        box = boxCoordinates[sorted_indices[pos]]
        return (box[2][0] - box[0][0]) > (box[2][1] - box[0][1])

    sorted_groups = []
    for members in related_groups:
        ordered = list(members)
        wide_flags = [is_wider_than_tall(pos) for pos in ordered]

        if not all(wide_flags):
            # At least one vertical column: order columns right to left.
            ordered.sort(key=lambda pos: centers[sorted_indices[pos]][0], reverse=True)
        if any(wide_flags):
            # At least one horizontal line: order lines top to bottom.
            ordered.sort(key=lambda pos: centers[sorted_indices[pos]][1], reverse=False)

        sorted_groups.append(ordered)
    return sorted_groups
|
|
|
|
|
|
|
|
|
def sort_dialog_list(sorted_indices: list, related_groups: list, sorted_coordinates) -> list:
    """Arrange groups and stand-alone boxes in reading order.

    Sorted positions are visited from first to last. A position whose box is
    not strictly taller than wide (measured between corners ``[0]`` and
    ``[2]``) never starts an entry; it only appears in the result if it
    belongs to a group that another, taller member brings in. For every other
    position, the first not-yet-emitted group containing it is emitted as a
    whole; a position that belongs to no group is emitted as a one-element
    list.

    All values handled here are positions into ``sorted_indices`` /
    ``sorted_coordinates``, the same index space the groups use. Previously
    the values stored in ``sorted_indices`` (original box indices) were
    iterated as if they were positions, which scrambled the order whenever
    the sort was not the identity, and the last box was never checked for
    orientation.

    Args:
        sorted_indices: Original box indices in sorted order; only its length
            is used.
        related_groups: Groups of sorted positions, each already ordered.
        sorted_coordinates: Box corner lists, indexed by sorted position.

    Returns:
        A list of lists of sorted positions in reading order. Neither
        ``related_groups`` nor its members are modified.
    """
    pending_groups = list(related_groups)
    ordered = []
    emitted = set()

    for pos in range(len(sorted_indices)):
        box = sorted_coordinates[pos]
        width = box[2][0] - box[0][0]
        height = box[2][1] - box[0][1]
        if not width < height:
            continue

        for group in pending_groups:
            if pos in group:
                ordered.append(list(group))
                pending_groups.remove(group)
                emitted.update(group)
                break
        else:
            # Not in any pending group: emit alone unless an earlier group
            # already covered this position.
            if pos not in emitted:
                ordered.append([pos])
                emitted.add(pos)

    return ordered
|
|
|
|
|
def get_sorted_dialog_text(image_path: str):
    """OCR one speech-bubble image and return its text in reading order.

    The image is uploaded to the OCR endpoint, whose JSON reply is read as a
    pair ``(boxCoordinates, boxInfo)``. Boxes are sorted by center (top to
    bottom, then right to left), grouped and ordered by the helper functions
    in this file, and their texts are emitted group by group.

    Args:
        image_path: Path of the bubble image, including its extension.

    Returns:
        A list of text fragments in reading order, where the last fragment of
        each group has ``'\\n'`` appended; or ``None`` when the request or
        any processing step raises.

    Raises:
        OSError: If ``image_path`` cannot be opened (raised before the
            request is attempted).
    """
    # The context manager guarantees the upload handle is closed; it was
    # previously left open after every call.
    with open(image_path, 'rb') as image_file:
        try:
            resp = requests.post(
                "https://rogerxavier-ocr-with-fastapi.hf.space/getCoordinates",
                files={"image": image_file},
            )
            payload = resp.json()  # parse the body once
            boxCoordinates, boxInfo = payload[0], payload[1]
            print("ofa ocr识别漫画块成功返回")
            print("boxCoordinates是", boxCoordinates)
            print("boxInfo是", boxInfo)

            # Center of each box from corners [0] and [2].
            centers = [((box[0][0] + box[2][0]) / 2, (box[0][1] + box[2][1]) / 2) for box in boxCoordinates]

            # Ascending y first, then descending x.
            sorted_indices = sorted(range(len(centers)), key=lambda i: (centers[i][1], -centers[i][0]))
            sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]

            # Only the position groups are needed; the merged texts are not used.
            _, related_groups = find_associate_text(sorted_indices, centers, sorted_coordinates, boxInfo)
            related_groups = sort_associate_text_list(sorted_indices, related_groups, boxCoordinates, centers)
            related_groups = sort_dialog_list(sorted_indices, related_groups, sorted_coordinates)

            sorted_text = []
            added_indices = set()
            for group in related_groups:
                for pos in group:
                    if pos in added_indices:
                        continue
                    # The first group containing the position decides whether
                    # it closes that group and therefore gets a newline.
                    owner = next(candidate for candidate in related_groups if pos in candidate)
                    fragment = boxInfo['Text'][str(sorted_indices[pos])]
                    sorted_text.append(fragment + '\n' if pos == owner[-1] else fragment)
                    added_indices.add(pos)

            print(sorted_coordinates)
            print(sorted_text)
            return sorted_text
        except Exception as e:
            print("ofa ocr图片请求出现问题")
            print(e)
            return None
|
|
|
|
|
|
|
|
|
def get_audio_data(text: str):
    """Synthesize ``text`` with the Azure TTS REST endpoint and save it as MP3.

    Args:
        text: Plain text to speak. It is XML-escaped before being embedded in
            the SSML document, so characters such as ``&`` or ``<`` cannot
            break the request body.

    Returns:
        A tuple ``(file_name, duration_seconds)`` for the temporary MP3 file
        written under ``/mp3_out/``; the caller is responsible for deleting
        it. ``(None, None)`` when the service does not answer with status 200
        or when the audio cannot be written or parsed.
    """
    from xml.sax.saxutils import escape

    url = f"https://{azure_service_region}.tts.speech.microsoft.com/cognitiveservices/v1"
    headers = {
        "Ocp-Apim-Subscription-Key": azure_speech_key,
        "Content-Type": "application/ssml+xml",
        "X-Microsoft-OutputFormat": "audio-16khz-128kbitrate-mono-mp3",
        "User-Agent": "curl"
    }
    ssml_text = '''
    <speak version='1.0' xml:lang='zh-CN'>
        <voice xml:lang='zh-CN' xml:gender='male' name='{voiceName}'>
            {voiceText}
        </voice>
    </speak>
    '''.format(voiceName=speech_synthesis_voice_name, voiceText=escape(text))

    response = requests.post(url, headers=headers, data=ssml_text.encode('utf-8'))
    if response.status_code != 200:
        print("Error: Failed to synthesize audio. Status code:", response.status_code)
        return None, None

    file_name = None
    try:
        # delete=False: the file must outlive this function so the caller can
        # hand its path to the video pipeline.
        with tempfile.NamedTemporaryFile(dir='/mp3_out/', delete=False) as temp_file:
            file_name = temp_file.name
            temp_file.write(response.content)
        # Parsed after the handle is closed so the content is fully flushed.
        audio_duration_seconds = MP3(file_name).info.length
        return file_name, audio_duration_seconds
    except Exception as e:
        print("可能遇到mp3 can not sync to MPEG frame错误,总之音频能获取到但是不能识别", e)
        # Do not leave an unusable temp file behind.
        if file_name is not None:
            try:
                os.remove(file_name)
            except OSError:
                pass
        return None, None
|
|
|
|
|
|
|
|
|
|
|
|
|
def zero_pad(s, length):
    """Return ``s`` left-filled with ``'0'`` up to ``length`` characters (``zfill``)."""
    padded = s.zfill(length)
    return padded
|
|
|
|
|
def gpt_polish(text: str):
    """Ask the chat-completions endpoint to turn raw dialogue into a caption.

    Args:
        text: Dialogue text extracted from a comic panel; sent as the user
            message.

    Returns:
        The ``content`` of the first choice in the JSON reply, or ``None``
        when the request or the reply parsing raises.
    """
    request_headers = {
        'Authorization': f'Bearer {my_openai_key}',
        'Content-Type': 'application/json',
    }
    system_message = {'role': 'system', 'content': "你是一个assistant,能够根据user发送的漫画中提取的对话文字,生成一个短视频中一帧的文案(1-2句话)"}
    user_message = {'role': 'user', 'content': text}
    payload = {
        'model': 'gpt-3.5-turbo',
        'messages': [system_message, user_message]
    }

    try:
        response = requests.post(
            'https://api.yingwu.lol/v1/chat/completions',
            headers=request_headers,
            data=json.dumps(payload),
        )
        print("gpt请求的结果是", response.text)
        polished = response.json()['choices'][0]['message']['content']
        print("润色后文案是:"+polished)
        return polished
    except Exception as e:
        print("gpt润色文案失败:")
        print(e)
        return None
|
if __name__ == '__main__':
    # Script entry point: build one narrated video from the comic panels in
    # `img_path`, using the cropped speech bubbles in `dialog_img_path`.
    img_path = 'manga1'
    dialog_img_path = 'manga12'

    def _page_number(path):
        """Sort key: the digits of the file stem as an integer (0 if none).

        Comparing integers keeps page 1000 after page 999, which the previous
        three-character zero-padded string comparison did not.
        """
        stem = os.path.splitext(os.path.basename(path))[0]
        digits = ''.join(filter(str.isdigit, stem))
        return int(digits) if digits else 0

    # Collect every panel image below the panel directory, in page order.
    subdir_path = os.path.join(os.getcwd(), img_path)
    image_files = []
    for root, dirs, files in os.walk(subdir_path):
        for file in files:
            if file.endswith((".jpg", ".png")):
                image_files.append(os.path.relpath(os.path.join(root, file)))
    image_files.sort(key=_page_number)

    if not image_files:
        raise SystemExit("No .jpg/.png panel images found under " + subdir_path)

    # A random panel becomes the cover; its size is also the frame size that
    # every clip is resized to.
    image = Image.open(random.choice(image_files))
    width, height = image.size
    os.makedirs("cover", exist_ok=True)
    save_path = os.path.join("cover", "0.jpg")
    if image.mode in ("RGBA", "LA", "P"):
        # JPEG cannot store alpha or palette images.
        image = image.convert("RGB")
    image.save(save_path)

    fps = 30
    video_clips = []
    # Every temp mp3 produced by get_audio_data, so all of them get removed.
    temp_audio_files = []

    # Keep only panels that have at least one associated speech bubble.
    image_files = [
        image_path for image_path in image_files
        if get_associate_dialog(image_path, dialog_img_path)
    ]

    os.makedirs("mp4_out", exist_ok=True)
    os.makedirs("mp3_out", exist_ok=True)

    try:
        for image_file in image_files:
            print("现在处理的图片是"+image_file)

            img = cv2.imread(image_file)
            if img is None:
                # cv2.imread signals an unreadable file by returning None.
                print("Skipping unreadable image:", image_file)
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            cur_copywrite = get_image_copywrite(image_file, dialog_img_path)
            if cur_copywrite is None:
                continue

            filename, duration = get_audio_data(cur_copywrite)
            if filename is None:
                continue
            temp_audio_files.append(filename)
            print("存放临时mp3文件的路径是", filename)

            # Show the panel for exactly as long as its narration lasts.
            clip = ImageClip(img).set_duration(duration).resize((width, height))
            clip = clip.set_audio(AudioFileClip(filename))
            video_clips.append(clip)

        if not video_clips:
            raise SystemExit("No narrated clips were produced; nothing to write.")

        video = concatenate_videoclips(video_clips)
        video.write_videofile('mp4_out/output_video.mp4', fps=fps, temp_audiofile="mp3_out/temp.mp3")
    finally:
        # Remove every temp mp3, even when rendering failed part-way.
        for temp_path in temp_audio_files:
            print("删除临时mp3文件", temp_path)
            try:
                os.remove(temp_path)
            except OSError as e:
                print(e)
|
|
|
|
|
|
|
|