Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile,Body,Query,Response | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import FileResponse | |
from PIL import Image | |
import numpy as np | |
import urllib.request | |
import io | |
import os | |
from typing import * | |
from fastapi.responses import PlainTextResponse #执行其他py的plaintext返回 | |
import subprocess | |
from fastapi import BackgroundTasks | |
import time | |
from bilibili_api import sync, video_uploader, Credential#bili上传部分 | |
import asyncio #保证后台任务的先后 | |
import json #对bili_meta_data进行dumps ->str和loads—>dict | |
import bilibili_api | |
import random #随机获取cover | |
import requests #用于请求其他space的图片api | |
sessdata = os.getenv('sessdata') | |
bili_jct = os.getenv('bili_jct') | |
buvid3 = os.getenv('buvid3') | |
app = FastAPI() | |
def inference(): | |
return "<p>Hello, World!</p>" | |
def t5(input): | |
return {"output": input} | |
##这个比较快不用异步 | |
#增加指定保存路径功能,默认是/manga路径 | |
def getOriginalMangaList(images: List[UploadFile] = File(...), save_path: str = "/manga"): | |
saved_files = [] | |
for idx, image in enumerate(images): | |
img = image.file.read() | |
image_data = Image.open(io.BytesIO(img)).convert("L").convert("RGB") | |
path_to_image = os.path.join(save_path, f"{idx}.jpg") | |
image_data.save(path_to_image) | |
saved_files.append(path_to_image) | |
return {"message": "Images saved successfully", "saved_files": saved_files} | |
##这个比较快不用异步 | |
##这个比较快不用异步 | |
async def delete_all_files_and_folders(directory:str): | |
for root, dirs, files in os.walk(directory, topdown=False): | |
for name in files: | |
file_path = os.path.join(root, name) | |
print("删除指定目录函数检查到文件,删除文件:", file_path) | |
os.remove(file_path) | |
for name in dirs: | |
dir_path = os.path.join(root, name) | |
print("删除指定目录函数检查到目录,删除目录:", dir_path) | |
os.rmdir(dir_path) | |
return {"message": f"成功删除{directory}目录下的所有文件"} | |
##这个比较快不用异步 | |
########异步处理py文件执行接口 | |
def file_executer(file_name:str)->"执行返回": | |
try: | |
print("开始执行py任务",file_name) | |
# result = subprocess.check_output(["python", f"{file_name}.py"], stderr=subprocess.STDOUT).decode("utf-8")#执行完成后显示运行py的print | |
##########test 边执行py边显示print+++++++++++++++++++++++++++++++++++++ | |
# 开始执行Python 脚本 | |
output = [] | |
process = subprocess.Popen(["python", f"{file_name}.py"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) | |
# 逐行读取脚本的输出并显示 | |
for line in iter(process.stdout.readline, b''): | |
line_text = line.decode('utf-8').strip() | |
print(line_text) | |
output.append(line_text) | |
process.wait() | |
result = '\n'.join(output) | |
##########test 边执行py边显示print+++++++++++++++++++++++++++++++++++++ | |
print("执行完成py任务:",file_name,"结果如下") | |
print(result) | |
return PlainTextResponse(result) | |
except subprocess.CalledProcessError as e: | |
print("执行py任务失败",file_name,"原因如下") | |
print(f"Error executing {file_name}.py: {e}") | |
return PlainTextResponse(f"Error executing {file_name}.py: {e}") | |
async def execute_py_file(file_name: str,background_tasks: BackgroundTasks): | |
result = "接受到了请求{filename}任务".format(filename = file_name) | |
background_tasks.add_task(file_executer,file_name) | |
return result | |
########异步处理py文件执行接口 | |
#保证既要提交队列后返回给用户,又要先完成前面的video生成步骤再执行后面的submit函数,还有兼顾allow_submit的值 | |
##能立刻返回的 | |
async def execute_all_task(background_tasks: BackgroundTasks, bili_meta_data,file_list: List[str] = Query(["0filterImage","1removeMask", "2magiDialogCut", "3mergeDialogToVideo"]), | |
mp4_out_file: str = 'mp4_out/output.mp4', | |
allow_submit: bool = False | |
): | |
#cover_url因为本地上传有协程问题,所以采用cover_url然后转换为Picture类型即可->bilibili_api.utls.picture.Picture.from_url(str)->str | |
print("获取到的用户bili_meta_data是:", bili_meta_data) | |
#这里需要先转换bili_meta_data从str变成dict才能继续 | |
bili_meta_data = json.loads(bili_meta_data) | |
print("获取到的file_list是",file_list) | |
print("获取到的allow_submit是",allow_submit) | |
background_tasks.add_task(process_tasks_and_upload, file_list, bili_meta_data, allow_submit,background_tasks) | |
return {"message": "提交成功"} | |
def process_tasks_and_upload(file_list, bili_meta_data, allow_submit,background_tasks): | |
tasks = [] | |
for file_name in file_list: | |
tasks.append(file_executer(file_name)) | |
if allow_submit: | |
#因为采用cover_url上传,所以不需要读取内容然后保存本地->这样也会引起协程问题 | |
print("bili的提交进程进行加入后台任务") | |
background_tasks.add_task(upload_video,bili_meta_data) | |
##尝试能立刻返回的 | |
##########异步样例 | |
def someTask(): | |
time.sleep(20) | |
print("睡眠20s结束") | |
def returnRandomSubscribeUrl(background_tasks: BackgroundTasks)->str: | |
#返回 | |
result = "先返回" | |
background_tasks.add_task(someTask) | |
return result | |
##########异步样例 | |
async def upload_video(meta:dict): | |
#假设最多重新上传5次,上传cover使用随机图片,超时则再次执行 | |
max_attempts = 5 | |
attempts = 0 | |
while attempts < max_attempts: | |
try: | |
await upload_with_random_cover(meta); | |
except Exception as e: | |
print("上传video遇到错误,可能是cover不符合超时,下面将调用upload_with_random_cover函数") | |
print(f"Attempt 第{attempts+1}次重新上传: 本次错误原因是Error : {e}") | |
#重新上传会执行try的部分,所以except部分不用写upload again | |
attempts += 1 | |
else: | |
print("No errors occurred") | |
break | |
#检测output.mp4下载查看生成效果(b不过审时用) | |
async def get_video(dir:str = "/mp4_out/output.mp4")->"video file in header": | |
video_file_path = dir | |
video_file = open(video_file_path, "rb") | |
video_data = video_file.read() | |
video_file.close() | |
response = Response(content=video_data, media_type="video/mp4") | |
response.headers["Content-Disposition"] = "attachment; filename=output.mp4" | |
return response | |
# 补零函数,将数字部分补齐为指定长度 | |
def zero_pad(s, length): | |
return s.zfill(length) | |
#如果默认的cover不能过审,那么随机选取无水印的manga1下面的随便一个作为cover重新上传 | |
async def upload_again(bili_meta_data:str): | |
bili_meta_data = json.loads(bili_meta_data) | |
await upload_with_random_cover(bili_meta_data) | |
#这个函数单独拆开,因为手动和自动重新上传都需要调用 | |
async def upload_with_random_cover(bili_meta_data:dict): | |
meta = bili_meta_data | |
credential = Credential(sessdata=sessdata, | |
bili_jct=bili_jct, | |
buvid3=buvid3) | |
# 使用manga12中的裁剪漫画块中的作为cover,过审概率应该高一些 | |
img_path = 'manga12' | |
#获取漫画原图无水印的加入image_files,并排序 | |
subdir_path = os.path.join(os.getcwd(), img_path) | |
# 对话图片经过加入list并补0确定顺序 | |
image_files = [] | |
for root, dirs, files in os.walk(subdir_path): | |
for file in files: | |
if file.endswith(".jpg") or file.endswith(".png"): | |
image_files.append(os.path.relpath(os.path.join(root, file))) | |
# 对对话框文件名中的数字部分进行补零操作-这样顺序会正常 | |
image_files.sort( | |
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3)) | |
# 获取随机一张图片图片存入cover/0.jpg作为cover图片,至于保存为cover/0.jpg而不是直接用是因为能在出错的时候看效果,通过/cover.jpg接口 | |
random_image = Image.open(random.choice(image_files)) | |
# 定义要保存的文件路径 | |
save_path = os.path.join("cover", "0.jpg") | |
# 保存图片文件 | |
random_image.save(save_path) | |
with open("cover/0.jpg", "rb") as f: | |
cover_data = f.read() | |
cover = bilibili_api.utils.picture.Picture.from_content(cover_data,"jpg")#直接读取本地图片content | |
page = video_uploader.VideoUploaderPage(path='mp4_out/output_video.mp4', title=meta['title'], description=meta['desc']) | |
uploader = video_uploader.VideoUploader([page], meta, credential, cover=cover) | |
async def ev(data): | |
print(data) | |
await uploader.start() | |
def get_cover_image(): | |
cover_dir = "cover" | |
file_path = os.path.join(cover_dir, "0.jpg") | |
return FileResponse(file_path) | |