moviepy_with_manga_test / bilibili_api /interactive_video.py
rogerxavier's picture
Upload 258 files
0aee47a verified
"""
bilibili_api.interactive_video
互动视频相关操作
"""
# pylint: skip-file
import os
import copy
import enum
import json
import time
import shutil
import zipfile
from urllib import parse
from random import randint as rand
from asyncio import CancelledError, create_task
from typing import List, Tuple, Union, Callable, Coroutine
import httpx
from . import settings
from .video import Video
from .utils.utils import get_api
from .utils.AsyncEvent import AsyncEvent
from .utils.credential import Credential
from .utils.network import Api
API = get_api("interactive_video")
class InteractiveButtonAlign(enum.Enum):
"""
按钮的文字在按钮中的位置
``` text
-----
|xxx|----o (TEXT_LEFT)
-----
-----
o----|xxx| (TEXT_RIGHT)
-----
----------
|XXXXXXXX| (DEFAULT)
----------
```
- DEFAULT
- TEXT_UP
- TEXT_RIGHT
- TEXT_DOWN
- TEXT_LEFT
"""
DEFAULT = 0
TEXT_UP = 1
TEXT_RIGHT = 2
TEXT_DOWN = 3
TEXT_LEFT = 4
class InteractiveNodeJumpingType(enum.Enum):
"""
对下一节点的跳转的方式
- ASK : 选择
- DEFAULT: 跳转到默认节点
- READY : 选择(只有一个选择)
"""
READY = 1
DEFAULT = 0
ASK = 2
class InteractiveVariable:
"""
互动节点的变量
"""
def __init__(
self,
name: str,
var_id: str,
var_value: int,
show: bool = False,
random: bool = False,
):
"""
Args:
name (str) : 变量名
var_id (str) : 变量 id
var_value (int) : 变量的值
show (bool) : 是否显示
random (bool) : 是否为随机值(1-100)
"""
self.__var_id = var_id
self.__var_value = var_value
self.__name = name
self.__is_show = show
self.__random = random
def get_id(self) -> str:
return self.__var_id
def refresh_value(self) -> None:
"""
刷新变量数值
"""
if self.is_random():
self.__var_value = int(rand(0, 100))
def get_value(self) -> int:
return self.__var_value
def is_show(self) -> bool:
return self.__is_show
def is_random(self) -> bool:
return self.__random
def get_name(self) -> str:
return self.__name
def __str__(self):
return f"{self.__name} {self.__var_value}"
class InteractiveButton:
"""
互动视频节点按钮类
"""
def __init__(
self,
text: str,
x: int,
y: int,
align: Union[InteractiveButtonAlign, int] = InteractiveButtonAlign.DEFAULT,
):
"""
Args:
text (str) : 文字
x (int) : x 轴
y (int) : y 轴
align (InteractiveButtonAlign | int): 按钮的文字在按钮中的位置
"""
self.__text = text
self.__pos = (x, y)
if isinstance(align, InteractiveButtonAlign):
align = align.value
self.__align = align
def get_text(self) -> str:
return self.__text
def get_align(self) -> int:
return self.__align # type: ignore
def get_pos(self) -> Tuple[int, int]:
return self.__pos
def __str__(self):
return f"{self.__text} {self.__pos}"
class InteractiveJumpingCondition:
"""
节点跳转的公式,只有公式成立才会跳转
"""
def __init__(self, var: List[InteractiveVariable] = [], condition: str = "True"):
"""
Args:
var (List[InteractiveVariable]): 所有变量
condition (str) : 公式
"""
self.__vars = var
self.__command = condition
def get_result(self) -> bool:
"""
计算公式获得结果
Returns:
bool: 是否成立
"""
if self.__command == "":
return True
command = copy.copy(self.__command)
for var in self.__vars:
var_name = var.get_id()
var_value = var.get_value()
command = command.replace(var_name, str(var_value))
command = command.replace("&&", " and ")
command = command.replace("||", " or ")
command = command.replace("!", " not ")
command = command.replace("===", "==")
command = command.replace("!==", "!=")
command = command.replace("true", "True")
command = command.replace("false", "False")
return eval(command)
def __str__(self):
return f"{self.__command}"
class InteractiveJumpingCommand:
"""
节点跳转对变量的操作
"""
def __init__(self, var: List[InteractiveVariable] = [], command: str = ""):
"""
Args:
var (List[InteractiveVariable]): 所有变量
command (str) : 公式
"""
self.__vars = var
self.__command = command
def run_command(self) -> List["InteractiveVariable"]:
"""
执行操作
Returns:
List[InteractiveVariable]
"""
if self.__command == "":
return self.__vars
for code in self.__command.split(";"):
var_name_ = code.split("=")[0]
var_new_value = code.split("=")[1]
for var in self.__vars:
var_name = var.get_id()
var_value = var.get_value()
var_new_value = var_new_value.replace(var_name, str(var_value))
var_new_value_calc = eval(var_new_value)
for var in self.__vars:
if var.get_id() == var_name_:
var._InteractiveVariable__var_value = var_new_value_calc # type: ignore
return self.__vars
class InteractiveNode:
"""
互动视频节点类
"""
def __init__(
self,
video: "InteractiveVideo",
node_id: int,
cid: int,
vars: List[InteractiveVariable],
button: Union[InteractiveButton, None] = None,
condition: InteractiveJumpingCondition = InteractiveJumpingCondition(),
native_command: InteractiveJumpingCommand = InteractiveJumpingCommand(),
is_default: bool = False,
):
"""
Args:
video (InteractiveVideo) : 视频类
node_id (int) : 节点 id
cid (int) : CID
vars (List[InteractiveVariable]) : 变量
button (InteractiveButton) : 对应的按钮
condition (InteractiveJumpingCondition): 跳转公式
native_command (InteractiveJumpingCommand) : 跳转时变量操作
is_default (bool) : 是不是默认的跳转的节点
"""
self.__parent = video
self.__id = node_id
self.__cid = cid
self.__button = button
self.__jumping_command = condition
self.__is_default = is_default
self.__vars = vars
self.__command = native_command
self.__vars = self.__command.run_command()
async def get_vars(self) -> List[InteractiveVariable]:
"""
获取节点的所有变量
Returns:
List[InteractiveVariable]: 节点的所有变量
"""
return self.__vars
async def get_children(self) -> List["InteractiveNode"]:
"""
获取节点的所有子节点
Returns:
List[InteractiveNode]: 所有子节点
"""
edge_info = await self.__parent.get_edge_info(self.__id)
nodes = []
if edge_info["edges"].get("questions") == None:
return []
for node in edge_info["edges"]["questions"][0]["choices"]:
node_id = node["id"]
node_cid = node["cid"]
if "text_align" in node.keys():
text_align = node["text_align"]
else:
text_align = 0
if "option" in node.keys():
node_button = InteractiveButton(
node["option"], node.get("x"), node.get("y"), text_align
)
else:
node_button = None
node_condition = InteractiveJumpingCondition(
await self.get_vars(), node["condition"]
)
node_command = InteractiveJumpingCommand(
await self.get_vars(), node["native_action"]
)
if "is_default" in node.keys():
node_is_default = node["is_default"]
else:
node_is_default = False
node_vars = copy.deepcopy(await self.get_vars())
nodes.append(
InteractiveNode(
self.__parent,
node_id,
node_cid,
node_vars,
node_button,
node_condition,
node_command,
node_is_default,
)
)
return nodes
def is_default(self) -> bool:
return self.__is_default
async def get_jumping_type(self) -> int:
"""
获取子节点跳转方式 (参考 InteractiveNodeJumpingType)
"""
edge_info = await self.__parent.get_edge_info(self.__id)
return edge_info["edges"]["questions"][0]["type"]
def get_node_id(self) -> int:
return self.__id
def get_cid(self) -> int:
return self.__cid
def get_self_button(self) -> "InteractiveButton":
if self.__button == None:
return InteractiveButton("", -1, -1)
return self.__button
def get_jumping_condition(self) -> "InteractiveJumpingCondition":
return self.__jumping_command
def get_video(self) -> "InteractiveVideo":
return self.__parent
async def get_info(self) -> dict:
"""
获取节点的简介
Returns:
dict: 调用 API 返回的结果
"""
return await self.__parent.get_edge_info(self.__id)
def __str__(self):
return f"{self.get_node_id()}"
class InteractiveGraph:
"""
情节树类
"""
def __init__(self, video: "InteractiveVideo", skin: dict, root_cid: int):
"""
Args:
video (InteractiveVideo): 互动视频类
skin (dict) : 样式
root_cid (int) : 根节点 CID
"""
self.__parent = video
self.__skin = skin
self.__node = InteractiveNode(self.__parent, 1, root_cid, [])
def get_video(self) -> "InteractiveVideo":
return self.__parent
def get_skin(self) -> dict:
return self.__skin
async def get_root_node(self) -> "InteractiveNode":
"""
获取根节点
Returns:
InteractiveNode: 根节点
"""
edge_info = await self.__parent.get_edge_info(None)
if "hidden_vars" in edge_info.keys():
node_vars = edge_info["hidden_vars"]
else:
node_vars = []
var_list = []
for var in node_vars:
var_value = var["value"]
var_name = var["name"]
var_show = var["is_show"]
var_id = var["id_v2"]
if var["type"] == 2:
random = True
else:
random = False
var_list.append(
InteractiveVariable(var_name, var_id, var_value, var_show, random)
)
self.__node._InteractiveNode__command = InteractiveJumpingCommand( # type: ignore
var_list
)
self.__node._InteractiveNode__vars = var_list # type: ignore
return self.__node
async def get_children(self) -> List["InteractiveNode"]:
"""
获取子节点
Returns:
List[InteractiveNode]: 子节点
"""
return await self.__node.get_children()
class InteractiveVideo(Video):
"""
互动视频类
"""
def __init__(self, bvid=None, aid=None, credential=None):
super().__init__(bvid, aid, credential)
async def up_get_ivideo_pages(self) -> dict:
"""
获取交互视频的分 P 信息。up 主需要拥有视频所有权。
Returns:
dict: 调用 API 返回的结果
"""
credential = self.credential if self.credential else Credential()
api = API["info"]["videolist"]
params = {"bvid": self.get_bvid()}
return await Api(**api, credential=credential).update_params(**params).result
async def up_submit_story_tree(self, story_tree: str) -> dict:
"""
上传交互视频的情节树。up 主需要拥有视频所有权。
Args:
story_tree (str): 情节树的描述,参考 bilibili_storytree.StoryGraph, 需要 Serialize 这个结构
Returns:
dict: 调用 API 返回的结果
"""
credential = self.credential if self.credential else Credential()
api = API["operate"]["savestory"]
form_data = {"preview": "0", "data": story_tree, "csrf": credential.bili_jct}
headers = {
"User-Agent": "Mozilla/5.0",
"Referer": "https://member.bilibili.com",
"Content-Encoding": "gzip, deflate, br",
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json, text/plain, */*",
}
data = parse.urlencode(form_data)
return (
await Api(**api, credential=credential, no_csrf=True)
.update_data(**data)
.update_headers(**headers)
.result
)
async def get_graph_version(self) -> int:
"""
获取剧情图版本号,仅供 `get_edge_info()` 使用。
Returns:
int: 剧情图版本号
"""
# 取得初始顶点 cid
bvid = self.get_bvid()
credential = self.credential if self.credential else Credential()
v = Video(bvid=bvid, credential=credential)
page_list = await v.get_pages()
cid = page_list[0]["cid"]
# 获取剧情图版本号
url = "https://api.bilibili.com/x/player/v2"
params = {"bvid": bvid, "cid": cid}
resp = (
await Api(method="GET", url=url, credential=credential)
.update_params(**params)
.result
)
return resp["interaction"]["graph_version"]
async def get_edge_info(self, edge_id: Union[int, None] = None):
"""
获取剧情图节点信息
Args:
edge_id (int, optional) : 节点 ID,为 None 时获取根节点信息. Defaults to None.
Returns:
dict: 调用 API 返回的结果
"""
bvid = self.get_bvid()
credential = self.credential if self.credential is not None else Credential()
api = API["info"]["edge_info"]
params = {"bvid": bvid, "graph_version": (await self.get_graph_version())}
if edge_id is not None:
params["edge_id"] = edge_id
return await Api(**api, credential=credential).update_params(**params).result
async def mark_score(self, score: int = 5):
"""
为互动视频打分
Args:
score (int): 互动视频分数. Defaults to 5.
Returns:
dict: 调用 API 返回的结果
"""
self.credential.raise_for_no_sessdata()
self.credential.raise_for_no_bili_jct()
api = API["operate"]["mark_score"]
data = {"mark": score, "bvid": self.get_bvid()}
return await Api(**api, credential=self.credential).update_data(**data).result
async def get_cid(self) -> int:
"""
获取稿件 cid
"""
return await super().get_cid(0)
async def get_graph(self):
"""
获取稿件情节树
Returns:
InteractiveGraph: 情节树
"""
edge_info = await self.get_edge_info(1)
cid = await self.get_cid()
return InteractiveGraph(self, edge_info["edges"]["skin"], cid)
class InteractiveVideoDownloaderEvents(enum.Enum):
"""
互动视频下载器事件枚举
| event | meaning | IVI mode | NODE_VIDEOS mode | DOT_GRAPH mode | NO_PACKAGING mode | Is Built-In downloader event |
| ----- | ------- | -------- | ---------------- | -------------- | ----------------- | ------------------------- |
| START | 开始下载 | [x] | [x] | [x] | [x] | [ ] |
| GET | 获取到节点信息 | [x] | [x] | [x] | [x] | [ ] |
| PREPARE_DOWNLOAD | 准备下载单个节点 | [x] | [x] | [ ] | [x] | [ ] |
| DOWNLOAD_START | 开始下载单个文件 | Unknown | Unknown | [ ] | Unknown | [x] |
| DOWNLOAD_PART | 文件分块部分完成 | Unknown | Unknown | [ ] | Unknown | [x] |
| DOWNLOAD_SUCCESS | 完成下载 | Unknown | Unknown | [ ] | Unknown | [x] |
| PACKAGING | 正在打包 | [x] | [ ] | [ ] | [ ] | [ ] |
| SUCCESS | 下载成功 | [x] | [x] | [x] | [x] | [ ] |
| ABORTED | 用户暂停 | [x] | [x] | [x] | [x] | [ ] |
| FAILED | 下载失败 | [x] | [x] | [x] | [x] | [ ] |
"""
START = "START"
GET = "GET"
DOWNLOAD_START = "DOWNLOAD_START"
DOWNLOAD_PART = "DOWNLOAD_PART"
DOWNLOAD_SUCCESS = "DOWNLOAD_SUCCESS"
PACKAGING = "PACKAGING"
SUCCESS = "SUCCESS"
ABORTED = "ABORTED"
FAILED = "FAILED"
class InteractiveVideoDownloaderMode(enum.Enum):
"""
互动视频下载模式
- IVI: 下载可播放的 ivi 文件
- NODE_VIDEOS: 下载所有节点的所有视频并存放在某个文件夹,每一个节点的视频命名为 `{节点 id} {节点标题 (自动去除敏感字符)}.mp4`
- DOT_GRAPH: 下载 dot 格式的情节树图表
- NO_PACKAGING: 前面按照 ivi 文件下载步骤进行下载,但是最终不会打包成为 ivi 文件,所有文件将存放于一个文件夹中。互动视频数据将存放在一个文件夹中,里面的文件命名/含义与拆包后的 ivi 文件完全相同。
"""
IVI = "ivi"
NODE_VIDEOS = "videos"
DOT_GRAPH = "dot"
NO_PACKAGING = "no_pack"
class InteractiveVideoDownloader(AsyncEvent):
"""
互动视频下载类
"""
def __init__(
self,
video: InteractiveVideo,
out: str = "",
self_download_func: Union[Coroutine, None] = None,
downloader_mode: InteractiveVideoDownloaderMode = InteractiveVideoDownloaderMode.IVI,
):
"""
Args:
video (InteractiveVideo) : 互动视频类
out (str) : 输出文件地址 (如果模式为 NODE_VIDEOS/NO_PACKAGING 则此参数表示所有节点视频的存放目录)
self_download_func (Coroutine | None) : 自定义下载函数(需 async 函数)
downloader_mode (InteractiveVideoDownloaderMode): 下载模式
`self_download_func` 函数应接受两个参数(第一个是下载 URL,第二个是输出地址(精确至文件名))
"""
super().__init__()
self.__video = video
if self_download_func == None:
self.__download_func = self.__download
else:
self.__download_func = self_download_func
self.__task = None
self.__out = out
self.__mode = downloader_mode
async def __download(self, url: str, out: str) -> None:
resp = httpx.get(
url,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://www.bilibili.com",
},
proxies={"all://": settings.proxy},
stream=True,
)
resp.raise_for_status()
if os.path.exists(out):
os.remove(out)
parent = os.path.dirname(out)
if not os.path.exists(parent):
os.mkdir(parent)
self.dispatch("DOWNLOAD_START", {"url": url, "out": out})
all_length = int(resp.headers["Content-Length"])
parts = all_length // 1024 + (1 if all_length % 1024 != 0 else 0)
cnt = 0
start_time = time.perf_counter()
with open(out, "wb") as f:
for chunk in resp.iter_bytes(1024):
cnt += 1
self.dispatch(
"DOWNLOAD_PART",
{
"done": cnt,
"total": parts,
"time": int(time.perf_counter() - start_time),
},
)
f.write(chunk)
self.dispatch("DOWNLOAD_SUCCESS")
async def __main(self) -> None:
# 初始化
self.dispatch("START")
if self.__out == "":
self.__out = self.__video.get_bvid() + ".ivi"
if self.__out.endswith(".ivi"):
self.__out = self.__out.rstrip(".ivi")
if os.path.exists(self.__out + ".ivi"):
os.remove(self.__out + ".ivi")
tmp_dir_name = self.__out + ".tmp"
if not os.path.exists(tmp_dir_name):
os.mkdir(tmp_dir_name)
def createEdge(edge_id: int):
"""
创建节点信息到 edges_info
"""
edges_info[edge_id] = {
"title": None,
"cid": None,
"button": None,
"condition": None,
"jump_type": None,
"is_default": None,
"command": None,
"sub": [],
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
# 存储顶点信息
edges_info = {}
# 使用队列来遍历剧情图,初始为 None 是为了从初始顶点开始
queue: List[InteractiveNode] = [
await (await self.__video.get_graph()).get_root_node()
]
# 设置初始顶点
n = await (await self.__video.get_graph()).get_root_node()
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": (n.get_self_button().get_pos()),
}
edges_info[n.get_node_id()]["vars"] = [
var2dict(var) for var in (await n.get_vars())
]
edges_info[n.get_node_id()]["condition"] = (n.get_jumping_condition()._InteractiveJumpingCondition__command,) # type: ignore
edges_info[n.get_node_id()]["jump_type"] = 0
edges_info[n.get_node_id()]["is_default"] = True
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
while queue:
# 出队
now_node = queue.pop()
if (
now_node.get_node_id() in edges_info
and edges_info[now_node.get_node_id()]["title"] is not None
and edges_info[now_node.get_node_id()]["cid"] is not None
):
# 该情况为已获取到所有信息,说明是跳转到之前已处理的顶点,不作处理
continue
# 获取顶点信息,最大重试 3 次
retry = 3
while True:
try:
node = await now_node.get_info()
subs = await now_node.get_children()
self.dispatch(
"GET",
{"title": node["title"], "node_id": now_node.get_node_id()},
)
break
except Exception as e:
retry -= 1
if retry < 0:
raise e
# 检查节顶点是否在 edges_info 中,本次步骤得到 title 信息
if node["edge_id"] not in edges_info:
# 不在,新建
createEdge(node["edge_id"])
# 设置 title
edges_info[node["edge_id"]]["title"] = node["title"]
# 无可达顶点,即不能再往下走了,类似树的叶子节点
if "questions" not in node["edges"]:
continue
# 遍历所有可达顶点
for n in subs:
# 该步骤获取顶点的 cid(视频分 P 的 ID)
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": n.get_self_button().get_pos(),
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
edges_info[n.get_node_id()]["condition"] = n.get_jumping_condition()._InteractiveJumpingCondition__command # type: ignore
edges_info[n.get_node_id()][
"jump_type"
] = await now_node.get_jumping_type()
edges_info[n.get_node_id()]["is_default"] = n.is_default()
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
edges_info[now_node.get_node_id()]["sub"] = [
n.get_node_id() for n in subs
]
# 所有可达顶点 ID 入队
queue.insert(0, n)
json.dump(
edges_info,
open(tmp_dir_name + "/ivideo.json", "w+", encoding="utf-8"),
indent=2,
)
json.dump(
{
"bvid": self.__video.get_bvid(),
"title": (await self.__video.get_info())["title"],
},
open(tmp_dir_name + "/bilivideo.json", "w+", encoding="utf-8"),
indent=2,
)
cid_set = set()
for key, item in edges_info.items():
cid = item["cid"]
if not cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": item["cid"]})
cid_set.add(cid)
url = await self.__video.get_download_url(cid=cid, html5=True)
await self.__download_func(
url["durl"][0]["url"],
tmp_dir_name + "/" + str(cid) + ".mp4",
) # type: ignore
root_cid = await self.__video.get_cid()
if not root_cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": root_cid})
cid = await self.__video.get_cid()
url = await self.__video.get_download_url(cid=cid, html5=True)
title = (await self.__video.get_info())["title"]
await self.__download_func(
url["durl"][0]["url"], tmp_dir_name + "/" + str(cid) + ".mp4"
) # type: ignore
self.dispatch("PACKAGING")
zip = zipfile.ZipFile(
open(self.__out + ".ivi", "wb+"), mode="w", compression=zipfile.ZIP_DEFLATED
) # outFullName为压缩文件的完整路径
for path, dirnames, filenames in os.walk(tmp_dir_name):
# 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
fpath = path.replace(tmp_dir_name, "")
for filename in filenames:
zip.write(os.path.join(path, filename), os.path.join(fpath, filename))
zip.close()
shutil.rmtree(tmp_dir_name)
self.dispatch("SUCCESS")
async def __node_videos_main(self) -> None:
# 初始化
self.dispatch("START")
tmp_dir_name = self.__out
if not os.path.exists(tmp_dir_name):
os.mkdir(tmp_dir_name)
def createEdge(edge_id: int):
"""
创建节点信息到 edges_info
"""
edges_info[edge_id] = {
"title": None,
"cid": None,
"button": None,
"condition": None,
"jump_type": None,
"is_default": None,
"command": None,
"sub": [],
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
# 存储顶点信息
edges_info = {}
# 使用队列来遍历剧情图,初始为 None 是为了从初始顶点开始
queue: List[InteractiveNode] = [
await (await self.__video.get_graph()).get_root_node()
]
# 设置初始顶点
n = await (await self.__video.get_graph()).get_root_node()
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": (n.get_self_button().get_pos()),
}
edges_info[n.get_node_id()]["vars"] = [
var2dict(var) for var in (await n.get_vars())
]
edges_info[n.get_node_id()]["condition"] = (n.get_jumping_condition()._InteractiveJumpingCondition__command,) # type: ignore
edges_info[n.get_node_id()]["jump_type"] = 0
edges_info[n.get_node_id()]["is_default"] = True
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
while queue:
# 出队
now_node = queue.pop()
if (
now_node.get_node_id() in edges_info
and edges_info[now_node.get_node_id()]["title"] is not None
and edges_info[now_node.get_node_id()]["cid"] is not None
):
# 该情况为已获取到所有信息,说明是跳转到之前已处理的顶点,不作处理
continue
# 获取顶点信息,最大重试 3 次
retry = 3
while True:
try:
node = await now_node.get_info()
subs = await now_node.get_children()
self.dispatch(
"GET",
{"title": node["title"], "node_id": now_node.get_node_id()},
)
break
except Exception as e:
retry -= 1
if retry < 0:
raise e
# 检查节顶点是否在 edges_info 中,本次步骤得到 title 信息
if node["edge_id"] not in edges_info:
# 不在,新建
createEdge(node["edge_id"])
# 设置 title
edges_info[node["edge_id"]]["title"] = node["title"]
# 无可达顶点,即不能再往下走了,类似树的叶子节点
if "questions" not in node["edges"]:
continue
# 遍历所有可达顶点
for n in subs:
# 该步骤获取顶点的 cid(视频分 P 的 ID)
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": n.get_self_button().get_pos(),
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
edges_info[n.get_node_id()]["condition"] = n.get_jumping_condition()._InteractiveJumpingCondition__command # type: ignore
edges_info[n.get_node_id()][
"jump_type"
] = await now_node.get_jumping_type()
edges_info[n.get_node_id()]["is_default"] = n.is_default()
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
edges_info[now_node.get_node_id()]["sub"] = [
n.get_node_id() for n in subs
]
# 所有可达顶点 ID 入队
queue.insert(0, n)
cid_set = set()
for key, item in edges_info.items():
cid = item["cid"]
if not cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": item["cid"]})
cid_set.add(cid)
url = await self.__video.get_download_url(cid=cid, html5=True)
await self.__download_func(
url["durl"][0]["url"],
tmp_dir_name + "/" + str(key) + " " + item["title"] + ".mp4",
) # type: ignore
root_cid = await self.__video.get_cid()
if not root_cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": root_cid})
cid = await self.__video.get_cid()
url = await self.__video.get_download_url(cid=cid, html5=True)
title = (await self.__video.get_info())["title"]
await self.__download_func(
url["durl"][0]["url"], tmp_dir_name + "/1 " + title + ".mp4"
) # type: ignore
self.dispatch("SUCCESS")
async def __dot_graph_main(self) -> None:
self.dispatch("START")
if not self.__out.endswith(".dot"):
self.__out += ".dot"
class node_info:
node_id: int
subs: List[int]
cid: int
title: str
def __eq__(self, info: "node_info"):
self.subs.sort()
info.subs.sort()
return (
(info.subs == self.subs)
and (info.title == self.title)
and (info.cid == self.cid)
)
def __lt__(self, info: "node_info"):
return self.cid < info.cid
def __gt__(self, info: "node_info"):
return self.cid > info.cid
fetched_nodes_info: List[node_info] = []
node_info_dict = {}
scripts = []
graph = await self.__video.get_graph()
queue: List[InteractiveNode] = [await graph.get_root_node()]
while queue:
queue_backup = copy.copy(queue)
queue = []
for cur_node in queue_backup:
cur_node_info = await cur_node.get_info()
cur_node_children = await cur_node.get_children()
self.dispatch(
"GET",
{
"title": cur_node_info["title"],
"node_id": cur_node.get_node_id(),
},
)
cur_node_info_class = node_info()
cur_node_info_class.node_id = cur_node.get_node_id()
cur_node_info_class.cid = cur_node.get_cid()
cur_node_info_class.subs = [n.get_node_id() for n in cur_node_children]
cur_node_info_class.title = cur_node_info["title"]
back_to_pre = False
back_to_node_title = -1
for fetched_info in fetched_nodes_info:
if fetched_info == cur_node_info_class:
back_to_pre = True
back_to_node_title = fetched_info.title
if not back_to_pre:
node_info_dict[cur_node.get_node_id()] = cur_node_info_class
for cur_node_child in cur_node_children:
script_label = ""
if cur_node_child.get_jumping_condition()._InteractiveJumpingCondition__command != "": # type: ignore
script_label = script_label + "Condition: [" + cur_node_child.get_jumping_condition()._InteractiveJumpingCondition__command + "]" # type: ignore
if cur_node_child._InteractiveNode__command._InteractiveJumpingCommand__command != "": # type: ignore
script_label = script_label + "\nNative Command: [" + cur_node_child._InteractiveNode__command._InteractiveJumpingCommand__command + "]" # type: ignore
elif cur_node_child._InteractiveNode__command._InteractiveJumpingCommand__command != "": # type: ignore
script_label = script_label + "\nNative Command: [" + cur_node_child._InteractiveNode__command._InteractiveJumpingCommand__command + "]" # type: ignore
scripts.append(
{
"from": cur_node.get_node_id(),
"to": cur_node_child.get_node_id(),
"label": script_label,
}
)
queue.append(cur_node_child)
fetched_nodes_info.append(cur_node_info_class)
else:
node_info_dict[cur_node.get_node_id()] = f"跳转至 {back_to_node_title}"
graph_content = "digraph {\nfontname=FangSong\nnode [fontname=FangSong]\n"
for script in scripts:
graph_content += f'\t{script["from"]} -> {script["to"]}'
if script["label"] != "":
graph_content += f' [label="{script["label"]}"]\n'
else:
graph_content += "\n"
for node_info_key, node_info_item in node_info_dict.items():
if isinstance(node_info_item, node_info):
graph_content += f'\t{node_info_key} [label="{node_info_item.title}"]\n'
else:
graph_content += f'\t{node_info_key} [label="{node_info_item}"]\n'
vars_string = "Variables: "
for var in await (await graph.get_root_node()).get_vars():
var_attribute = ""
if var.is_random():
var_attribute = "Random"
else:
if var.is_show():
var_attribute = "Normal"
else:
var_attribute = "Hide"
vars_string += f"[{var.get_id()} -> {var.get_name()} = {var.get_value()}, {var_attribute}]\n"
graph_content += f'\tlabel="{vars_string}"'
graph_content += "}"
with open(self.__out, "w+", encoding="utf-8") as dot_file:
dot_file.write(graph_content)
self.dispatch("SUCCESS")
async def __no_packaging_main(self) -> None:
# 初始化
self.dispatch("START")
tmp_dir_name = self.__out
if not os.path.exists(tmp_dir_name):
os.mkdir(tmp_dir_name)
def createEdge(edge_id: int):
"""
创建节点信息到 edges_info
"""
edges_info[edge_id] = {
"title": None,
"cid": None,
"button": None,
"condition": None,
"jump_type": None,
"is_default": None,
"command": None,
"sub": [],
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
# 存储顶点信息
edges_info = {}
# 使用队列来遍历剧情图,初始为 None 是为了从初始顶点开始
queue: List[InteractiveNode] = [
await (await self.__video.get_graph()).get_root_node()
]
# 设置初始顶点
n = await (await self.__video.get_graph()).get_root_node()
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": (n.get_self_button().get_pos()),
}
edges_info[n.get_node_id()]["vars"] = [
var2dict(var) for var in (await n.get_vars())
]
edges_info[n.get_node_id()]["condition"] = (n.get_jumping_condition()._InteractiveJumpingCondition__command,) # type: ignore
edges_info[n.get_node_id()]["jump_type"] = 0
edges_info[n.get_node_id()]["is_default"] = True
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
while queue:
# 出队
now_node = queue.pop()
if (
now_node.get_node_id() in edges_info
and edges_info[now_node.get_node_id()]["title"] is not None
and edges_info[now_node.get_node_id()]["cid"] is not None
):
# 该情况为已获取到所有信息,说明是跳转到之前已处理的顶点,不作处理
continue
# 获取顶点信息,最大重试 3 次
retry = 3
while True:
try:
node = await now_node.get_info()
subs = await now_node.get_children()
self.dispatch(
"GET",
{"title": node["title"], "node_id": now_node.get_node_id()},
)
break
except Exception as e:
retry -= 1
if retry < 0:
raise e
# 检查节顶点是否在 edges_info 中,本次步骤得到 title 信息
if node["edge_id"] not in edges_info:
# 不在,新建
createEdge(node["edge_id"])
# 设置 title
edges_info[node["edge_id"]]["title"] = node["title"]
# 无可达顶点,即不能再往下走了,类似树的叶子节点
if "questions" not in node["edges"]:
continue
# 遍历所有可达顶点
for n in subs:
# 该步骤获取顶点的 cid(视频分 P 的 ID)
if n.get_node_id() not in edges_info:
createEdge(n.get_node_id())
edges_info[n.get_node_id()]["cid"] = n.get_cid()
edges_info[n.get_node_id()]["button"] = {
"text": n.get_self_button().get_text(),
"align": n.get_self_button().get_align(),
"pos": n.get_self_button().get_pos(),
}
def var2dict(var: InteractiveVariable):
return {
"name": var.get_name(),
"id": var.get_id(),
"value": var.get_value(),
"show": var.is_show(),
"random": var.is_random(),
}
edges_info[n.get_node_id()]["condition"] = n.get_jumping_condition()._InteractiveJumpingCondition__command # type: ignore
edges_info[n.get_node_id()][
"jump_type"
] = await now_node.get_jumping_type()
edges_info[n.get_node_id()]["is_default"] = n.is_default()
edges_info[n.get_node_id()]["command"] = n._InteractiveNode__command._InteractiveJumpingCommand__command # type: ignore
edges_info[now_node.get_node_id()]["sub"] = [
n.get_node_id() for n in subs
]
# 所有可达顶点 ID 入队
queue.insert(0, n)
json.dump(
edges_info,
open(tmp_dir_name + "/ivideo.json", "w+", encoding="utf-8"),
indent=2,
)
json.dump(
{
"bvid": self.__video.get_bvid(),
"title": (await self.__video.get_info())["title"],
},
open(tmp_dir_name + "/bilivideo.json", "w+", encoding="utf-8"),
indent=2,
)
cid_set = set()
for key, item in edges_info.items():
cid = item["cid"]
if not cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": item["cid"]})
cid_set.add(cid)
url = await self.__video.get_download_url(cid=cid, html5=True)
await self.__download_func(
url["durl"][0]["url"],
tmp_dir_name + "/" + str(key) + " " + item["title"] + ".mp4",
) # type: ignore
root_cid = await self.__video.get_cid()
if not root_cid in cid_set:
self.dispatch("PREPARE_DOWNLOAD", {"cid": root_cid})
cid = await self.__video.get_cid()
url = await self.__video.get_download_url(cid=cid, html5=True)
title = (await self.__video.get_info())["title"]
await self.__download_func(
url["durl"][0]["url"], tmp_dir_name + "/1 " + title + ".mp4"
) # type: ignore
self.dispatch("SUCCESS")
async def start(self) -> None:
"""
开始下载
"""
if self.__mode.value == "ivi":
task = create_task(self.__main())
elif self.__mode.value == "dot":
task = create_task(self.__dot_graph_main())
elif self.__mode.value == "no_pack":
task = create_task(self.__no_packaging_main())
else:
task = create_task(self.__node_videos_main())
self.__task = task
try:
result = await task
self.__task = None
return result
except CancelledError:
# 忽略 task 取消异常
pass
except Exception as e:
self.dispatch("FAILED", {"err": e})
raise e
async def abort(self) -> None:
"""
中断下载
"""
if self.__task:
self.__task.cancel("用户手动取消")
self.dispatch("ABORTED", None)
def get_ivi_file_meta(path: str) -> dict:
"""
获取 ivi 文件信息
Args:
path (str): 文件地址
Returns:
dict: 文件信息
"""
ivi = zipfile.ZipFile(open(path, "rb"))
info = ivi.open("bilivideo.json").read()
return json.loads(info)