rogerxavier's picture
Upload 258 files
0aee47a verified
raw
history blame
4.57 kB
import re
from enum import Enum
from inspect import isclass
from inspect import isfunction as isFn
from inspect import iscoroutinefunction as isAsync
from typing import Any, Dict, List, Tuple, Optional
import bilibili_api
FUNC = re.compile(r"[^\.\(]+")
ARGS = re.compile(r"[^,\(\)]*[,\)]")
SENTENCES = re.compile(r"\w+(?:\([^\)]*\))?\.?")
OPS = {
":int": int,
":float": float,
":bool": lambda s: s == "True",
}
class Parser:
"""
解析器
"""
def __init__(self, params: Dict[str, str]):
self.valid = True
self.params = params
async def __aenter__(self):
"""
解析前准备
把 params 中的参数先解析了
"""
for key, val in self.params.items():
obj, err = await self.parse(val)
if err is None:
if isinstance(obj, bilibili_api.Credential):
self.valid = await bilibili_api.check_valid(obj)
self.params[key] = obj
return self
async def __aexit__(self, type, value, trace):
...
async def transform(self, var: str) -> Any:
"""
类型装换函数
通过在字符串后加上 `:int` `:float` `:bool` `:parse` 等操作符来实现
Args:
var (str): 需要转换的字符串
Returns:
Any: 装换结果
"""
for key, fn in OPS.items():
if var.endswith(key):
return fn(var.replace(key, ""))
if var.endswith(":parse"):
obj, err = await self.parse(var.replace(":parse", ""))
if err is None:
return obj
# 是请求时 params 中定义的变量 获取出来 不是的话返回原字符
return self.params.get(var, var)
async def parse(self, path: str) -> Tuple[Any, Optional[str]]:
"""
分析指令
Args:
path (str): 需要解析的 token 对应库中的路径
Returns:
Any: 最终数据 若解析失败为 None
str: 错误信息 若解析成功为 None
"""
# 纯数字
if path.replace(":int", "").replace(":float", "").replace(".", "").replace("-", "").isdigit():
return await self.transform(path), None
# 指令列表
sentences = SENTENCES.findall(path)
# 起始点
position: Any = bilibili_api
async def inner() -> Optional[str]:
"""
递归取值
Returns:
str: 错误信息 若解析成功为 None
"""
nonlocal position
# 分解执行的函数名、参数、指名参数
sentence = sentences.pop(0)
func: str = FUNC.findall(sentence)[0]
flags: List[str] = ARGS.findall(sentence)
args, kwargs = [], {}
for flag in flags:
# 去除句尾的逗号或小括号
flag = flag[:-1]
if flag == "":
continue
# 通过判断是否有等号存入参数列表或指名参数字典
arg = flag.split("=")
if len(arg) == 1:
args.append(await self.transform(arg[0]))
else:
kwargs[arg[0]] = await self.transform(arg[1])
# print(position, func, args, kwargs)
# 开始转移
if isinstance(position, dict):
position = position.get(func, None)
elif isinstance(position, list):
position = position[int(func)]
else:
position = getattr(position, func, None)
# 赋值参数
if isAsync(position):
position = await position(*args, **kwargs)
elif isFn(position):
position = position(*args, **kwargs)
elif isclass(position) and not issubclass(position, Enum):
position = position(*args, **kwargs)
# 为空返回出错语句
# 否则检查是否分析完全部语句
# 是则返回 None 否继续递归
if position is None:
return sentence
if len(sentences) == 0:
return None
return await inner()
msg = await inner()
return position, msg