# rogerxavier-ocr-with-fastapi.hf.space
import os
## This model is currently only suited to recognition after the text-box order has been determined, because if the subsequent
## Counterexample of processing a full page: the image being processed here is 10\0.jpg
# [[[953, 743], [987, 743], [987, 867], [953, 867]], [[917, 745], [951, 745], [951, 867], [917, 867]], [[881, 741], [918, 742], [915, 898], [877, 897]], [[843, 743], [879, 743], [879, 809], [843, 809]], [[629, 1058], [669, 1058], [669, 1210], [629, 1210]], [[549, 1227], [583, 1227], [583, 1381], [549, 1381]], [[535, 115], [563, 115], [563, 145], [535, 145]], [[535, 147], [563, 147], [563, 213], [535, 213]], [[507, 443], [539, 443], [539, 579], [507, 579]], [[505, 115], [533, 115], [533, 197], [505, 197]], [[511, 1225], [547, 1225], [547, 1321], [511, 1321]], [[475, 117], [503, 117], [503, 265], [475, 265]], [[467, 421], [503, 421], [503, 575], [467, 575]], [[419, 235], [447, 235], [447, 337], [419, 337]], [[387, 236], [417, 237], [414, 339], [385, 338]], [[209, 796], [242, 797], [239, 921], [206, 920]], [[175, 173], [205, 173], [205, 225], [175, 225]], [[177, 231], [205, 231], [205, 285], [177, 285]], [[103, 1153], [129, 1153], [129, 1223], [103, 1223]], [[41, 100], [108, 101], [104, 549], [36, 548]]]
# ['就算是你', '没有圣剑', '也不可能有', '胜算', '就算如此', '我也不觉得', '做', ':做个', '·就不觉得', '老好人', '你可怕', '也要有个限度', '我很恐怖吗', '该说真是', '无药可救', '说的是呢', '这个', '但是', '为何?', '第二话让人怜爱']
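# Note: the two lists above appear to be sample output for that counterexample page --
# the sorted box coordinates and sorted text printed by get_sorted_dialog_text below.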
import requests
import tempfile
import time
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.editor import *
import cv2
import numpy as np
import io
import base64
import json
from io import BytesIO
import pandas as pd
from PIL import Image
from mutagen.mp3 import MP3  # read the audio file to get its duration
azure_speech_key = os.getenv('azure_speech_key')
azure_service_region = os.getenv('azure_service_region')
my_openai_key = os.getenv('my_openai_key')
speech_synthesis_voice_name = "zh-CN-YunhaoNeural"  ## Yunhao voice
print("azure key is", azure_speech_key)
print("azure_service_region is", azure_service_region)
print("my_openai_key", my_openai_key)
# From a watermark-free full manga page -> find the associated dialog-box crops -> OCR the dialog text -> return it
def get_image_copywrite(image_path: "image path (with extension)", dialog_cut_path: "directory of dialog-box crops") -> "str of the dialog text recognized for this page (original wording is fine), or None":
    dialog_texts = ''
    associate_dialog_img = get_associate_dialog(image_path=image_path, dialog_cut_path=dialog_cut_path)
    if len(associate_dialog_img) != 0:
        # the page has associated dialog-box crops
        for dialog_img_path in associate_dialog_img:
            cur_dialog_texts = get_sorted_dialog_text(dialog_img_path)  # text list for one dialog box
            if cur_dialog_texts is not None:
                for dialog_text in cur_dialog_texts:
                    dialog_texts += dialog_text
                # '\n' line breaks are already appended inside the list, so none are added here
            else:
                print(dialog_img_path + " recognized nothing - there may be a problem")
        return dialog_texts
    return None  # non-conforming pages are not requested; return None directly
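# Rough usage sketch (paths are illustrative, matching the defaults in __main__ below):
#   text = get_image_copywrite('manga1/6.jpg', 'manga12')
#   # -> concatenated dialog text for page 6, or None when no dialog crops are found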
# Given a watermark-free page path and the dialog-crop directory, return the list of associated dialog-box crops
def get_associate_dialog(image_path: "image path (with extension)", dialog_cut_path: "directory of dialog-box crops") -> "list of dialog crops associated with the page, possibly empty":
    image_name = os.path.splitext(os.path.basename(image_path))[0]
    image_name_format = '{:03d}'.format(int(image_name))
    associated_dialogs = []
    for root, _, files in os.walk(dialog_cut_path):
        for file in files:
            if file.startswith(image_name_format) and file.endswith('.jpg'):
                associated_dialogs.append(os.path.join(root, file))
    return associated_dialogs
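# Illustrative example of the matching rule (the crop file names here are assumptions):
# for image_path 'manga1/6.jpg' the zero-padded prefix is '006', so any file under
# dialog_cut_path whose name starts with '006' and ends in '.jpg' (e.g. '006_0.jpg') is returned.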
# Given the path of a dialog-box crop, return its text as a list
def get_sorted_dialog_text(image_path: "file path with extension") -> "sorted text list (one or several columns of speech; whatever one box contains counts as one piece of copy), or None if the request fails":
    with open(image_path, 'rb') as f:
        image_bytes = f.read()
    headers = {
        'authority': 'rogerxavier-fastapi-t5-magi.hf.space',
        'scheme': 'https',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cookie': 'spaces-jwt=eyJhbGciOiJFZERTQSJ9.eyJyZWFkIjp0cnVlLCJwZXJtaXNzaW9ucyI6eyJyZXBvLmNvbnRlbnQucmVhZCI6dHJ1ZX0sIm9uQmVoYWxmT2YiOnsia2luZCI6InVzZXIiLCJfaWQiOiI2NDJhNTNiNTE2ZDRkODI5M2M5YjdiNzgiLCJ1c2VyIjoicm9nZXJ4YXZpZXIifSwiaWF0IjoxNzE2Njg3MzU3LCJzdWIiOiIvc3BhY2VzL3JvZ2VyeGF2aWVyL29jcl93aXRoX2Zhc3RhcGkiLCJleHAiOjE3MTY3NzM3NTcsImlzcyI6Imh0dHBzOi8vaHVnZ2luZ2ZhY2UuY28ifQ._sGdEgC-ijbIhLmB6iNSBQ_xHNzb4Ydb9mD0L3ByRmJSbB9ccfGbRgtNmkV1JLLldHp_VEKUSQt9Mwq_q4aGAQ',
        'Dnt': '1',
        'Priority': 'u=1, i',
        'Sec-Ch-Ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
    }
    files = {
        "image": image_bytes,
    }
    try:
        resp = requests.post("https://rogerxavier-ocr-with-fastapi.hf.space/getCoordinates", files=files, headers=headers)  # the request only works with these headers
        # parse the JSON: index 0 is the list of box coordinates, index 1 is the box-id/text info
        result = resp.json()
        boxCoordinates, boxInfo = result[0], result[1]  # a list and a dict respectively
        print("ofa ocr recognized the manga blocks successfully")
        # compute each box's center so the boxes can be sorted right-to-left, top-to-bottom
        centers = [((box[0][0] + box[2][0]) / 2, (box[0][1] + box[2][1]) / 2) for box in boxCoordinates]
        # sort the boxes by center coordinate, right-to-left then top-to-bottom
        sorted_indices = sorted(range(len(centers)), key=lambda i: (-centers[i][0], centers[i][1]))
        # collect the sorted coordinates and the corresponding text
        sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
        sorted_text = [boxInfo['Text'][str(i)] for i in sorted_indices]
        # re-order neighbouring boxes whose x offset is smaller than 1/3 of a box width
        for i in range(len(sorted_indices) - 1):
            if centers[sorted_indices[i]][0] - centers[sorted_indices[i+1]][0] < (sorted_coordinates[i][2][0] - sorted_coordinates[i][0][0]) / 3:
                if sorted_coordinates[i][0][1] > sorted_coordinates[i+1][2][1]:
                    # here the y axis decides the order
                    sorted_indices[i], sorted_indices[i+1] = sorted_indices[i+1], sorted_indices[i]
        # insert sentence breaks between boxes that are more than 1.5x a standard box width apart in x
        for i in range(len(sorted_indices) - 1):
            if centers[sorted_indices[i]][0] - centers[sorted_indices[i + 1]][0] > (
                    sorted_coordinates[i][2][0] - sorted_coordinates[i][0][0]) * 1.5:
                # adjacent boxes are far apart horizontally, so break the sentence here
                sorted_text[i] += '\n'
        sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
        print(sorted_coordinates)
        print(sorted_text)
        return sorted_text
    except Exception as e:
        print("ofa ocr image request failed")
        print(e)
        return None
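# Rough usage sketch (the crop path is illustrative):
#   texts = get_sorted_dialog_text('manga12/006_0.jpg')
#   # -> a list such as ['就算是你', '没有圣剑', ...] in right-to-left, top-to-bottom order, or None on failure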
# Synthesize audio from text
def get_audio_data(text: str) -> "returns the temp mp3 file name and its duration in seconds (invalid characters in the text may cause a 400 error), or (None, None)":
    # Creates an instance of a speech config with specified subscription key and service region.
    speech_key = azure_speech_key
    service_region = azure_service_region
    voiceText = text
    url = f"https://{service_region}.tts.speech.microsoft.com/cognitiveservices/v1"
    headers = {
        "Ocp-Apim-Subscription-Key": speech_key,
        "Content-Type": "application/ssml+xml",
        "X-Microsoft-OutputFormat": "audio-16khz-128kbitrate-mono-mp3",
        "User-Agent": "curl"
    }
    ssml_text = '''
    <speak version='1.0' xml:lang='zh-CN'>
        <voice xml:lang='zh-CN' xml:gender='male' name='{voiceName}'>
            {voiceText}
        </voice>
    </speak>
    '''.format(voiceName=speech_synthesis_voice_name, voiceText=voiceText)
    response = requests.post(url, headers=headers, data=ssml_text.encode('utf-8'))
    if response.status_code == 200:
        # create a temporary file under /mp3_out/
        try:
            with tempfile.NamedTemporaryFile(dir='/mp3_out/', delete=False) as temp_file:
                temp_file.write(response.content)
                temp_file.close()
                audio = MP3(temp_file.name)
                # audio duration in seconds
                audio_duration_seconds = audio.info.length
                # done with the file; return its name
                file_name = temp_file.name
                return file_name, audio_duration_seconds
        except Exception as e:
            print("Probably the 'mp3 can not sync to MPEG frame' error - the audio was fetched but cannot be parsed:", e)
            return None, None  # also return None here so the caller just skips it
    else:
        print("Error: Failed to synthesize audio. Status code:", response.status_code)
        return None, None
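# Rough usage sketch (assumes the azure_speech_key / azure_service_region env vars are set
# and the /mp3_out/ directory exists):
#   filename, duration = get_audio_data('就算是你,没有圣剑')
#   # -> path of a temporary mp3 plus its length in seconds, or (None, None) on failure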
# Zero-padding helper: pads the numeric part of a name to the given length
def zero_pad(s, length):
    return s.zfill(length)
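# Example: zero_pad('6', 3) -> '006', zero_pad('12', 3) -> '012' (str.zfill pads with leading zeros)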
def gpt_polish(text: str) -> "new copy polished by GPT as a str, or None if the GPT request fails":
    # Set your OpenAI API key
    api_key = my_openai_key
    # Define the headers
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    # Chat Completions request data
    data = {
        'model': 'gpt-3.5-turbo',  # Replace with your chosen model
        'messages': [
            {'role': 'system', 'content': "你是一个assistant,能够根据user发送的漫画中提取的对话文字,生成一个短视频中一帧的文案(1-2句话)"},
            {'role': 'user', 'content': text}
        ]
    }
    try:
        response = requests.post('https://api.yingwu.lol/v1/chat/completions', headers=headers, data=json.dumps(data))
        print("gpt response:", response.text)
        print("polished copy: " + response.json()['choices'][0]['message']['content'])
        return response.json()['choices'][0]['message']['content']
    except Exception as e:
        print("gpt polishing failed:")
        print(e)
        return None
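# Rough usage sketch (this helper is currently unused -- see the commented-out call in __main__):
#   polished = gpt_polish('就算是你\n没有圣剑')
#   # -> a 1-2 sentence caption string, or None if the request fails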
if __name__ == '__main__':
    # Directory of watermark-free manga pages -- kept here because dialog text extraction must be tied back to the original pages
    img_path = 'manga1'
    # Directory of cropped dialog-box images
    dialog_img_path = 'manga12'
    # Collect the watermark-free original pages into image_files and sort them
    subdir_path = os.path.join(os.getcwd(), img_path)
    # pages are added to the list and zero-padded to fix their order
    image_files = []
    for root, dirs, files in os.walk(subdir_path):
        for file in files:
            if file.endswith(".jpg") or file.endswith(".png"):
                image_files.append(os.path.relpath(os.path.join(root, file)))
    # zero-pad the numeric part of the file names so they sort in the expected order
    image_files.sort(
        key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))

    dialog_subdir_path = os.path.join(os.getcwd(), dialog_img_path)
    # dialog crops are added to the list and zero-padded to fix their order
    dialog_image_files = []
    for root, dirs, files in os.walk(dialog_subdir_path):
        for file in files:
            if file.endswith(".jpg") or file.endswith(".png"):
                dialog_image_files.append(os.path.relpath(os.path.join(root, file)))
    # zero-pad the numeric part of the file names so they sort in the expected order
    dialog_image_files.sort(
        key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))

    ### audio/video parameters -------------------------------------------------------------------------------------
    ## global name of the temporary audio file -- kept so it can be deleted afterwards
    filename = ''
    # video resolution and frame rate
    # open the first image (the resolution below is hard-coded rather than taken from it)
    image = Image.open(image_files[0])
    width, height = 1125, 1600  # if the video does not display, the Windows player may simply not support it
    fps = 30
    font_path = '1.ttf'  # set a font so the default one does not choke on mixed Chinese and English
    # clips to be concatenated into the final video
    video_clips = []
    ### audio/video parameters -------------------------------------------------------------------------------------
    # Iteration is over the watermark-free originals, so filter first: keep only pages that have matching dialog crops
    filtered_image_files = []
    for image_path in image_files:
        dialog_list = get_associate_dialog(image_path, dialog_img_path)
        if dialog_list:
            filtered_image_files.append(image_path)
    image_files = filtered_image_files

    for idx, image_file in enumerate(image_files):
        print("now processing image " + image_file)
        # the video/audio generation below uses the full watermark-free page, not the dialog crops used for OCR
        img = cv2.imread(image_file)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  ## only ASCII (English) paths are supported
        ## dialog text recognized for this page (could be post-processed by GPT before use as the caption)
        cur_copywrite = get_image_copywrite(image_file, dialog_img_path)  # image_file is e.g. 6.jpg here
        # cur_copywrite = gpt_polish(cur_copywrite)  # GPT is skipped; the text recognized from the new dialog crops is used as-is
        if cur_copywrite is not None:
            ## temporary audio file name and duration for this page's caption
            filename, duration = get_audio_data(cur_copywrite)
            if filename is not None:
                print("temporary mp3 file path:", filename)
                clip = ImageClip(img).set_duration(duration).resize((width, height))  # base clip
                txt_clip = TextClip(cur_copywrite, fontsize=40, color='white', bg_color='black',
                                    font=font_path)  ## caption clip, composited onto the video
                txt_clip = txt_clip.set_pos(('center', 'bottom')).set_duration(duration)
                # create the audio clip
                audio_clip = AudioFileClip(filename)
                clip = clip.set_audio(audio_clip)  # attach the audio to this video segment
                clip = CompositeVideoClip([clip, txt_clip])
                video_clips.append(clip)
            else:
                pass  ## the audio could not be generated (special characters or other reasons) - skip
        else:
            pass  ## non-conforming page - skip

    video = concatenate_videoclips(video_clips)
    # save the video
    video.write_videofile('mp4_out/output_video.mp4', fps=fps, temp_audiofile="mp3_out/temp.mp3")
    # delete the temporary file after it has been closed
    print("deleting temporary mp3 file", filename)
    os.remove(filename)