Spaces:
Sleeping
Sleeping
""" | |
bilibili_api.utils.upos | |
""" | |
import os | |
import json | |
import httpx | |
import asyncio | |
from asyncio.tasks import create_task | |
from .utils import get_api | |
from .network import get_session | |
from ..exceptions.NetworkException import NetworkException | |
from ..exceptions.ResponseCodeException import ResponseCodeException | |
from ..exceptions.ApiException import ApiException | |
class UposFile: | |
""" | |
Upos 文件对象 | |
""" | |
path: str | |
size: int | |
def __init__(self, path: str) -> None: | |
self.path = path | |
self.size = self._get_size() | |
def _get_size(self) -> int: | |
""" | |
获取文件大小 | |
Returns: | |
int: 文件大小 | |
""" | |
size: int = 0 | |
stream = open(self.path, "rb") | |
while True: | |
s: bytes = stream.read(1024) | |
if not s: | |
break | |
size += len(s) | |
stream.close() | |
return size | |
class UposFileUploader: | |
""" | |
Upos 文件上传 | |
""" | |
_upload_id: str | |
_upload_url: str | |
_session: httpx.AsyncClient | |
def __init__(self, file: UposFile, preupload: dict) -> None: | |
self.file = file | |
self.preupload = preupload | |
self._upload_id = preupload["upload_id"] | |
self._upload_url = f'https:{preupload["endpoint"]}/{preupload["upos_uri"].removeprefix("upos://")}' | |
self._session = get_session() | |
async def upload(self) -> dict: | |
""" | |
上传文件 | |
Returns: | |
dict: filename, cid | |
""" | |
page_size = self.file.size | |
# 所有分块起始位置 | |
chunk_offset_list = list(range(0, page_size, self.preupload["chunk_size"])) | |
# 分块总数 | |
total_chunk_count = len(chunk_offset_list) | |
# 并发上传分块 | |
chunk_number = 0 | |
# 上传队列 | |
chunks_pending = [] | |
for offset in chunk_offset_list: | |
chunks_pending.insert( | |
0, | |
self._upload_chunk(offset, chunk_number, total_chunk_count), | |
) | |
chunk_number += 1 | |
while chunks_pending: | |
tasks = [] | |
while len(tasks) < self.preupload["threads"] and len(chunks_pending) > 0: | |
tasks.append(create_task(chunks_pending.pop())) | |
result = await asyncio.gather(*tasks) | |
for r in result: | |
if not r["ok"]: | |
chunks_pending.insert( | |
0, | |
self._upload_chunk( | |
r["offset"], | |
r["chunk_number"], | |
total_chunk_count, | |
), | |
) | |
data = await self._complete_file(total_chunk_count) | |
return data | |
async def _upload_chunk( | |
self, | |
offset: int, | |
chunk_number: int, | |
total_chunk_count: int, | |
) -> dict: | |
""" | |
上传视频分块 | |
Args: | |
offset (int): 分块起始位置 | |
chunk_number (int): 分块编号 | |
total_chunk_count (int): 总分块数 | |
Returns: | |
dict: 上传结果和分块信息。 | |
""" | |
chunk_event_callback_data = { | |
"offset": offset, | |
"chunk_number": chunk_number, | |
"total_chunk_count": total_chunk_count, | |
} | |
stream = open(self.file.path, "rb") | |
stream.seek(offset) | |
chunk = stream.read(self.preupload["chunk_size"]) | |
stream.close() | |
err_return = { | |
"ok": False, | |
"chunk_number": chunk_number, | |
"offset": offset, | |
} | |
real_chunk_size = len(chunk) | |
params = { | |
"partNumber": str(chunk_number + 1), | |
"uploadId": str(self._upload_id), | |
"chunk": str(chunk_number), | |
"chunks": str(total_chunk_count), | |
"size": str(real_chunk_size), | |
"start": str(offset), | |
"end": str(offset + real_chunk_size), | |
"total": self.file.size, | |
} | |
ok_return = { | |
"ok": True, | |
"chunk_number": chunk_number, | |
"offset": offset, | |
} | |
try: | |
resp = await self._session.put( | |
self._upload_url, | |
data=chunk, # type: ignore | |
params=params, | |
headers={"x-upos-auth": self.preupload["auth"]}, | |
) | |
if resp.status_code >= 400: | |
chunk_event_callback_data["info"] = f"Status {resp.status_code}" | |
return err_return | |
data = resp.text | |
if data != "MULTIPART_PUT_SUCCESS" and data != "": | |
chunk_event_callback_data["info"] = "分块上传失败" | |
return err_return | |
except Exception as e: | |
chunk_event_callback_data["info"] = str(e) | |
return err_return | |
return ok_return | |
async def _complete_file(self, chunks: int) -> dict: | |
""" | |
提交文件 | |
Args: | |
chunks (int): 分块数量 | |
Returns: | |
dict: filename: 该分 P 的标识符,用于最后提交视频。cid: 分 P 的 cid | |
""" | |
data = { | |
"parts": list( | |
map(lambda x: {"partNumber": x, "eTag": "etag"}, range(1, chunks + 1)) | |
) | |
} | |
params = { | |
"output": "json", | |
"name": os.path.basename(os.path.split(self.file.path)[1]), | |
"profile": "ugcfx/bup", | |
"uploadId": self._upload_id, | |
"biz_id": self.preupload["biz_id"], | |
} | |
resp = await self._session.post( | |
url=self._upload_url, | |
data=json.dumps(data), # type: ignore | |
headers={ | |
"x-upos-auth": self.preupload["auth"], | |
"content-type": "application/json; charset=UTF-8", | |
}, | |
params=params, | |
) | |
if resp.status_code >= 400: | |
err = NetworkException(resp.status_code, "状态码错误,提交分 P 失败") | |
raise err | |
data = json.loads(resp.read()) | |
if data["OK"] != 1: | |
err = ResponseCodeException(-1, f'提交分 P 失败,原因: {data["message"]}') | |
raise err |