Spaces:
Sleeping
Sleeping
""" | |
bilibili_api.session | |
消息相关 | |
""" | |
import json | |
import time | |
import asyncio | |
import logging | |
import datetime | |
from enum import Enum | |
from typing import Union, Optional | |
from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
from bilibili_api.exceptions import ApiException | |
from .video import Video | |
from .user import get_self_info | |
from .utils.utils import get_api, raise_for_statement | |
from .utils.picture import Picture | |
from .utils.AsyncEvent import AsyncEvent | |
from .utils.credential import Credential | |
from .utils.network import Api | |
API = get_api("session") | |
async def fetch_session_msgs( | |
talker_id: int, credential: Credential, session_type: int = 1, begin_seqno: int = 0 | |
) -> dict: | |
""" | |
获取指定用户的近三十条消息 | |
Args: | |
talker_id (int) : 用户 UID | |
credential (Credential): Credential | |
session_type (int) : 会话类型 1 私聊 2 应援团 | |
begin_seqno (int) : 起始 Seqno | |
Returns: | |
dict: 调用 API 返回结果 | |
""" | |
credential.raise_for_no_sessdata() | |
params = { | |
"talker_id": talker_id, | |
"session_type": session_type, | |
"begin_seqno": begin_seqno, | |
} | |
api = API["session"]["fetch"] | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def new_sessions( | |
credential: Credential, begin_ts: int = int(time.time() * 1000000) | |
) -> dict: | |
""" | |
获取新消息 | |
Args: | |
credential (Credential): Credential | |
begin_ts (int) : 起始时间戳 | |
Returns: | |
dict: 调用 API 返回结果 | |
""" | |
credential.raise_for_no_sessdata() | |
params = {"begin_ts": begin_ts, "build": 0, "mobi_app": "web"} | |
api = API["session"]["new"] | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_sessions(credential: Credential, session_type: int = 4) -> dict: | |
""" | |
获取已有消息 | |
Args: | |
credential (Credential): Credential | |
session_type (int) : 会话类型 1: 私聊, 2: 通知, 3: 应援团, 4: 全部 | |
Returns: | |
dict: 调用 API 返回结果 | |
""" | |
credential.raise_for_no_sessdata() | |
params = { | |
"session_type": session_type, | |
"group_fold": 1, | |
"unfollow_fold": 0, | |
"sort_rule": 2, | |
"build": 0, | |
"mobi_app": "web", | |
} | |
api = API["session"]["get"] | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_session_detail( | |
credential: Credential, talker_id: int, session_type: int = 1 | |
) -> dict: | |
""" | |
获取会话详情 | |
Args: | |
credential (Credential): Credential | |
session_type (int) : 会话类型 | |
talker_id (int) : 会话对象的UID | |
Returns: | |
dict: 调用 API 返回结果 | |
""" | |
credential.raise_for_no_sessdata() | |
params = {"talker_id": talker_id, "session_type": session_type} | |
api = API["session"]["get_session_detail"] | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_replies( | |
credential: Credential, | |
last_reply_id: Optional[int] = None, | |
reply_time: Optional[int] = None, | |
) -> dict: | |
""" | |
获取收到的回复 | |
Args: | |
credential (Credential): 凭据类. | |
last_reply_id (Optional, int) 最后一个评论的 ID | |
reply_time (Optional, int) 最后一个评论发送时间 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["replies"] | |
params = {"id": last_reply_id, "reply_time": reply_time} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_likes( | |
credential: Credential, last_id: int = None, like_time: int = None | |
) -> dict: | |
""" | |
获取收到的赞 | |
Args: | |
credential (Credential): 凭据类. | |
last_id (Optional, int) 最后一个 ID | |
like_time (Optional, int) 最后一个点赞发送时间 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["likes"] | |
params = {"id": last_id, "like_time": like_time} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_at( | |
credential: Credential, last_uid: int = None, at_time: int = None | |
) -> dict: | |
""" | |
获取收到的 AT | |
Args: | |
credential (Credential): 凭据类. | |
last_id (Optional, int) 最后一个 ID | |
at_time (Optional, int) 最后一个点赞发送时间 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["at"] | |
params = {"id": last_uid, "at_time": at_time} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_unread_messages(credential: Credential) -> dict: | |
""" | |
获取未读的信息 | |
Args: | |
credential (Credential): 凭据类. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["unread"] | |
return await Api(**api, credential=credential).result | |
async def get_system_messages(credential: Credential) -> dict: | |
""" | |
获取系统信息 | |
Args: | |
credential (Credential): 凭据类. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["system_msg"] | |
return await Api(**api, credential=credential).result | |
async def get_session_settings(credential: Credential) -> dict: | |
""" | |
获取消息设置 | |
Args: | |
credential (Credential): 凭据类. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["session"]["session_settings"] | |
return await Api(**api, credential=credential).result | |
class EventType(Enum): | |
""" | |
事件类型 | |
- TEXT: 纯文字消息 | |
- PICTURE: 图片消息 | |
- WITHDRAW: 撤回消息 | |
- GROUPS_PICTURE: 应援团图片,但似乎不常触发,一般使用 PICTURE 即可 | |
- SHARE_VIDEO: 分享视频 | |
- NOTICE: 系统通知 | |
- PUSHED_VIDEO: UP主推送的视频 | |
- WELCOME: 新成员加入应援团欢迎 | |
""" | |
TEXT = 1 | |
PICTURE = 2 | |
WITHDRAW = 5 | |
GROUPS_PICTURE = 6 | |
SHARE_VIDEO = 7 | |
NOTICE = 10 | |
PUSHED_VIDEO = 11 | |
WELCOME = 306 | |
class Event: | |
""" | |
事件参数: | |
+ receiver_id: 收信人 UID | |
+ receiver_type: 收信人类型,1: 私聊, 2: 应援团通知, 3: 应援团 | |
+ sender_uid: 发送人 UID | |
+ talker_id: 对话人 UID | |
+ msg_seqno: 事件 Seqno | |
+ msg_type: 事件类型 | |
+ msg_key: 事件唯一编号 | |
+ timestamp: 事件时间戳 | |
+ content: 事件内容 | |
""" | |
receiver_id: int | |
receiver_type: int | |
sender_uid: int | |
talker_id: int | |
msg_seqno: int | |
msg_type: int | |
msg_key: int | |
timestamp: int | |
content: Union[str, int, Picture, Video] | |
def __init__(self, data: dict, self_uid: int): | |
""" | |
信息事件类型 | |
Args: | |
data: 接收到的事件详细信息 | |
self_uid: 用户自身 UID | |
""" | |
self.__dict__.update(data) | |
self.uid = self_uid | |
try: | |
self.__content() | |
except AttributeError: | |
logging.error(f"解析消息错误:{data}") | |
def __str__(self): | |
if self.receiver_type == 1: | |
if self.receiver_id == self.uid: | |
msg_type = "收到" | |
user_id = self.sender_uid | |
elif self.sender_uid == self.uid: | |
msg_type = "发送" | |
user_id = self.receiver_id | |
elif self.receiver_type == 2: | |
user_id = self.receiver_id | |
if self.sender_uid == self.uid: | |
msg_type = "发送应援团" | |
elif self.sender_uid == 0: | |
msg_type = "系统提示" | |
else: | |
msg_type = "收到应援团" | |
return f"{msg_type} {user_id} 信息 {self.content}({self.timestamp})" # type: ignore | |
def __content(self) -> None: | |
""" | |
更新消息内容 | |
""" | |
content: dict = json.loads(self.content) # type: ignore | |
mt = self.msg_type | |
if mt == EventType.TEXT.value: | |
self.content = content.get("content") # type: ignore | |
elif mt == EventType.WELCOME.value: | |
self.content = content.get("content") + str(content.get("group_id")) # type: ignore | |
elif mt == EventType.WITHDRAW.value: | |
self.content = str(content) | |
elif mt == EventType.PICTURE.value or mt == EventType.GROUPS_PICTURE.value: | |
content.pop("original") | |
self.content = Picture(**content) | |
elif mt == EventType.SHARE_VIDEO.value or mt == EventType.PUSHED_VIDEO.value: | |
self.content = Video(bvid=content.get("bvid"), aid=content.get("id")) | |
elif mt == EventType.NOTICE.value: | |
self.content = content["title"] + " " + content["text"] | |
else: | |
logging.error(f"未知消息类型:{mt},消息内容:{content}") | |
async def send_msg( | |
credential: Credential, | |
receiver_id: int, | |
msg_type: EventType, | |
content: Union[str, Picture], | |
) -> dict: | |
""" | |
给用户发送私聊信息。目前仅支持纯文本。 | |
Args: | |
credential (Credential) : 凭证 | |
receiver_id (int) : 接收者 UID | |
msg_type (EventType) : 信息类型,参考 Event 类的事件类型。 | |
content (str | Picture): 信息内容。支持文字和图片。 | |
Returns: | |
dict: 调用 API 返回结果 | |
""" | |
credential.raise_for_no_sessdata() | |
credential.raise_for_no_bili_jct() | |
api = API["operate"]["send_msg"] | |
self_info = await get_self_info(credential) | |
sender_uid = self_info["mid"] | |
if msg_type == EventType.TEXT: | |
real_content = json.dumps({"content": content}) | |
elif msg_type == EventType.WITHDRAW: | |
real_content = str(content) | |
elif msg_type == EventType.PICTURE or msg_type == EventType.GROUPS_PICTURE: | |
raise_for_statement(isinstance(content, Picture), "TypeError") | |
await content.upload_file(credential=credential, data={"biz": "im"}) | |
real_content = json.dumps( | |
{ | |
"url": content.url, | |
"height": content.height, | |
"width": content.width, | |
"imageType": content.imageType, | |
"original": 1, | |
"size": content.size, | |
} | |
) | |
else: | |
raise ApiException("信息类型不支持。") | |
data = { | |
"msg[sender_uid]": sender_uid, | |
"msg[receiver_id]": receiver_id, | |
"msg[receiver_type]": 1, | |
"msg[msg_type]": msg_type.value, | |
"msg[msg_status]": 0, | |
"msg[content]": real_content, | |
"msg[dev_id]": "A6716E9A-7CE3-47AF-994B-F0B34178D28D", | |
"msg[new_face_version]": 0, | |
"msg[timestamp]": int(time.time()), | |
"from_filework": 0, | |
"build": 0, | |
"mobi_app": "web", | |
} | |
return await Api(**api, credential=credential).update_data(**data).result | |
class Session(AsyncEvent): | |
""" | |
会话类,用来开启消息监听。 | |
""" | |
def __init__(self, credential: Credential, debug=False): | |
super().__init__() | |
# 会话状态 | |
self.__status = 0 | |
# 已获取会话中最大的时间戳 默认当前时间 | |
self.maxTs = int(time.time() * 1000000) | |
# 会话UID为键 会话中最大Seqno为值 | |
self.maxSeqno = dict() | |
# 凭证 | |
self.credential = credential | |
# 异步定时任务框架 | |
self.sched = AsyncIOScheduler(timezone="Asia/Shanghai") | |
# 已接收的所有事件 用于撤回时找回 | |
self.events = dict() | |
# logging | |
self.logger = logging.getLogger("Session") | |
self.logger.setLevel(logging.DEBUG if debug else logging.INFO) | |
if not self.logger.handlers: | |
handler = logging.StreamHandler() | |
handler.setFormatter( | |
logging.Formatter( | |
"[%(asctime)s][%(levelname)s]: %(message)s", "%Y-%m-%d %H:%M:%S" | |
) | |
) | |
self.logger.addHandler(handler) | |
def on(self, event_type: EventType): | |
""" | |
重载装饰器注册事件监听器 | |
Args: | |
event_type (EventType): 事件类型 | |
""" | |
return super().on(event_name=str(event_type.value)) | |
def get_status(self) -> int: | |
""" | |
获取连接状态 | |
Returns: | |
int: 0 初始化,1 已连接,2 断开连接中,3 已断开,4 错误 | |
""" | |
return self.__status | |
async def run(self, exclude_self: bool = True) -> None: | |
""" | |
非阻塞异步爬虫 定时发送请求获取消息 | |
Args: | |
exclude_self: bool 是否排除自己发出的消息,默认排除 | |
""" | |
# 获取自身UID 用于后续判断消息是发送还是接收 | |
self_info = await get_self_info(self.credential) | |
self.uid = self_info["mid"] | |
# 初始化 只接收开始运行后的新消息 | |
js = await get_sessions(self.credential) | |
self.maxSeqno = { | |
_session["talker_id"]: _session["max_seqno"] | |
for _session in js.get("session_list", []) | |
} | |
# 间隔 6 秒轮询消息列表 之前设置 3 秒询一次 跑了一小时给我账号冻结了 | |
async def qurey(): | |
js: dict = await new_sessions(self.credential, self.maxTs) | |
if js.get("session_list") is None: | |
return | |
pending = set() | |
for session in js["session_list"]: | |
self.maxTs = max(self.maxTs, session["session_ts"]) | |
pending.add( | |
asyncio.create_task( | |
fetch_session_msgs( | |
session["talker_id"], | |
self.credential, | |
session["session_type"], | |
self.maxSeqno.get(session["talker_id"]), # type: ignore | |
) | |
) | |
) | |
self.maxSeqno[session["talker_id"]] = session["max_seqno"] | |
while pending: | |
done, pending = await asyncio.wait(pending) | |
for done_task in done: | |
result: dict = await done_task | |
if result is None or result.get("messages") is None: | |
continue | |
for message in result.get("messages", [])[::-1]: | |
event = Event(message, self.uid) | |
if event.msg_type == EventType.WITHDRAW.value: | |
self.logger.info( | |
str( | |
self.events.get( | |
event.content, f"key={event.content}" | |
) | |
) | |
+ f" 被撤回({event.timestamp})" | |
) | |
else: | |
self.logger.info(event) | |
# 自己发出的消息不发布任务 | |
if event.sender_uid != self.uid or not exclude_self: | |
self.dispatch(str(event.msg_type), event) | |
self.events[str(event.msg_key)] = event | |
self.logger.debug(f"maxTs = {self.maxTs}") | |
self.__status = 1 | |
self.sched.start() | |
self.logger.info("开始轮询") | |
async def start(self, exclude_self: bool = True) -> None: | |
""" | |
阻塞异步启动 通过调用 self.close() 后可断开连接 | |
Args: | |
exclude_self: bool 是否排除自己发出的消息,默认排除 | |
""" | |
await self.run(exclude_self) | |
while self.get_status() < 2: | |
await asyncio.sleep(1) | |
if self.get_status() == 2: | |
self.__status = 3 | |
async def reply(self, event: Event, content: Union[str, Picture]) -> dict: # type: ignore | |
""" | |
快速回复消息 | |
Args: | |
event : Event 要回复的消息 | |
content: str | Picture 要回复的文字内容 | |
Returns: | |
dict: 调用接口返回的内容。 | |
""" | |
if self.uid == event.sender_uid: | |
self.logger.error("不能给自己发送消息哦~") | |
else: | |
msg_type = ( | |
EventType.PICTURE if isinstance(content, Picture) else EventType.TEXT | |
) | |
return await send_msg(self.credential, event.sender_uid, msg_type, content) | |
def close(self) -> None: | |
"""结束轮询""" | |
self.sched.remove_job("query") | |
self.__status = 2 | |
self.logger.info("结束轮询") | |