import io
import os
import subprocess
import time
import urllib.request
from typing import *

import numpy as np
from fastapi import BackgroundTasks
from fastapi import FastAPI, File, UploadFile
from fastapi import HTTPException
from fastapi.responses import FileResponse
from fastapi.responses import PlainTextResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image

from moviepyTest import test
# Application instance; the route decorators in this file register their
# handlers on it.
app = FastAPI()
@app.get("/inference")
def inference():
    """Return a fixed HTML greeting snippet as the response payload."""
    greeting = "<p>Hello, World!</p>"
    return greeting
@app.get("/infer_t5")
def t5(input):
    """Echo the received ``input`` value back under the ``"output"`` key.

    The parameter name shadows the ``input`` builtin, but it is part of the
    endpoint's public interface and is therefore left unchanged.
    """
    return dict(output=input)
@app.get("/moviepyTest")
def t5():
    """Call ``test()`` from ``moviepyTest`` and return its result under ``"output"``.

    NOTE(review): this function reuses the name ``t5`` of the ``/infer_t5``
    handler defined earlier in the file, so the module-level name ``t5`` ends
    up bound to this function. Consider giving it a distinct name.
    """
    return {"output": test()}
@app.post("/getOriginalMangaList")
async def getOriginalMangaList(images: List[UploadFile] = File(...)):
    """Store every uploaded image as ``/manga/<index>.jpg``.

    Each upload is read fully into memory, opened with Pillow, converted to
    grayscale (``"L"``) and then back to ``"RGB"``, and saved under its
    zero-based position in the upload list.

    Args:
        images: The uploaded files, in the order they were received.

    Returns:
        A fixed confirmation string once all images have been written.
    """
    for position, upload in enumerate(images):
        raw_bytes = await upload.read()
        picture = Image.open(io.BytesIO(raw_bytes))
        # Round-trip through grayscale: drops colour but keeps three channels.
        normalized = picture.convert("L").convert("RGB")
        normalized.save(f"/manga/{position}.jpg")
    return "获取图片保存成功"
@app.delete("/deleteFiles")
async def delete_files(directory: str):
    """Delete every regular file located directly inside ``directory``.

    Subdirectories (and anything else that is not a regular file) are left
    untouched; the listing is not recursive.

    Args:
        directory: Path of the directory whose files should be removed.

    Returns:
        A dict with a ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 when ``directory`` does not exist or is not a
            directory (previously this surfaced as an unhandled error).
    """
    # SECURITY(review): ``directory`` is taken from the request as-is, so any
    # client can wipe the files of any directory this process can write to.
    # Restrict it to an allow-listed base directory before exposing this.
    if not os.path.isdir(directory):
        raise HTTPException(
            status_code=404,
            detail=f"Directory not found: {directory}",
        )

    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            os.remove(file_path)
    return {"message": f"成功删除{directory}目录下的所有文件"}
def file_executer(file_name: str) -> "PlainTextResponse":
    """Run ``<file_name>.py`` with ``python`` and report what it printed.

    The script path is passed to the interpreter as given, so a relative
    name is resolved against the working directory of this process.
    Progress and results are also printed to stdout.

    Args:
        file_name: Script name without the ``.py`` extension.

    Returns:
        A ``PlainTextResponse`` holding the script's standard output on
        success, or an error description when the script exits with a
        non-zero status or the interpreter cannot be started.
    """
    script = f"{file_name}.py"
    print("开始执行py任务", file_name)
    try:
        raw_output = subprocess.check_output(["python", script])
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the interpreter itself being missing or not
        # executable, which would otherwise escape as an unhandled exception.
        print("执行py任务失败", file_name, "原因如下")
        print(f"Error executing {file_name}.py: {e}")
        return PlainTextResponse(f"Error executing {file_name}.py: {e}")

    # The child may emit bytes that are not valid UTF-8 (e.g. a different
    # console encoding); replace them instead of raising UnicodeDecodeError.
    result = raw_output.decode("utf-8", errors="replace")
    print("执行完成py任务:", file_name, "结果如下")
    print(result)
    return PlainTextResponse(result)
@app.get("/execute_py_file/{file_name}")
async def execute_py_file(file_name: str, background_tasks: BackgroundTasks):
    """Queue ``file_executer(file_name)`` as a background task and acknowledge.

    Args:
        file_name: Script name (without ``.py``) taken from the URL path.
        background_tasks: Task queue the execution is added to.

    Returns:
        An acknowledgement string that includes ``file_name``.
    """
    # NOTE(review): ``file_name`` comes straight from the URL, so any caller
    # can trigger execution of a script by name — consider an allow-list.
    background_tasks.add_task(file_executer, file_name)
    acknowledgement = "接受到了请求{filename}任务".format(filename=file_name)
    return acknowledgement
@app.get("/execute_all_task")
async def execute_all_task(background_tasks: BackgroundTasks):
    """Queue the three pipeline scripts as background tasks, in order.

    Args:
        background_tasks: Task queue the executions are added to.

    Returns:
        A dict with a ``"message"`` key confirming the tasks were queued.
    """
    pipeline = ("1removeMask", "2magiDialogCut", "3mergeDialogToVideo")
    for script_name in pipeline:
        background_tasks.add_task(file_executer, script_name)
    return {"message": "Tasks added to the queue"}
def someTask():
    """Block for 20 seconds, then print a completion message."""
    delay_seconds = 20
    time.sleep(delay_seconds)
    print("睡眠20s结束")
@app.get("/backTaskTest")
def returnRandomSubscribeUrl(background_tasks: BackgroundTasks) -> str:
    """Queue ``someTask`` as a background task and return a fixed string.

    Args:
        background_tasks: Task queue ``someTask`` is added to.

    Returns:
        The fixed response string.
    """
    background_tasks.add_task(someTask)
    return "先返回"