rogerxavier's picture
Upload 258 files
0aee47a verified
"""
bilibili_api.utils.upos
"""
import os
import json
import httpx
import asyncio
from asyncio.tasks import create_task
from .utils import get_api
from .network import get_session
from ..exceptions.NetworkException import NetworkException
from ..exceptions.ResponseCodeException import ResponseCodeException
from ..exceptions.ApiException import ApiException
class UposFile:
"""
Upos 文件对象
"""
path: str
size: int
def __init__(self, path: str) -> None:
self.path = path
self.size = self._get_size()
def _get_size(self) -> int:
"""
获取文件大小
Returns:
int: 文件大小
"""
size: int = 0
stream = open(self.path, "rb")
while True:
s: bytes = stream.read(1024)
if not s:
break
size += len(s)
stream.close()
return size
class UposFileUploader:
"""
Upos 文件上传
"""
_upload_id: str
_upload_url: str
_session: httpx.AsyncClient
def __init__(self, file: UposFile, preupload: dict) -> None:
self.file = file
self.preupload = preupload
self._upload_id = preupload["upload_id"]
self._upload_url = f'https:{preupload["endpoint"]}/{preupload["upos_uri"].removeprefix("upos://")}'
self._session = get_session()
async def upload(self) -> dict:
"""
上传文件
Returns:
dict: filename, cid
"""
page_size = self.file.size
# 所有分块起始位置
chunk_offset_list = list(range(0, page_size, self.preupload["chunk_size"]))
# 分块总数
total_chunk_count = len(chunk_offset_list)
# 并发上传分块
chunk_number = 0
# 上传队列
chunks_pending = []
for offset in chunk_offset_list:
chunks_pending.insert(
0,
self._upload_chunk(offset, chunk_number, total_chunk_count),
)
chunk_number += 1
while chunks_pending:
tasks = []
while len(tasks) < self.preupload["threads"] and len(chunks_pending) > 0:
tasks.append(create_task(chunks_pending.pop()))
result = await asyncio.gather(*tasks)
for r in result:
if not r["ok"]:
chunks_pending.insert(
0,
self._upload_chunk(
r["offset"],
r["chunk_number"],
total_chunk_count,
),
)
data = await self._complete_file(total_chunk_count)
return data
async def _upload_chunk(
self,
offset: int,
chunk_number: int,
total_chunk_count: int,
) -> dict:
"""
上传视频分块
Args:
offset (int): 分块起始位置
chunk_number (int): 分块编号
total_chunk_count (int): 总分块数
Returns:
dict: 上传结果和分块信息。
"""
chunk_event_callback_data = {
"offset": offset,
"chunk_number": chunk_number,
"total_chunk_count": total_chunk_count,
}
stream = open(self.file.path, "rb")
stream.seek(offset)
chunk = stream.read(self.preupload["chunk_size"])
stream.close()
err_return = {
"ok": False,
"chunk_number": chunk_number,
"offset": offset,
}
real_chunk_size = len(chunk)
params = {
"partNumber": str(chunk_number + 1),
"uploadId": str(self._upload_id),
"chunk": str(chunk_number),
"chunks": str(total_chunk_count),
"size": str(real_chunk_size),
"start": str(offset),
"end": str(offset + real_chunk_size),
"total": self.file.size,
}
ok_return = {
"ok": True,
"chunk_number": chunk_number,
"offset": offset,
}
try:
resp = await self._session.put(
self._upload_url,
data=chunk, # type: ignore
params=params,
headers={"x-upos-auth": self.preupload["auth"]},
)
if resp.status_code >= 400:
chunk_event_callback_data["info"] = f"Status {resp.status_code}"
return err_return
data = resp.text
if data != "MULTIPART_PUT_SUCCESS" and data != "":
chunk_event_callback_data["info"] = "分块上传失败"
return err_return
except Exception as e:
chunk_event_callback_data["info"] = str(e)
return err_return
return ok_return
async def _complete_file(self, chunks: int) -> dict:
"""
提交文件
Args:
chunks (int): 分块数量
Returns:
dict: filename: 该分 P 的标识符,用于最后提交视频。cid: 分 P 的 cid
"""
data = {
"parts": list(
map(lambda x: {"partNumber": x, "eTag": "etag"}, range(1, chunks + 1))
)
}
params = {
"output": "json",
"name": os.path.basename(os.path.split(self.file.path)[1]),
"profile": "ugcfx/bup",
"uploadId": self._upload_id,
"biz_id": self.preupload["biz_id"],
}
resp = await self._session.post(
url=self._upload_url,
data=json.dumps(data), # type: ignore
headers={
"x-upos-auth": self.preupload["auth"],
"content-type": "application/json; charset=UTF-8",
},
params=params,
)
if resp.status_code >= 400:
err = NetworkException(resp.status_code, "状态码错误,提交分 P 失败")
raise err
data = json.loads(resp.read())
if data["OK"] != 1:
err = ResponseCodeException(-1, f'提交分 P 失败,原因: {data["message"]}')
raise err