Spaces:
Sleeping
Sleeping
import re | |
from enum import Enum | |
from inspect import isclass | |
from inspect import isfunction as isFn | |
from inspect import iscoroutinefunction as isAsync | |
from typing import Any, Dict, List, Tuple, Optional | |
import bilibili_api | |
FUNC = re.compile(r"[^\.\(]+") | |
ARGS = re.compile(r"[^,\(\)]*[,\)]") | |
SENTENCES = re.compile(r"\w+(?:\([^\)]*\))?\.?") | |
OPS = { | |
":int": int, | |
":float": float, | |
":bool": lambda s: s == "True", | |
} | |
class Parser: | |
""" | |
解析器 | |
""" | |
def __init__(self, params: Dict[str, str]): | |
self.valid = True | |
self.params = params | |
async def __aenter__(self): | |
""" | |
解析前准备 | |
把 params 中的参数先解析了 | |
""" | |
for key, val in self.params.items(): | |
obj, err = await self.parse(val) | |
if err is None: | |
if isinstance(obj, bilibili_api.Credential): | |
self.valid = await bilibili_api.check_valid(obj) | |
self.params[key] = obj | |
return self | |
async def __aexit__(self, type, value, trace): | |
... | |
async def transform(self, var: str) -> Any: | |
""" | |
类型装换函数 | |
通过在字符串后加上 `:int` `:float` `:bool` `:parse` 等操作符来实现 | |
Args: | |
var (str): 需要转换的字符串 | |
Returns: | |
Any: 装换结果 | |
""" | |
for key, fn in OPS.items(): | |
if var.endswith(key): | |
return fn(var.replace(key, "")) | |
if var.endswith(":parse"): | |
obj, err = await self.parse(var.replace(":parse", "")) | |
if err is None: | |
return obj | |
# 是请求时 params 中定义的变量 获取出来 不是的话返回原字符 | |
return self.params.get(var, var) | |
async def parse(self, path: str) -> Tuple[Any, Optional[str]]: | |
""" | |
分析指令 | |
Args: | |
path (str): 需要解析的 token 对应库中的路径 | |
Returns: | |
Any: 最终数据 若解析失败为 None | |
str: 错误信息 若解析成功为 None | |
""" | |
# 纯数字 | |
if path.replace(":int", "").replace(":float", "").replace(".", "").replace("-", "").isdigit(): | |
return await self.transform(path), None | |
# 指令列表 | |
sentences = SENTENCES.findall(path) | |
# 起始点 | |
position: Any = bilibili_api | |
async def inner() -> Optional[str]: | |
""" | |
递归取值 | |
Returns: | |
str: 错误信息 若解析成功为 None | |
""" | |
nonlocal position | |
# 分解执行的函数名、参数、指名参数 | |
sentence = sentences.pop(0) | |
func: str = FUNC.findall(sentence)[0] | |
flags: List[str] = ARGS.findall(sentence) | |
args, kwargs = [], {} | |
for flag in flags: | |
# 去除句尾的逗号或小括号 | |
flag = flag[:-1] | |
if flag == "": | |
continue | |
# 通过判断是否有等号存入参数列表或指名参数字典 | |
arg = flag.split("=") | |
if len(arg) == 1: | |
args.append(await self.transform(arg[0])) | |
else: | |
kwargs[arg[0]] = await self.transform(arg[1]) | |
# print(position, func, args, kwargs) | |
# 开始转移 | |
if isinstance(position, dict): | |
position = position.get(func, None) | |
elif isinstance(position, list): | |
position = position[int(func)] | |
else: | |
position = getattr(position, func, None) | |
# 赋值参数 | |
if isAsync(position): | |
position = await position(*args, **kwargs) | |
elif isFn(position): | |
position = position(*args, **kwargs) | |
elif isclass(position) and not issubclass(position, Enum): | |
position = position(*args, **kwargs) | |
# 为空返回出错语句 | |
# 否则检查是否分析完全部语句 | |
# 是则返回 None 否继续递归 | |
if position is None: | |
return sentence | |
if len(sentences) == 0: | |
return None | |
return await inner() | |
msg = await inner() | |
return position, msg | |