|
import re
|
|
from enum import Enum
|
|
from inspect import isclass
|
|
from inspect import isfunction as isFn
|
|
from inspect import iscoroutinefunction as isAsync
|
|
from typing import Any, Dict, List, Tuple, Optional
|
|
|
|
import bilibili_api
|
|
|
|
# Leading identifier of a sentence: everything before the first "." or "(".
FUNC = re.compile(r"[^\.\(]+")
# One call argument together with the "," or ")" that terminates it.
ARGS = re.compile(r"[^,\(\)]*[,\)]")
# One path step: a word, an optional "(...)" argument list, an optional ".".
SENTENCES = re.compile(r"\w+(?:\([^\)]*\))?\.?")
# Suffix operators understood by Parser.transform, mapped to their converters.
OPS = {
    ":int": int,
    ":float": float,
    ":bool": lambda s: s == "True",
}


class Parser:
    """
    Resolve textual paths against the ``bilibili_api`` package.

    A path is a dotted chain of steps, each optionally followed by a call
    argument list, e.g. ``a.b(x,key=y).c``. Arguments may carry a suffix
    operator (``:int``, ``:float``, ``:bool``, ``:parse``) or name an entry of
    ``params``.

    Attributes:
        valid: Result of the last credential check done in ``__aenter__``;
            stays True when no credential was found among the params.
        params: Named values that arguments may refer to. The values are
            replaced by their parsed objects in ``__aenter__``.
    """

    def __init__(self, params: Dict[str, str]):
        self.valid = True
        self.params = params

    async def __aenter__(self):
        """
        Prepare the parser by resolving every entry of ``params`` in place.

        Each value is passed to ``parse``; on success the parsed object
        replaces the string. When the parsed object is a
        ``bilibili_api.Credential`` the outcome of
        ``bilibili_api.check_valid`` is stored in ``self.valid``. Values that
        fail to parse are kept unchanged.

        Returns:
            Parser: this instance.
        """
        # Iterate over a snapshot because values are reassigned in the loop.
        for key, val in list(self.params.items()):
            obj, err = await self.parse(val)
            if err is None:
                if isinstance(obj, bilibili_api.Credential):
                    self.valid = await bilibili_api.check_valid(obj)
                self.params[key] = obj
        return self

    async def __aexit__(self, type, value, trace):
        """
        Leave the context; there is nothing to clean up.

        Returns None, so an exception raised inside the ``async with`` body
        is not suppressed.
        """
        ...

    async def transform(self, var: str) -> Any:
        """
        Convert one textual argument into a value.

        A trailing ``:int``, ``:float`` or ``:bool`` operator converts the
        text in front of it; a trailing ``:parse`` resolves that text with
        ``parse``. Without an operator (or when ``:parse`` fails) the text is
        looked up in ``params`` and returned unchanged if it is not a key.

        Args:
            var (str): the text to convert

        Returns:
            Any: the converted value

        Raises:
            ValueError: when an ``:int`` / ``:float`` operand is not numeric.
        """
        for key, fn in OPS.items():
            if var.endswith(key):
                # Cut only the trailing operator; str.replace would also
                # delete occurrences inside the value itself.
                return fn(var[: -len(key)])
        if var.endswith(":parse"):
            obj, err = await self.parse(var[: -len(":parse")])
            if err is None:
                return obj
        return self.params.get(var, var)

    async def parse(self, path: str) -> Tuple[Any, Optional[str]]:
        """
        Resolve a path, starting from the ``bilibili_api`` module.

        Each step is looked up on the current object (dict key, list index or
        attribute). Functions, bound methods, coroutine functions and
        non-Enum classes are then called with the step's arguments, and
        coroutine functions are awaited.

        Args:
            path (str): path of the wanted object inside the library

        Returns:
            Any: the resolved object, or None when resolution failed

            str: None on success, otherwise the text of the step (or the
            whole path) that could not be resolved
        """
        # Function-scope import keeps this block independent of the file header.
        from inspect import ismethod

        # Numeric literals (optionally suffixed with :int / :float) are
        # converted directly instead of being resolved as a path.
        digits = (
            path.replace(":int", "")
            .replace(":float", "")
            .replace(".", "")
            .replace("-", "")
        )
        if digits.isdigit():
            try:
                return await self.transform(path), None
            except ValueError:
                # The digit check is loose ("1.5:int", "1-2:int"); report the
                # bad literal through the error slot instead of raising.
                return None, path

        sentences = SENTENCES.findall(path)
        if not sentences:
            # Nothing that looks like a step (e.g. an empty path).
            return None, path

        position: Any = bilibili_api
        for sentence in sentences:
            func: str = FUNC.findall(sentence)[0]
            args, kwargs = [], {}

            for flag in ARGS.findall(sentence):
                flag = flag[:-1]  # drop the trailing "," or ")"
                if flag == "":
                    continue
                # Split on the first "=" only so values may contain "=".
                name, sep, value = flag.partition("=")
                if sep:
                    kwargs[name] = await self.transform(value)
                else:
                    args.append(await self.transform(name))

            if isinstance(position, dict):
                position = position.get(func, None)
            elif isinstance(position, list):
                try:
                    position = position[int(func)]
                except (ValueError, IndexError):
                    # Not an index, or out of range: same as a missing step.
                    return None, sentence
            else:
                position = getattr(position, func, None)

            if isAsync(position):
                position = await position(*args, **kwargs)
            elif isFn(position) or ismethod(position):
                # isfunction() is False for bound methods, so they need their
                # own check to be invoked with the parsed arguments.
                position = position(*args, **kwargs)
            elif isclass(position) and not issubclass(position, Enum):
                position = position(*args, **kwargs)

            if position is None:
                return None, sentence

        return position, None
|
|
|