Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile,Body | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import FileResponse | |
from PIL import Image | |
import numpy as np | |
import urllib.request | |
import io | |
import os | |
from moviepyTest import test | |
from typing import * | |
from fastapi.responses import PlainTextResponse #执行其他py的plaintext返回 | |
import subprocess | |
from fastapi import BackgroundTasks | |
import time | |
from bilibili_api import sync, video_uploader, Credential#bili上传部分 | |
# Bilibili login credentials are read from environment variables so that no
# secret is hard-coded in the source; each value is None when the variable is
# not set.
sessdata = os.environ.get('sessdata')
bili_jct = os.environ.get('bili_jct')
buvid3 = os.environ.get('buvid3')

# The FastAPI application object for this module.
app = FastAPI()
def inference():
    """Return a fixed HTML greeting snippet."""
    greeting_html = "<p>Hello, World!</p>"
    return greeting_html
def t5(input):
    """Echo the received value back under the ``"output"`` key.

    NOTE(review): a parameterless ``t5`` is defined immediately after this
    function in the same file and rebinds the module-level name.
    """
    return dict(output=input)
def t5():
    """Call ``test`` (imported from ``moviepyTest``) and wrap its return value.

    Returns:
        A dict with the single key ``"output"`` holding whatever ``test()``
        returned.
    """
    return {"output": test()}
##这个比较快不用异步 | |
#增加指定保存路径功能,默认是/manga路径 | |
def getOriginalMangaList(images: List[UploadFile] = File(...), save_path: str = "/manga"):
    """Save uploaded images as JPEG files named after their upload position.

    Every upload is read fully into memory, decoded with Pillow, converted to
    mode "L" and then back to "RGB", and written to ``<save_path>/<index>.jpg``.

    Args:
        images: The uploaded image files. The position of each file in the
            list becomes its file name (``0.jpg``, ``1.jpg``, ...).
        save_path: Directory that receives the converted images. It is
            created when it does not exist yet. Defaults to ``/manga``.

    Returns:
        A dict with a ``"message"`` string and ``"saved_files"``, the list of
        paths that were written, in upload order.
    """
    # NOTE(review): save_path is caller-controlled; confirm whether it should
    # be restricted to a known base directory.
    # Image.save does not create missing parent directories, so make sure the
    # target exists before writing the first file.
    os.makedirs(save_path, exist_ok=True)

    saved_files = []
    for idx, image in enumerate(images):
        raw_bytes = image.file.read()
        image_data = Image.open(io.BytesIO(raw_bytes)).convert("L").convert("RGB")
        path_to_image = os.path.join(save_path, f"{idx}.jpg")
        image_data.save(path_to_image)
        saved_files.append(path_to_image)
    return {"message": "Images saved successfully", "saved_files": saved_files}
##这个比较快不用异步 | |
##这个比较快不用异步 | |
async def delete_files(directory: str):
    """Remove every regular file found directly inside ``directory``.

    Sub-directories (and anything else that ``os.path.isfile`` rejects) are
    left untouched; the listing is not recursive.

    Args:
        directory: Path of the directory to clean.

    Returns:
        A dict whose ``"message"`` confirms the directory that was cleaned.
    """
    candidates = (os.path.join(directory, name) for name in os.listdir(directory))
    # filter() is lazy, so each path is checked right before it is removed.
    for regular_file in filter(os.path.isfile, candidates):
        os.remove(regular_file)
    return {"message": f"成功删除{directory}目录下的所有文件"}
##这个比较快不用异步 | |
########异步处理py文件执行接口 | |
def file_executer(file_name:str)->"执行返回":
    """Run ``<file_name>.py`` in a child ``python`` process, echoing its output.

    The child's stdout and stderr are merged and read line by line, so each
    line is printed as soon as the script emits it and is also collected for
    the returned response.

    Args:
        file_name: Script name without the ``.py`` suffix. It is passed as an
            argument-list element, so no shell is involved.
            NOTE(review): confirm that callers restrict this to known scripts.

    Returns:
        A ``PlainTextResponse`` holding the collected output on success, or an
        error description when the script cannot be started or exits with a
        non-zero status.
    """
    command = ["python", f"{file_name}.py"]
    try:
        print("开始执行py任务",file_name)
        output = []
        # The context manager closes the pipe and waits for the child, so
        # returncode is populated once the block exits.
        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as process:
            for line in process.stdout:
                # errors='replace' keeps a stray non-UTF-8 byte in the child's
                # output from aborting the whole task.
                line_text = line.decode('utf-8', errors='replace').strip()
                print(line_text)
                output.append(line_text)
        result = '\n'.join(output)
        # Popen never raises CalledProcessError by itself; raise it explicitly
        # so that a failing script is reported by the handler below.
        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode, command, output=result)
        print("执行完成py任务:",file_name,"结果如下")
        print(result)
        return PlainTextResponse(result)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where the interpreter cannot be launched.
        print("执行py任务失败",file_name,"原因如下")
        print(f"Error executing {file_name}.py: {e}")
        return PlainTextResponse(f"Error executing {file_name}.py: {e}")
async def execute_py_file(file_name: str,background_tasks: BackgroundTasks):
    """Queue one script for background execution and acknowledge immediately.

    Args:
        file_name: Script name (without ``.py``) handed to ``file_executer``.
        background_tasks: Task queue the execution is added to.

    Returns:
        An acknowledgement string containing the requested file name.
    """
    background_tasks.add_task(file_executer, file_name)
    return f"接受到了请求{file_name}任务"
########异步处理py文件执行接口 | |
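# To keep the background tasks in order once the images have arrived, a single function enqueues the scripts one by one in the caller-defined order (parameters with default values are listed last). The Bilibili upload is merged into the same call; otherwise the client would have to wait for output.mp4 to be generated before it could send the submit request.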
async def execute_all_task(background_tasks: BackgroundTasks,file_list: list = ["1removeMask", "2magiDialogCut", "3mergeDialogToVideo"],
                           bili_meta_data:dict = None,cover_image: UploadFile = File(...),mp4_out_file:str='mp4_out/output.mp4',
                           allow_submit:"是否将结果允许上传"=False,cover_path:str ='/cover'
                           ):
    """Queue the processing scripts in order and, optionally, a video upload.

    Every name in ``file_list`` is queued for ``file_executer`` in list order.
    When ``allow_submit`` is truthy, the uploaded cover is also converted
    (mode "L", then "RGB") and stored as ``<cover_path>/cover.jpg``, and
    ``upload_video`` is queued after the scripts.

    Args:
        background_tasks: Task queue that receives the work.
        file_list: Script names (without ``.py``) to run, in execution order.
            The default list is only read here, never mutated.
        bili_meta_data: Submission metadata forwarded to ``upload_video``.
        cover_image: Uploaded cover picture; only read when ``allow_submit``
            is truthy.
        mp4_out_file: Not used by this function.
        allow_submit: Whether the upload should be queued as well.
        cover_path: Directory that receives ``cover.jpg``; created when it
            does not exist.

    Returns:
        A dict with a ``"message"`` describing what was queued.
    """
    for file_name in file_list:
        background_tasks.add_task(file_executer, file_name)

    if not allow_submit:
        return {"message": "Tasks added to the queue"}

    # The request allows uploading the result, so persist the cover first.
    cover_bytes = cover_image.file.read()
    # Decode the bytes that were just read from the upload.
    cover_img_data = Image.open(io.BytesIO(cover_bytes)).convert("L").convert("RGB")
    # Image.save does not create missing directories.
    os.makedirs(cover_path, exist_ok=True)
    path_to_image = os.path.join(cover_path, "cover.jpg")
    cover_img_data.save(path_to_image)

    # The video metadata comes from the caller's request.
    # NOTE(review): confirm that upload_video accepts the metadata as its
    # single positional argument and reads the cover from path_to_image.
    background_tasks.add_task(upload_video, bili_meta_data)
    return {"message": "提交video任务已加入队列"}
##########异步样例 | |
def someTask():
    """Sample background job: block for 20 seconds, then print a notice."""
    pause_seconds = 20
    time.sleep(pause_seconds)
    print("睡眠20s结束")
def returnRandomSubscribeUrl(background_tasks: BackgroundTasks)->str:
    """Queue ``someTask`` in the background and return a fixed string at once.

    Args:
        background_tasks: Task queue that receives ``someTask``.

    Returns:
        A fixed acknowledgement string, returned without waiting for the task.
    """
    background_tasks.add_task(someTask)
    return "先返回"
##########异步样例 | |
async def upload_video(meta: dict = None):
    """Upload ``mp4_out/output_video.mp4`` to Bilibili with ``cover.jpg``.

    Credentials come from the module-level ``sessdata``, ``bili_jct`` and
    ``buvid3`` values.

    Args:
        meta: Submission metadata handed to ``VideoUploader``. Its ``'title'``
            and ``'desc'`` entries are also used for the single video page.

    Raises:
        ValueError: If ``meta`` is missing or empty.
        KeyError: If ``meta`` lacks ``'title'`` or ``'desc'``.
    """
    # Fail early with a clear message instead of a NameError/TypeError deep in
    # the upload code.
    if not meta:
        raise ValueError("upload_video requires a non-empty meta dict")

    credential = Credential(sessdata=sessdata,
                            bili_jct=bili_jct,
                            buvid3=buvid3)
    # NOTE(review): the video and cover paths are hard-coded relative paths;
    # confirm they match where the pipeline writes its output and cover.
    page = video_uploader.VideoUploaderPage(path='mp4_out/output_video.mp4', title=meta['title'], description=meta['desc'])
    uploader = video_uploader.VideoUploader([page], meta, credential, cover='cover.jpg')

    # Register the handler so upload events are actually printed; defining it
    # without registration has no effect.
    @uploader.on("__ALL__")
    async def ev(data):
        print(data)

    await uploader.start()