|
from fastapi import FastAPI, File, UploadFile,Body,Query,Response,Path |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.responses import FileResponse |
|
from PIL import Image |
|
import numpy as np |
|
import urllib.request |
|
import io |
|
import os |
|
from typing import * |
|
from fastapi.responses import PlainTextResponse |
|
import subprocess |
|
from fastapi import BackgroundTasks |
|
import time |
|
from bilibili_api import sync, video_uploader, Credential |
|
import asyncio |
|
import json |
|
import bilibili_api |
|
import random |
|
import requests |
|
|
|
sessdata = os.getenv('sessdata') |
|
bili_jct = os.getenv('bili_jct') |
|
buvid3 = os.getenv('buvid3') |
|
app = FastAPI() |
|
|
|
|
|
@app.get("/inference") |
|
def inference(): |
|
return "<p>Hello, World!</p>" |
|
|
|
@app.get("/infer_t5") |
|
def t5(input): |
|
|
|
return {"output": input} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/getOriginalMangaList") |
|
def getOriginalMangaList(images: List[UploadFile] = File(...), save_path: str = "/manga"): |
|
saved_files = [] |
|
for idx, image in enumerate(images): |
|
img = image.file.read() |
|
image_data = Image.open(io.BytesIO(img)).convert("L").convert("RGB") |
|
path_to_image = os.path.join(save_path, f"{idx}.jpg") |
|
image_data.save(path_to_image) |
|
saved_files.append(path_to_image) |
|
return {"message": "Images saved successfully", "saved_files": saved_files} |
|
|
|
|
|
|
|
@app.delete("/deleteFiles") |
|
async def delete_all_files_and_folders(directory:str): |
|
for root, dirs, files in os.walk(directory, topdown=False): |
|
for name in files: |
|
file_path = os.path.join(root, name) |
|
print("删除指定目录函数检查到文件,删除文件:", file_path) |
|
os.remove(file_path) |
|
for name in dirs: |
|
dir_path = os.path.join(root, name) |
|
print("删除指定目录函数检查到目录,删除目录:", dir_path) |
|
os.rmdir(dir_path) |
|
return {"message": f"成功删除{directory}目录下的所有文件"} |
|
|
|
|
|
|
|
|
|
|
|
|
|
def file_executer(file_name:str)->"执行返回": |
|
try: |
|
print("开始执行py任务",file_name) |
|
|
|
|
|
|
|
output = [] |
|
process = subprocess.Popen(["python", f"{file_name}.py"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
|
|
|
for line in iter(process.stdout.readline, b''): |
|
line_text = line.decode('utf-8').strip() |
|
print(line_text) |
|
output.append(line_text) |
|
process.wait() |
|
result = '\n'.join(output) |
|
|
|
|
|
print("执行完成py任务:",file_name,"结果如下") |
|
print(result) |
|
return PlainTextResponse(result) |
|
except subprocess.CalledProcessError as e: |
|
print("执行py任务失败",file_name,"原因如下") |
|
print(f"Error executing {file_name}.py: {e}") |
|
return PlainTextResponse(f"Error executing {file_name}.py: {e}") |
|
|
|
@app.get("/execute_py_file/{file_name}") |
|
async def execute_py_file(file_name: str,background_tasks: BackgroundTasks): |
|
result = "接受到了请求{filename}任务".format(filename = file_name) |
|
background_tasks.add_task(file_executer,file_name) |
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/execute_all_task") |
|
async def execute_all_task(background_tasks: BackgroundTasks, bili_meta_data,file_list: List[str] = Query(["0filterImage","1removeMask", "2magiDialogCut", "3mergeDialogToVideo"]), |
|
mp4_out_file: str = 'mp4_out/output.mp4', |
|
allow_submit: bool = False |
|
): |
|
|
|
print("获取到的用户bili_meta_data是:", bili_meta_data) |
|
|
|
bili_meta_data = json.loads(bili_meta_data) |
|
print("获取到的file_list是",file_list) |
|
print("获取到的allow_submit是",allow_submit) |
|
|
|
background_tasks.add_task(process_tasks_and_upload, file_list, bili_meta_data, allow_submit,background_tasks) |
|
return {"message": "提交成功"} |
|
|
|
def process_tasks_and_upload(file_list, bili_meta_data, allow_submit,background_tasks): |
|
tasks = [] |
|
for file_name in file_list: |
|
tasks.append(file_executer(file_name)) |
|
|
|
if allow_submit: |
|
|
|
print("bili的提交进程进行加入后台任务") |
|
background_tasks.add_task(upload_video,bili_meta_data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def someTask(): |
|
time.sleep(20) |
|
print("睡眠20s结束") |
|
|
|
@app.get("/backTaskTest") |
|
def returnRandomSubscribeUrl(background_tasks: BackgroundTasks)->str: |
|
|
|
result = "先返回" |
|
background_tasks.add_task(someTask) |
|
return result |
|
|
|
|
|
|
|
|
|
|
|
|
|
async def upload_video(meta:dict): |
|
|
|
max_attempts = 5 |
|
attempts = 0 |
|
while attempts < max_attempts: |
|
try: |
|
await upload_with_random_cover(meta); |
|
except Exception as e: |
|
print("上传video遇到错误,可能是cover不符合超时,下面将调用upload_with_random_cover函数") |
|
print(f"Attempt 第{attempts+1}次重新上传: 本次错误原因是Error : {e}") |
|
|
|
attempts += 1 |
|
else: |
|
print("No errors occurred") |
|
break |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/get_video") |
|
async def get_video(dir:str = "/mp4_out/output.mp4")->"video file in header": |
|
video_file_path = dir |
|
video_file = open(video_file_path, "rb") |
|
video_data = video_file.read() |
|
video_file.close() |
|
|
|
response = Response(content=video_data, media_type="video/mp4") |
|
response.headers["Content-Disposition"] = "attachment; filename=output.mp4" |
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
def zero_pad(s, length): |
|
return s.zfill(length) |
|
|
|
|
|
|
|
@app.get("/upload_again") |
|
async def upload_again(bili_meta_data:str): |
|
bili_meta_data = json.loads(bili_meta_data) |
|
await upload_with_random_cover(bili_meta_data) |
|
|
|
|
|
|
|
async def upload_with_random_cover(bili_meta_data:dict): |
|
meta = bili_meta_data |
|
credential = Credential(sessdata=sessdata, |
|
bili_jct=bili_jct, |
|
buvid3=buvid3) |
|
|
|
|
|
img_path = 'manga12' |
|
|
|
subdir_path = os.path.join(os.getcwd(), img_path) |
|
|
|
image_files = [] |
|
for root, dirs, files in os.walk(subdir_path): |
|
for file in files: |
|
if file.endswith(".jpg") or file.endswith(".png"): |
|
image_files.append(os.path.relpath(os.path.join(root, file))) |
|
|
|
image_files.sort( |
|
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3)) |
|
|
|
|
|
random_image = Image.open(random.choice(image_files)) |
|
|
|
save_path = os.path.join("cover", "0.jpg") |
|
|
|
random_image.save(save_path) |
|
|
|
with open("cover/0.jpg", "rb") as f: |
|
cover_data = f.read() |
|
cover = bilibili_api.utils.picture.Picture.from_content(cover_data,"jpg") |
|
|
|
|
|
|
|
page = video_uploader.VideoUploaderPage(path='mp4_out/output_video.mp4', title=meta['title'], description=meta['desc']) |
|
|
|
uploader = video_uploader.VideoUploader([page], meta, credential, cover=cover) |
|
|
|
@uploader.on("__ALL__") |
|
async def ev(data): |
|
print(data) |
|
|
|
await uploader.start() |
|
|
|
|
|
|
|
|
|
@app.get("/cover.jpg") |
|
def get_cover_image(): |
|
cover_dir = "cover" |
|
file_path = os.path.join(cover_dir, "0.jpg") |
|
return FileResponse(file_path) |
|
|
|
|
|
|
|
|
|
@app.get("/get_image/{directory}/{file_name}") |
|
def get_image(directory: str, file_name: str = Path(...)): |
|
image_dir = f"/{directory}" |
|
file_path = os.path.join(image_dir, file_name) |
|
return FileResponse(file_path) |
|
|