import os |
import requests |
import tempfile |
import time |
from moviepy.audio.AudioClip import AudioArrayClip |
from moviepy.editor import * |
import cv2 |
import numpy as np |
import io |
import base64 |
import json |
from io import BytesIO |
import pandas as pd |
from PIL import Image |
import os |
from mutagen.mp3 import MP3 |
# Credentials and service settings come from environment variables; each value
# is None when its variable is not set.
azure_speech_key = os.getenv('azure_speech_key')
azure_service_region = os.getenv('azure_service_region')
my_openai_key = os.getenv('my_openai_key')
# Voice name sent in the SSML <voice> element of every synthesis request.
speech_synthesis_voice_name = "zh-CN-YunhaoNeural"
# Startup diagnostics.  Only report whether each secret is configured: echoing
# the raw keys would leak them into the console and any captured logs.
print("azure key是", "<set>" if azure_speech_key else "<missing>")
print("azure_service_region是", azure_service_region)
print("my_openai_key", "<set>" if my_openai_key else "<missing>")
def get_image_copywrite(image_path: str, dialog_cut_path: str) -> "str | None":
    """Collect the recognized dialog text that belongs to one manga page.

    Args:
        image_path: Path of the page image, including its extension.
        dialog_cut_path: Directory holding the cropped dialog-box images.

    Returns:
        The raw recognized text, with the lines of each dialog crop
        concatenated and one ``'\\n'`` appended per crop, or ``None`` when
        the page has no associated dialog crops.  Crops whose recognition
        returned ``None`` are reported on stdout and contribute nothing.
    """
    dialog_crops = get_associate_dialog(image_path=image_path, dialog_cut_path=dialog_cut_path)
    if not dialog_crops:
        return None

    pieces = []
    for crop_path in dialog_crops:
        crop_lines = get_sorted_dialog_text(crop_path)
        if crop_lines is None:
            print(crop_path+"识别是空-可能是有问题")
            continue
        # One dialog box becomes one line of the resulting copy.
        pieces.append(''.join(crop_lines) + '\n')
    return ''.join(pieces)
def get_associate_dialog(image_path: str, dialog_cut_path: str) -> "list[str]":
    """Find the cropped dialog images that belong to one manga page.

    The page's base name (without extension) is parsed as an integer and
    zero-padded to three digits; every ``.jpg`` file anywhere under
    *dialog_cut_path* whose name starts with that prefix is a match.

    Args:
        image_path: Path of the page image, including its extension.
        dialog_cut_path: Directory that is walked recursively for crops.

    Returns:
        The matching file paths in sorted order; an empty list when nothing
        matches.

    Raises:
        ValueError: If the page's base name is not an integer.
    """
    image_name = os.path.splitext(os.path.basename(image_path))[0]
    image_name_format = '{:03d}'.format(int(image_name))
    associated_dialogs = [
        os.path.join(root, file_name)
        for root, _, files in os.walk(dialog_cut_path)
        for file_name in files
        if file_name.startswith(image_name_format) and file_name.endswith('.jpg')
    ]
    # os.walk yields entries in filesystem order, which varies between
    # machines; sort so the dialog order (and thus the generated copy) is
    # reproducible.
    associated_dialogs.sort()
    return associated_dialogs
def get_sorted_dialog_text(image_path: str) -> "list[str] | None":
    """Run the remote OCR service on one dialog crop and order its text.

    The image is uploaded to the ``getCoordinates`` endpoint.  The response
    is read as a two-element JSON array: element 0 is the list of detected
    text boxes (each a sequence of corner points, of which corners 0 and 2
    are used as opposite corners) and element 1 is a mapping whose ``'Text'``
    entry maps the box index, as a string, to the recognized text.

    Boxes are ordered by descending centre x, then ascending centre y.
    Afterwards, neighbouring boxes whose centres are closer horizontally than
    a third of the earlier box's width are swapped when corner 0 of the
    earlier box has a larger y than corner 2 of the later one.
    NOTE(review): this presumably yields right-to-left, top-to-bottom reading
    order for vertical text, assuming y grows downwards -- confirm.

    Args:
        image_path: Path of the dialog crop, including its extension.

    Returns:
        The recognized texts of the crop in the computed order, or ``None``
        when the request or the processing of its response fails.

    Raises:
        OSError: If *image_path* cannot be opened.
    """
    # NOTE(review): the session cookie below is hard-coded and appears to be a
    # short-lived token; it should come from configuration instead.
    # NOTE(review): advertising br/zstd in Accept-Encoding relies on the
    # matching decoders being installed for the body to be decoded -- confirm.
    headers = {
        'authority': 'rogerxavier-fastapi-t5-magi.hf.space',
        'scheme': 'https',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cookie': 'spaces-jwt=eyJhbGciOiJFZERTQSJ9.eyJyZWFkIjp0cnVlLCJwZXJtaXNzaW9ucyI6eyJyZXBvLmNvbnRlbnQucmVhZCI6dHJ1ZX0sIm9uQmVoYWxmT2YiOnsia2luZCI6InVzZXIiLCJfaWQiOiI2NDJhNTNiNTE2ZDRkODI5M2M5YjdiNzgiLCJ1c2VyIjoicm9nZXJ4YXZpZXIifSwiaWF0IjoxNzE2Njg3MzU3LCJzdWIiOiIvc3BhY2VzL3JvZ2VyeGF2aWVyL29jcl93aXRoX2Zhc3RhcGkiLCJleHAiOjE3MTY3NzM3NTcsImlzcyI6Imh0dHBzOi8vaHVnZ2luZ2ZhY2UuY28ifQ._sGdEgC-ijbIhLmB6iNSBQ_xHNzb4Ydb9mD0L3ByRmJSbB9ccfGbRgtNmkV1JLLldHp_VEKUSQt9Mwq_q4aGAQ',
        'Dnt': '1',
        'Priority': 'u=1, i',
        'Sec-Ch-Ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36'
    }
    # Open the upload in a context manager so the handle is closed on every
    # path; previously it was never closed.
    with open(image_path, 'rb') as image_bytes:
        files = {
            "image": image_bytes,
        }
        try:
            resp = requests.post("https://rogerxavier-ocr-with-fastapi.hf.space/getCoordinates", files=files, headers=headers)
            # Decode the body once instead of once per element.
            payload = resp.json()
            boxCoordinates, boxInfo = payload[0], payload[1]
            # Centre of each box, taken from its two opposite corners.
            centers = [((box[0][0] + box[2][0]) / 2, (box[0][1] + box[2][1]) / 2) for box in boxCoordinates]
            # Primary order: larger centre x first, then smaller centre y.
            sorted_indices = sorted(range(len(centers)), key=lambda i: (-centers[i][0], centers[i][1]))
            sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
            # Correction pass: boxes that nearly share a column are swapped
            # when the earlier one starts after the later one ends vertically.
            # The geometry tests read the pre-pass ordering on purpose;
            # sorted_coordinates is only rebuilt after the loop.
            for i in range(len(sorted_indices) - 1):
                if centers[sorted_indices[i]][0] - centers[sorted_indices[i+1]][0] < (sorted_coordinates[i][2][0] - sorted_coordinates[i][0][0]) / 3:
                    if sorted_coordinates[i][0][1] > sorted_coordinates[i+1][2][1]:
                        sorted_indices[i], sorted_indices[i+1] = sorted_indices[i+1], sorted_indices[i]
            sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
            sorted_text = [boxInfo['Text'][str(i)] for i in sorted_indices]
            print(sorted_coordinates)
            print(sorted_text)
            return sorted_text
        except Exception as e:
            # Best effort: any network, decoding or shape error means that no
            # text is available for this crop.
            print("图片请求出现问题")
            print(e)
            return None
def get_audio_data(text: str) -> "tuple[str, float] | None":
    """Synthesize *text* with the Azure text-to-speech REST API.

    The MP3 returned by the service is written to a temporary file under
    ``/mp3_out/`` that is NOT deleted automatically; the caller owns the file
    and has to remove it.

    Args:
        text: Plain text to speak.  It is XML-escaped before being embedded
            in the SSML request body.

    Returns:
        ``(file_name, duration_seconds)`` for the stored MP3, where the
        duration is the length reported by mutagen, or ``None`` when the
        service answers with a status code other than 200.
    """
    from xml.sax.saxutils import escape

    url = f"https://{azure_service_region}.tts.speech.microsoft.com/cognitiveservices/v1"
    headers = {
        "Ocp-Apim-Subscription-Key": azure_speech_key,
        "Content-Type": "application/ssml+xml",
        "X-Microsoft-OutputFormat": "audio-16khz-128kbitrate-mono-mp3",
        "User-Agent": "curl"
    }
    # Escape &, < and > so that such characters in the copy cannot produce
    # malformed SSML.
    ssml_text = '''
    <speak version='1.0' xml:lang='zh-CN'>
    <voice xml:lang='zh-CN' xml:gender='male' name='{voiceName}'>
    {voiceText}
    </voice>
    </speak>
    '''.format(voiceName=speech_synthesis_voice_name, voiceText=escape(text))
    response = requests.post(url, headers=headers, data=ssml_text.encode('utf-8'))
    if response.status_code != 200:
        print("Error: Failed to synthesize audio. Status code:", response.status_code)
        return None

    # delete=False keeps the file alive after the handle is closed, because
    # the caller needs it later.
    with tempfile.NamedTemporaryFile(dir='/mp3_out/', delete=False) as temp_file:
        temp_file.write(response.content)
    file_name = temp_file.name
    # The handle is closed (and flushed) here, so the MP3 parser sees the
    # complete data.
    try:
        audio_duration_seconds = MP3(file_name).info.length
    except Exception:
        # Do not leave an orphaned file behind when the data is not a
        # readable MP3.
        os.remove(file_name)
        raise
    return file_name, audio_duration_seconds
def zero_pad(s, length):
    """Return *s* left-filled with ``'0'`` to a width of at least *length*.

    Values that are already *length* characters or longer come back
    unchanged.
    """
    padded = s.zfill(length)
    return padded
def gpt_polish(text: str) -> "str | None":
    """Ask the chat-completion endpoint to turn dialog text into video copy.

    Args:
        text: Dialog text extracted from a manga page.

    Returns:
        The content of the first choice's message, or ``None`` when *text*
        is ``None`` or blank (no request is sent in that case) or when the
        request or the decoding of its response fails.
    """
    # There is nothing to polish without input; skip the paid request instead
    # of sending a null/empty message.
    if text is None or not text.strip():
        print("gpt润色文案失败:")
        print("empty input text")
        return None

    headers = {
        'Authorization': f'Bearer {my_openai_key}',
        'Content-Type': 'application/json',
    }
    data = {
        'model': 'gpt-3.5-turbo',
        'messages': [
            {'role': 'system', 'content': "你是一个assistant,能够根据user发送的漫画中提取的对话文字,生成一个短视频中一帧的文案(1-2句话)"},
            {'role': 'user', 'content': text}
        ]
    }
    try:
        response = requests.post('https://api.yingwu.lol/v1/chat/completions', headers=headers, data=json.dumps(data))
        print("gpt请求的结果是",response.text)
        # Decode the body once and reuse the extracted message.
        polished = response.json()['choices'][0]['message']['content']
        print("润色后文案是:"+polished)
        return polished
    except Exception as e:
        # Best effort: the caller treats None as "skip this frame".
        print("gpt润色文案失败:")
        print(e)
        return None
if __name__ == '__main__':
    # Script entry point: turn the pages under `img_path` into one narrated
    # video.  For every page that has dialog crops under `dialog_img_path`,
    # the dialog text is recognized, polished, synthesized to speech and shown
    # together with the page for the duration of the audio.
    img_path = 'manga1'
    dialog_img_path = 'manga12'
    width, height = 1125, 1600
    fps = 30
    font_path = '1.ttf'

    # Collect the page images, ordered by the digits in their file names
    # (zero-padded to three characters and compared as strings).
    subdir_path = os.path.join(os.getcwd(), img_path)
    image_files = []
    for root, dirs, files in os.walk(subdir_path):
        for file in files:
            if file.endswith((".jpg", ".png")):
                image_files.append(os.path.relpath(os.path.join(root, file)))
    image_files.sort(
        key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))

    # Pages without any dialog crop produce no frame.
    image_files = [
        image_path for image_path in image_files
        if get_associate_dialog(image_path, dialog_img_path)
    ]

    video_clips = []
    audio_clips = []
    temp_audio_files = []
    try:
        for image_file in image_files:
            print("现在处理的图片是"+image_file)
            img = cv2.imread(image_file)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            cur_copywrite = get_image_copywrite(image_file,dialog_img_path)
            cur_copywrite = gpt_polish(cur_copywrite)
            if cur_copywrite is None:
                continue
            audio_data = get_audio_data(cur_copywrite)
            if audio_data is None:
                # Synthesis failed (already reported); skip this frame rather
                # than crash while unpacking None.
                continue
            filename, duration = audio_data
            # Remember every temp file so that all of them get removed, not
            # only the last one.
            temp_audio_files.append(filename)
            print("存放临时mp3文件的路径是",filename)
            clip = ImageClip(img).set_duration(duration).resize((width, height))
            txt_clip = TextClip(cur_copywrite, fontsize=40, color='white', bg_color='black',
                                font=font_path)
            txt_clip = txt_clip.set_pos(('center', 'bottom')).set_duration(duration)
            audio_clip = AudioFileClip(filename)
            audio_clips.append(audio_clip)
            clip = clip.set_audio(audio_clip)
            clip = CompositeVideoClip([clip, txt_clip])
            video_clips.append(clip)

        if video_clips:
            video = concatenate_videoclips(video_clips)
            video.write_videofile('mp4_out/output_video.mp4', fps=fps,temp_audiofile="mp3_out/temp.mp3")
        else:
            # Concatenating an empty list fails, so there is nothing to write.
            print("没有可用的视频片段,跳过视频合成")
    finally:
        # Release the audio readers before deleting the files they read.
        for audio_clip in audio_clips:
            audio_clip.close()
        for filename in temp_audio_files:
            print("删除临时mp3文件", filename)
            os.remove(filename)