Spaces:
Sleeping
Sleeping
r""" | |
bilibili_api.live | |
直播相关 | |
""" | |
import json | |
import time | |
import base64 | |
import struct | |
import asyncio | |
import logging | |
from enum import Enum | |
from typing import Any, List, Union | |
import brotli | |
import aiohttp | |
from aiohttp.client_ws import ClientWebSocketResponse | |
from .utils.utils import get_api, raise_for_statement | |
from .utils.danmaku import Danmaku | |
from .utils.network import get_aiohttp_session, Api, HEADERS | |
from .utils.AsyncEvent import AsyncEvent | |
from .utils.credential import Credential | |
from .exceptions.LiveException import LiveException | |
API = get_api("live") | |
class ScreenResolution(Enum): | |
""" | |
直播源清晰度。 | |
清晰度编号,4K 20000,原画 10000,蓝光(杜比)401,蓝光 400,超清 250,高清 150,流畅 80 | |
+ FOUR_K : 4K。 | |
+ ORIGINAL : 原画。 | |
+ BLU_RAY_DOLBY : 蓝光(杜比)。 | |
+ BLU_RAY : 蓝光。 | |
+ ULTRA_HD : 超清。 | |
+ HD : 高清。 | |
+ FLUENCY : 流畅。 | |
""" | |
FOUR_K = 20000 | |
ORIGINAL = 10000 | |
BLU_RAY_DOLBY = 401 | |
BLU_RAY = 400 | |
ULTRA_HD = 250 | |
HD = 150 | |
FLUENCY = 80 | |
class LiveProtocol(Enum): | |
""" | |
直播源流协议。 | |
流协议,0 为 FLV 流,1 为 HLS 流。默认:0,1 | |
+ FLV : 0。 | |
+ HLS : 1。 | |
+ DEFAULT : 0,1 | |
""" | |
FLV = 0 | |
HLS = 1 | |
DEFAULT = "0,1" | |
class LiveFormat(Enum): | |
""" | |
直播源容器格式 | |
容器格式,0 为 flv 格式;1 为 ts 格式(仅限 hls 流);2 为 fmp4 格式(仅限 hls 流)。默认:0,2 | |
+ FLV : 0。 | |
+ TS : 1。 | |
+ FMP4 : 2。 | |
+ DEFAULT : 2。 | |
""" | |
FLV = 0 | |
TS = 1 | |
FMP4 = 2 | |
DEFAULT = "0,1,2" | |
class LiveCodec(Enum): | |
""" | |
直播源视频编码 | |
视频编码,0 为 avc 编码,1 为 hevc 编码。默认:0,1 | |
+ AVC : 0。 | |
+ HEVC : 1。 | |
+ DEFAULT : 0,1。 | |
""" | |
AVC = 0 | |
HEVC = 1 | |
DEFAULT = "0,1" | |
class LiveRoom: | |
""" | |
直播类,获取各种直播间的操作均在里边。 | |
AttributesL | |
credential (Credential): 凭据类 | |
room_display_id (int) : 房间展示 id | |
""" | |
def __init__( | |
self, room_display_id: int, credential: Union[Credential, None] = None | |
): | |
""" | |
Args: | |
room_display_id (int) : 房间展示 ID(即 URL 中的 ID) | |
credential (Credential, optional): 凭据. Defaults to None. | |
""" | |
self.room_display_id = room_display_id | |
if credential is None: | |
self.credential = Credential() | |
else: | |
self.credential = credential | |
self.__ruid = None | |
async def start(self, area_id: int) -> dict: | |
""" | |
开始直播 | |
Args: | |
area_id (int): 直播分区id(子分区id)。可使用 live_area 模块查询。 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["start"] | |
params = { | |
"area_v2": area_id, | |
"room_id": self.room_display_id, | |
"platform": "pc", | |
} | |
resp = ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
return resp | |
async def stop(self) -> dict: | |
""" | |
关闭直播 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["stop"] | |
params = { | |
"room_id": self.room_display_id, | |
} | |
resp = ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
return resp | |
async def get_room_play_info(self) -> dict: | |
""" | |
获取房间信息(真实房间号,封禁情况等) | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["room_play_info"] | |
params = { | |
"room_id": self.room_display_id, | |
} | |
resp = ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
# 缓存真实房间 ID | |
self.__ruid = resp["uid"] | |
return resp | |
async def get_room_id(self) -> int: | |
return (await self.get_room_play_info())["room_id"] | |
async def __get_ruid(self) -> int: | |
""" | |
获取真实房间 ID,若有缓存则使用缓存 | |
""" | |
if self.__ruid is None: | |
await self.get_room_play_info() | |
return self.__ruid # type: ignore | |
async def get_ruid(self) -> int: | |
return await self.__get_ruid() | |
async def get_danmu_info(self) -> dict: | |
""" | |
获取聊天弹幕服务器配置信息(websocket) | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["danmu_info"] | |
params = {"id": self.room_display_id} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_room_info(self) -> dict: | |
""" | |
获取直播间信息(标题,简介等) | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["room_info"] | |
params = {"room_id": self.room_display_id} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_fan_model( | |
self, | |
page_num: int = 1, | |
target_id: Union[int, None] = None, | |
roomId: Union[int, None] = None, | |
) -> dict: | |
""" | |
获取自己的粉丝勋章信息 | |
如果带有房间号就返回是否具有的判断 has_medal | |
如果带有主播 id ,就返回主播的粉丝牌,没有就返回 null | |
Args: | |
roomId (int, optional) : 指定房间,查询是否拥有此房间的粉丝牌 | |
target_id (int | None, optional): 指定返回一个主播的粉丝牌,留空就不返回 | |
page_num (int | None, optional): 粉丝牌列表,默认 1 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["info"]["live_info"] | |
params = { | |
"pageSize": 10, | |
"page": page_num, | |
} | |
if roomId: | |
params["roomId"] = roomId | |
if target_id: | |
params["target_id"] = target_id | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_user_info_in_room(self) -> dict: | |
""" | |
获取自己在直播间的信息(粉丝勋章等级,直播用户等级等) | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["info"]["user_info_in_room"] | |
params = {"room_id": self.room_display_id} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_popular_ticket_num(self) -> dict: | |
""" | |
获取自己在直播间的人气票数量(付费人气票已赠送的量,免费人气票的持有量) | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["info"]["popular_ticket"] | |
params = { | |
"ruid": await self.__get_ruid(), | |
"surce": 0, | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def send_popular_ticket(self) -> dict: | |
""" | |
赠送自己在直播间的所有免费人气票 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
self.credential.raise_for_no_bili_jct() | |
api = API["operate"]["send_popular_ticket"] | |
params = { | |
"ruid": await self.__get_ruid(), | |
"visit_id": "", | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_dahanghai(self, page: int = 1) -> dict: | |
""" | |
获取大航海列表 | |
Args: | |
page (int, optional): 页码. Defaults to 1. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["dahanghai"] | |
params = { | |
"roomid": self.room_display_id, | |
"ruid": await self.__get_ruid(), | |
"page_size": 30, | |
"page": page, | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_gaonengbang(self, page: int = 1) -> dict: | |
""" | |
获取高能榜列表 | |
Args: | |
page (int, optional): 页码. Defaults to 1 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["gaonengbang"] | |
params = { | |
"roomId": self.room_display_id, | |
"ruid": await self.__get_ruid(), | |
"pageSize": 50, | |
"page": page, | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_seven_rank(self) -> dict: | |
""" | |
获取七日榜 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["seven_rank"] | |
params = { | |
"roomid": self.room_display_id, | |
"ruid": await self.__get_ruid(), | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_fans_medal_rank(self) -> dict: | |
""" | |
获取粉丝勋章排行 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["fans_medal_rank"] | |
params = {"roomid": self.room_display_id, "ruid": await self.__get_ruid()} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_black_list(self, page: int = 1) -> dict: | |
""" | |
获取黑名单列表 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["black_list"] | |
params = {"room_id": self.room_display_id, "ps": page} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_room_play_url( | |
self, screen_resolution: ScreenResolution = ScreenResolution.ORIGINAL | |
) -> dict: | |
""" | |
获取房间直播流列表 | |
Args: | |
screen_resolution (ScreenResolution, optional): 清晰度. Defaults to ScreenResolution.ORIGINAL | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["room_play_url"] | |
params = { | |
"cid": self.room_display_id, | |
"platform": "web", | |
"qn": screen_resolution.value, | |
"https_url_req": "1", | |
"ptype": "16", | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_room_play_info_v2( | |
self, | |
live_protocol: LiveProtocol = LiveProtocol.DEFAULT, | |
live_format: LiveFormat = LiveFormat.DEFAULT, | |
live_codec: LiveCodec = LiveCodec.DEFAULT, | |
live_qn: ScreenResolution = ScreenResolution.ORIGINAL, | |
) -> dict: | |
""" | |
获取房间信息及可用清晰度列表 | |
Args: | |
live_protocol (LiveProtocol, optional) : 直播源流协议. Defaults to LiveProtocol.DEFAULT. | |
live_format (LiveFormat, optional) : 直播源容器格式. Defaults to LiveFormat.DEFAULT. | |
live_codec (LiveCodec, optional) : 直播源视频编码. Defaults to LiveCodec.DEFAULT. | |
live_qn (ScreenResolution, optional): 直播源清晰度. Defaults to ScreenResolution.ORIGINAL. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["room_play_info_v2"] | |
params = { | |
"room_id": self.room_display_id, | |
"platform": "web", | |
"ptype": "16", | |
"protocol": live_protocol.value, | |
"format": live_format.value, | |
"codec": live_codec.value, | |
"qn": live_qn.value, | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def ban_user(self, uid: int) -> dict: | |
""" | |
封禁用户 | |
Args: | |
uid (int): 用户 UID | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["operate"]["add_block"] | |
data = { | |
"room_id": self.room_display_id, | |
"tuid": uid, | |
"mobile_app": "web", | |
"visit_id": "", | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def unban_user(self, uid: int) -> dict: | |
""" | |
解封用户 | |
Args: | |
uid (int): 用户 UID | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["operate"]["del_block"] | |
data = { | |
"room_id": self.room_display_id, | |
"tuid": uid, | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def send_danmaku(self, danmaku: Danmaku, reply_mid: int = None) -> dict: | |
""" | |
直播间发送弹幕 | |
Args: | |
danmaku (Danmaku): 弹幕类 | |
reply_mid (int, optional): @的 UID. Defaults to None. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["operate"]["send_danmaku"] | |
room_id = (await self.get_room_play_info())["room_id"] | |
data = { | |
"mode": danmaku.mode, | |
"msg": danmaku.text, | |
"roomid": room_id, | |
"bubble": 0, | |
"rnd": int(time.time()), | |
"color": int(danmaku.color, 16), | |
"fontsize": danmaku.font_size, | |
} | |
if reply_mid: data["reply_mid"] = reply_mid | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def sign_up_dahanghai(self, task_id: int = 1447) -> dict: | |
""" | |
大航海签到 | |
Args: | |
task_id (int, optional): 签到任务 ID. Defaults to 1447 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
self.credential.raise_for_no_bili_jct() | |
api = API["operate"]["sign_up_dahanghai"] | |
data = { | |
"task_id": task_id, | |
"uid": await self.__get_ruid(), | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def send_gift_from_bag( | |
self, | |
uid: int, | |
bag_id: int, | |
gift_id: int, | |
gift_num: int, | |
storm_beat_id: int = 0, | |
price: int = 0, | |
) -> dict: | |
""" | |
赠送包裹中的礼物,获取包裹信息可以使用 get_self_bag 方法 | |
Args: | |
uid (int) : 赠送用户的 UID | |
bag_id (int) : 礼物背包 ID | |
gift_id (int) : 礼物 ID | |
gift_num (int) : 礼物数量 | |
storm_beat_id (int, optional) : 未知, Defaults to 0 | |
price (int, optional) : 礼物单价,Defaults to 0 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
self.credential.raise_for_no_bili_jct() | |
api = API["operate"]["send_gift_from_bag"] | |
data = { | |
"uid": uid, | |
"bag_id": bag_id, | |
"gift_id": gift_id, | |
"gift_num": gift_num, | |
"platform": "pc", | |
"send_ruid": 0, | |
"storm_beat_id": storm_beat_id, | |
"price": price, | |
"biz_code": "live", | |
"biz_id": self.room_display_id, | |
"ruid": await self.__get_ruid(), | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def receive_reward(self, receive_type: int = 2) -> dict: | |
""" | |
领取自己在某个直播间的航海日志奖励 | |
Args: | |
receive_type (int) : 领取类型,Defaults to 2. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["operate"]["receive_reward"] | |
data = { | |
"ruid": await self.__get_ruid(), | |
"receive_type": receive_type, | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def get_general_info(self, act_id: int = 100061) -> dict: | |
""" | |
获取自己在该房间的大航海信息, 比如是否开通, 等级等 | |
Args: | |
act_id (int, optional) : 未知,Defaults to 100061 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["info"]["general_info"] | |
params = { | |
"actId": act_id, | |
"roomId": self.room_display_id, | |
"uid": await self.__get_ruid(), | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def update_news(self, content: str) -> dict: | |
""" | |
更新公告 | |
Args: | |
content: 最多60字符 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
api = API["info"]["update_news"] | |
params = { | |
"content": content, | |
"roomId": self.room_display_id, | |
"uid": await self.__get_ruid(), | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_gift_common(self) -> dict: | |
""" | |
获取当前直播间内的普通礼物列表 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api_room_info = API["info"]["room_info"] | |
params_room_info = { | |
"room_id": self.room_display_id, | |
} | |
res_room_info = ( | |
await Api(**api_room_info, credential=self.credential) | |
.update_params(**params_room_info) | |
.result | |
) | |
area_id, area_parent_id = ( | |
res_room_info["room_info"]["area_id"], | |
res_room_info["room_info"]["parent_area_id"], | |
) | |
api = API["info"]["gift_common"] | |
params = { | |
"room_id": self.room_display_id, | |
"area_id": area_id, | |
"area_parent_id": area_parent_id, | |
"platform": "pc", | |
"source": "live", | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def get_gift_special(self, tab_id: int) -> dict: | |
""" | |
注:此 API 已失效,请使用 live.get_gift_config | |
获取当前直播间内的特殊礼物列表 | |
Args: | |
tab_id (int) : 2:特权礼物,3:定制礼物 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api_room_info = API["info"]["room_info"] | |
params_room_info = { | |
"room_id": self.room_display_id, | |
} | |
res_room_info = ( | |
await Api(**api_room_info, credential=self.credential) | |
.update_params(**params_room_info) | |
.result | |
) | |
area_id, area_parent_id = ( | |
res_room_info["room_info"]["area_id"], | |
res_room_info["room_info"]["parent_area_id"], | |
) | |
api = API["info"]["gift_special"] | |
params = { | |
"tab_id": tab_id, | |
"area_id": area_id, | |
"area_parent_id": area_parent_id, | |
"room_id": await self.__get_ruid(), | |
"source": "live", | |
"platform": "pc", | |
"build": 1, | |
} | |
return ( | |
await Api(**api, credential=self.credential).update_params(**params).result | |
) | |
async def send_gift_gold( | |
self, uid: int, gift_id: int, gift_num: int, price: int, storm_beat_id: int = 0 | |
) -> dict: | |
""" | |
赠送金瓜子礼物 | |
Args: | |
uid (int) : 赠送用户的 UID | |
gift_id (int) : 礼物 ID (可以通过 get_gift_common 或 get_gift_special 或 get_gift_config 获取) | |
gift_num (int) : 赠送礼物数量 | |
price (int) : 礼物单价 | |
storm_beat_id (int, Optional): 未知,Defaults to 0 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
self.credential.raise_for_no_bili_jct() | |
api = API["operate"]["send_gift_gold"] | |
data = { | |
"uid": uid, | |
"gift_id": gift_id, | |
"gift_num": gift_num, | |
"price": price, | |
"ruid": await self.__get_ruid(), | |
"biz_code": "live", | |
"biz_id": self.room_display_id, | |
"platform": "pc", | |
"storm_beat_id": storm_beat_id, | |
"send_ruid": 0, | |
"coin_type": "gold", | |
"bag_id": "0", | |
"rnd": int(time.time()), | |
"visit_id": "", | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
async def send_gift_silver( | |
self, | |
uid: int, | |
gift_id: int, | |
gift_num: int, | |
price: int, | |
storm_beat_id: int = 0, | |
) -> dict: | |
""" | |
赠送银瓜子礼物 | |
Args: | |
uid (int) : 赠送用户的 UID | |
gift_id (int) : 礼物 ID (可以通过 get_gift_common 或 get_gift_special 或 get_gift_config 获取) | |
gift_num (int) : 赠送礼物数量 | |
price (int) : 礼物单价 | |
storm_beat_id (int, Optional): 未知, Defaults to 0 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
self.credential.raise_for_no_sessdata() | |
self.credential.raise_for_no_bili_jct() | |
api = API["operate"]["send_gift_silver"] | |
data = { | |
"uid": uid, | |
"gift_id": gift_id, | |
"gift_num": gift_num, | |
"price": price, | |
"ruid": await self.__get_ruid(), | |
"biz_code": "live", | |
"biz_id": self.room_display_id, | |
"platform": "pc", | |
"storm_beat_id": storm_beat_id, | |
"send_ruid": 0, | |
"coin_type": "silver", | |
"bag_id": 0, | |
"rnd": int(time.time()), | |
"visit_id": "", | |
} | |
return await Api(**api, credential=self.credential).update_data(**data).result | |
class LiveDanmaku(AsyncEvent): | |
""" | |
Websocket 实时获取直播弹幕 | |
Events: | |
+ DANMU_MSG: 用户发送弹幕 | |
+ SEND_GIFT: 礼物 | |
+ COMBO_SEND:礼物连击 | |
+ GUARD_BUY:续费大航海 | |
+ SUPER_CHAT_MESSAGE:醒目留言(SC) | |
+ SUPER_CHAT_MESSAGE_JPN:醒目留言(带日语翻译?) | |
+ WELCOME: 老爷进入房间 | |
+ WELCOME_GUARD: 房管进入房间 | |
+ NOTICE_MSG: 系统通知(全频道广播之类的) | |
+ PREPARING: 直播准备中 | |
+ LIVE: 直播开始 | |
+ ROOM_REAL_TIME_MESSAGE_UPDATE: 粉丝数等更新 | |
+ ENTRY_EFFECT: 进场特效 | |
+ ROOM_RANK: 房间排名更新 | |
+ INTERACT_WORD: 用户进入直播间 | |
+ ACTIVITY_BANNER_UPDATE_V2: 好像是房间名旁边那个 xx 小时榜 | |
+ =========================== | |
+ 本模块自定义事件: | |
+ ========================== | |
+ VIEW: 直播间人气更新 | |
+ ALL: 所有事件 | |
+ DISCONNECT: 断开连接(传入连接状态码参数) | |
+ TIMEOUT: 心跳响应超时 | |
+ VERIFICATION_SUCCESSFUL: 认证成功 | |
""" | |
PROTOCOL_VERSION_RAW_JSON = 0 | |
PROTOCOL_VERSION_HEARTBEAT = 1 | |
PROTOCOL_VERSION_BROTLI_JSON = 3 | |
DATAPACK_TYPE_HEARTBEAT = 2 | |
DATAPACK_TYPE_HEARTBEAT_RESPONSE = 3 | |
DATAPACK_TYPE_NOTICE = 5 | |
DATAPACK_TYPE_VERIFY = 7 | |
DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE = 8 | |
STATUS_INIT = 0 | |
STATUS_CONNECTING = 1 | |
STATUS_ESTABLISHED = 2 | |
STATUS_CLOSING = 3 | |
STATUS_CLOSED = 4 | |
STATUS_ERROR = 5 | |
def __init__( | |
self, | |
room_display_id: int, | |
debug: bool = False, | |
credential: Union[Credential, None] = None, | |
max_retry: int = 5, | |
retry_after: float = 1, | |
): | |
""" | |
Args: | |
room_display_id (int) : 房间展示 ID | |
debug (bool, optional) : 调试模式,将输出更多信息。. Defaults to False. | |
credential (Credential | None, optional): 凭据. Defaults to None. | |
max_retry (int, optional) : 连接出错后最大重试次数. Defaults to 5 | |
retry_after (int, optional) : 连接出错后重试间隔时间(秒). Defaults to 1 | |
""" | |
super().__init__() | |
self.credential = credential if credential is not None else Credential() | |
self.room_display_id = room_display_id | |
self.max_retry = max_retry | |
self.retry_after = retry_after | |
self.__room_real_id = None | |
self.__status = 0 | |
self.__ws = None | |
self.__tasks = [] | |
self.__debug = debug | |
self.__heartbeat_timer = 60.0 | |
self.err_reason = "" | |
# logging | |
self.logger = logging.getLogger(f"LiveDanmaku_{self.room_display_id}") | |
self.logger.setLevel(logging.DEBUG if debug else logging.INFO) | |
if not self.logger.handlers: | |
handler = logging.StreamHandler() | |
handler.setFormatter( | |
logging.Formatter( | |
"[" | |
+ str(room_display_id) | |
+ "][%(asctime)s][%(levelname)s] %(message)s" | |
) | |
) | |
self.logger.addHandler(handler) | |
def get_status(self) -> int: | |
""" | |
获取连接状态 | |
Returns: | |
int: 0 初始化,1 连接建立中,2 已连接,3 断开连接中,4 已断开,5 错误 | |
""" | |
return self.__status | |
async def connect(self) -> None: | |
""" | |
连接直播间 | |
""" | |
if self.get_status() == self.STATUS_CONNECTING: | |
raise LiveException("正在建立连接中") | |
if self.get_status() == self.STATUS_ESTABLISHED: | |
raise LiveException("连接已建立,不可重复调用") | |
if self.get_status() == self.STATUS_CLOSING: | |
raise LiveException("正在关闭连接,不可调用") | |
await self.__main() | |
async def disconnect(self) -> None: | |
""" | |
断开连接 | |
""" | |
if self.get_status() != self.STATUS_ESTABLISHED: | |
raise LiveException("尚未连接服务器") | |
self.__status = self.STATUS_CLOSING | |
self.logger.info("连接正在关闭") | |
# 取消所有任务 | |
while len(self.__tasks) > 0: | |
self.__tasks.pop().cancel() | |
self.__status = self.STATUS_CLOSED | |
await self.__ws.close() # type: ignore | |
self.logger.info("连接已关闭") | |
async def __main(self) -> None: | |
""" | |
入口 | |
""" | |
self.__status = self.STATUS_CONNECTING | |
room = LiveRoom(self.room_display_id, self.credential) | |
self.logger.info(f"准备连接直播间 {self.room_display_id}") | |
# 获取真实房间号 | |
self.logger.debug("正在获取真实房间号") | |
info = await room.get_room_play_info() | |
self.__room_real_id = info["room_id"] | |
self.logger.debug(f"获取成功,真实房间号:{self.__room_real_id}") | |
# 获取直播服务器配置 | |
self.logger.debug("正在获取聊天服务器配置") | |
conf = await room.get_danmu_info() | |
self.logger.debug("聊天服务器配置获取成功") | |
# 连接直播间 | |
self.logger.debug("准备连接直播间") | |
session = get_aiohttp_session() | |
available_hosts: List[dict] = conf["host_list"] | |
retry = self.max_retry | |
host = None | |
async def on_timeout(ev): | |
# 连接超时 | |
self.err_reason = "心跳响应超时" | |
await self.__ws.close() # type: ignore | |
while True: | |
self.err_reason = "" | |
# 重置心跳计时器 | |
self.__heartbeat_timer = 0 | |
if not available_hosts: | |
self.err_reason = "已尝试所有主机但仍无法连接" | |
break | |
if host is None or retry <= 0: | |
host = available_hosts.pop() | |
retry = self.max_retry | |
port = host["wss_port"] | |
protocol = "wss" | |
uri = f"{protocol}://{host['host']}:{port}/sub" | |
self.__status = self.STATUS_CONNECTING | |
self.logger.info(f"正在尝试连接主机: {uri}") | |
try: | |
async with session.ws_connect(uri, headers=HEADERS.copy()) as ws: | |
async def on_verification_successful(data): | |
# 新建心跳任务 | |
while len(self.__tasks) > 0: | |
self.__tasks.pop().cancel() | |
self.__tasks.append(asyncio.create_task(self.__heartbeat(ws))) | |
self.__ws = ws | |
self.logger.debug("连接主机成功, 准备发送认证信息") | |
await self.__send_verify_data(ws, conf["token"]) | |
async for msg in ws: | |
if msg.type == aiohttp.WSMsgType.BINARY: | |
self.logger.debug(f"收到原始数据:{msg.data}") | |
await self.__handle_data(msg.data) | |
elif msg.type == aiohttp.WSMsgType.ERROR: | |
self.__status = self.STATUS_ERROR | |
self.logger.error("出现错误") | |
elif msg.type == aiohttp.WSMsgType.CLOSING: | |
self.logger.debug("连接正在关闭") | |
self.__status = self.STATUS_CLOSING | |
elif msg.type == aiohttp.WSMsgType.CLOSED: | |
self.logger.info("连接已关闭") | |
self.__status = self.STATUS_CLOSED | |
# 正常断开情况下跳出循环 | |
if self.__status != self.STATUS_CLOSED or self.err_reason: | |
# 非用户手动调用关闭,触发重连 | |
self.logger.warning( | |
"非正常关闭连接" if not self.err_reason else self.err_reason | |
) | |
else: | |
break | |
except Exception as e: | |
await ws.close() | |
self.logger.warning(e) | |
if retry <= 0 or len(available_hosts) == 0: | |
self.logger.error("无法连接服务器") | |
self.err_reason = "无法连接服务器" | |
break | |
self.logger.warning(f"将在 {self.retry_after} 秒后重新连接...") | |
self.__status = self.STATUS_ERROR | |
retry -= 1 | |
await asyncio.sleep(self.retry_after) | |
async def __handle_data(self, data) -> None: | |
""" | |
处理数据 | |
""" | |
data = self.__unpack(data) | |
self.logger.debug(f"收到信息:{data}") | |
for info in data: | |
callback_info = { | |
"room_display_id": self.room_display_id, | |
"room_real_id": self.__room_real_id, | |
} | |
# 依次处理并调用用户指定函数 | |
if ( | |
info["datapack_type"] | |
== LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE | |
): | |
# 认证反馈 | |
if info["data"]["code"] == 0: | |
# 认证成功反馈 | |
self.logger.info("连接服务器并认证成功") | |
self.__status = self.STATUS_ESTABLISHED | |
callback_info["type"] = "VERIFICATION_SUCCESSFUL" | |
callback_info["data"] = None | |
self.dispatch("VERIFICATION_SUCCESSFUL", callback_info) | |
self.dispatch("ALL", callback_info) | |
elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE: | |
# 心跳包反馈,返回直播间人气 | |
self.logger.debug("收到心跳包反馈") | |
# 重置心跳计时器 | |
self.__heartbeat_timer = 30.0 | |
callback_info["type"] = "VIEW" | |
callback_info["data"] = info["data"]["view"] | |
self.dispatch("VIEW", callback_info) | |
self.dispatch("ALL", callback_info) | |
elif info["datapack_type"] == LiveDanmaku.DATAPACK_TYPE_NOTICE: | |
# 直播间弹幕、礼物等信息 | |
callback_info["type"] = info["data"]["cmd"] | |
# DANMU_MSG 事件名特殊:DANMU_MSG:4:0:2:2:2:0,需取出事件名,暂不知格式 | |
if callback_info["type"].find("DANMU_MSG") > -1: | |
callback_info["type"] = "DANMU_MSG" | |
info["data"]["cmd"] = "DANMU_MSG" | |
callback_info["data"] = info["data"] | |
self.dispatch(callback_info["type"], callback_info) | |
self.dispatch("ALL", callback_info) | |
else: | |
self.logger.warning("检测到未知的数据包类型,无法处理") | |
async def __send_verify_data(self, ws: ClientWebSocketResponse, token: str) -> None: | |
self.credential.raise_for_no_buvid3() | |
# 没传入 dedeuserid 可以试图 live.get_self_info | |
if not self.credential.has_dedeuserid(): | |
try: | |
info = await get_self_info(self.credential) | |
self.credential.dedeuserid = str(info["uid"]) | |
except: | |
pass # 留到下面一起抛出错误 | |
self.credential.raise_for_no_dedeuserid() | |
verifyData = { | |
"uid": int(self.credential.dedeuserid), | |
"roomid": self.__room_real_id, | |
"protover": 3, | |
"buvid": self.credential.buvid3, | |
"platform": "web", | |
"type": 2, | |
"key": token, | |
} | |
data = json.dumps(verifyData).encode() | |
await self.__send( | |
data, self.PROTOCOL_VERSION_HEARTBEAT, self.DATAPACK_TYPE_VERIFY, ws | |
) | |
async def __heartbeat(self, ws: ClientWebSocketResponse) -> None: | |
""" | |
定时发送心跳包 | |
""" | |
HEARTBEAT = self.__pack( | |
b"[object Object]", | |
self.PROTOCOL_VERSION_HEARTBEAT, | |
self.DATAPACK_TYPE_HEARTBEAT, | |
) | |
while True: | |
if self.__heartbeat_timer == 0: | |
self.logger.debug("发送心跳包") | |
await ws.send_bytes(HEARTBEAT) | |
heartbeat_url = "https://live-trace.bilibili.com/xlive/rdata-interface/v1/heartbeat/webHeartBeat?pf=web&hb=" | |
hb = str( | |
base64.b64encode(f"60|{self.room_display_id}|1|0".encode("utf-8")), | |
"utf-8", | |
) | |
await Api( | |
method="GET", url=heartbeat_url, json_body=True | |
).update_params(**{"hb": hb, "pf": "web"}).result | |
elif self.__heartbeat_timer <= -30: | |
# 视为已异常断开连接,发布 TIMEOUT 事件 | |
self.dispatch("TIMEOUT") | |
break | |
await asyncio.sleep(1.0) | |
self.__heartbeat_timer -= 1 | |
async def __send( | |
self, | |
data: bytes, | |
protocol_version: int, | |
datapack_type: int, | |
ws: ClientWebSocketResponse, | |
) -> None: | |
""" | |
自动打包并发送数据 | |
""" | |
data = self.__pack(data, protocol_version, datapack_type) | |
self.logger.debug(f"发送原始数据:{data}") | |
await ws.send_bytes(data) | |
def __pack(data: bytes, protocol_version: int, datapack_type: int) -> bytes: | |
""" | |
打包数据 | |
""" | |
sendData = bytearray() | |
sendData += struct.pack(">H", 16) | |
raise_for_statement(0 <= protocol_version <= 2, LiveException("数据包协议版本错误,范围 0~2")) | |
sendData += struct.pack(">H", protocol_version) | |
raise_for_statement(datapack_type in [2, 7], LiveException("数据包类型错误,可用类型:2, 7")) | |
sendData += struct.pack(">I", datapack_type) | |
sendData += struct.pack(">I", 1) | |
sendData += data | |
sendData = struct.pack(">I", len(sendData) + 4) + sendData | |
return bytes(sendData) | |
def __unpack(data: bytes) -> List[Any]: | |
""" | |
解包数据 | |
""" | |
ret = [] | |
offset = 0 | |
header = struct.unpack(">IHHII", data[:16]) | |
if header[2] == LiveDanmaku.PROTOCOL_VERSION_BROTLI_JSON: | |
realData = brotli.decompress(data[16:]) | |
else: | |
realData = data | |
if ( | |
header[2] == LiveDanmaku.PROTOCOL_VERSION_HEARTBEAT | |
and header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE | |
): | |
realData = realData[16:] | |
# 心跳包协议特殊处理 | |
recvData = { | |
"protocol_version": header[2], | |
"datapack_type": header[3], | |
"data": {"view": struct.unpack(">I", realData[0:4])[0]}, | |
} | |
ret.append(recvData) | |
return ret | |
while offset < len(realData): | |
header = struct.unpack(">IHHII", realData[offset: offset + 16]) | |
length = header[0] | |
recvData = { | |
"protocol_version": header[2], | |
"datapack_type": header[3], | |
"data": None, | |
} | |
chunkData = realData[(offset + 16): (offset + length)] | |
if header[2] == 0: | |
recvData["data"] = json.loads(chunkData.decode()) | |
elif header[2] == 2: | |
recvData["data"] = json.loads(chunkData.decode()) | |
elif header[2] == 1: | |
if header[3] == LiveDanmaku.DATAPACK_TYPE_HEARTBEAT_RESPONSE: | |
recvData["data"] = {"view": struct.unpack(">I", chunkData)[0]} | |
elif header[3] == LiveDanmaku.DATAPACK_TYPE_VERIFY_SUCCESS_RESPONSE: | |
recvData["data"] = json.loads(chunkData.decode()) | |
ret.append(recvData) | |
offset += length | |
return ret | |
async def get_self_info(credential: Credential) -> dict: | |
""" | |
获取自己直播等级、排行等信息 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
credential.raise_for_no_sessdata() | |
api = API["info"]["user_info"] | |
return await Api(**api, credential=credential).result | |
async def get_self_live_info(credential: Credential) -> dict: | |
""" | |
获取自己的粉丝牌、大航海等信息 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
credential.raise_for_no_sessdata() | |
api = API["info"]["live_info"] | |
return await Api(**api, credential=credential).result | |
async def get_self_dahanghai_info( | |
page: int = 1, page_size: int = 10, credential: Union[Credential, None] = None | |
) -> dict: | |
""" | |
获取自己开通的大航海信息 | |
Args: | |
page (int, optional): 页数. Defaults to 1. | |
page_size (int, optional): 每页数量. Defaults to 10. | |
总页数取得方法: | |
```python | |
import math | |
info = live.get_self_live_info(credential) | |
pages = math.ceil(info['data']['guards'] / 10) | |
``` | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
if credential is None: | |
credential = Credential() | |
credential.raise_for_no_sessdata() | |
api = API["info"]["user_guards"] | |
params = {"page": page, "page_size": page_size} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_self_bag(credential: Credential) -> dict: | |
""" | |
获取自己的直播礼物包裹信息 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
credential.raise_for_no_sessdata() | |
api = API["info"]["bag_list"] | |
return await Api(**api, credential=credential).result | |
async def get_gift_config( | |
room_id: Union[int, None] = None, | |
area_id: Union[int, None] = None, | |
area_parent_id: Union[int, None] = None, | |
): | |
""" | |
获取所有礼物的信息,包括礼物 id、名称、价格、等级等。 | |
同时填了 room_id、area_id、area_parent_id,则返回一个较小的 json,只包含该房间、该子区域、父区域的礼物。 | |
但即使限定了三个条件,仍然会返回约 1.5w 行的 json。不加限定则是 2.8w 行。 | |
Args: | |
room_id (int, optional) : 房间显示 ID. Defaults to None. | |
area_id (int, optional) : 子分区 ID. Defaults to None. | |
area_parent_id (int, optional) : 父分区 ID. Defaults to None. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["gift_config"] | |
params = { | |
"platform": "pc", | |
"source": "live", | |
"room_id": room_id if room_id is not None else "", | |
"area_id": area_id if area_id is not None else "", | |
"area_parent_id": area_parent_id if area_parent_id is not None else "", | |
} | |
return await Api(**api).update_params(**params).result | |
async def get_area_info() -> dict: | |
""" | |
获取所有分区信息 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
api = API["info"]["area_info"] | |
return await Api(**api).result | |
async def get_live_followers_info( | |
need_recommend: bool = True, credential: Union[Credential, None] = None | |
) -> dict: | |
""" | |
获取关注列表中正在直播的直播间信息,包括房间直播热度,房间名称及标题,清晰度,是否官方认证等信息。 | |
Args: | |
need_recommend (bool, optional): 是否接受推荐直播间,Defaults to True | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
if credential is None: | |
credential = Credential() | |
credential.raise_for_no_sessdata() | |
api = API["info"]["followers_live_info"] | |
params = {"need_recommend": int(need_recommend), "filterRule": 0} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def get_unlive_followers_info( | |
page: int = 1, page_size: int = 30, credential: Union[Credential, None] = None | |
) -> dict: | |
""" | |
获取关注列表中未在直播的直播间信息,包括上次开播时间,上次开播的类别,直播间公告,是否有录播等。 | |
Args: | |
page (int, optional): 页码, Defaults to 1. | |
page_size (int, optional): 每页数量 Defaults to 30. | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
if credential is None: | |
credential = Credential() | |
credential.raise_for_no_sessdata() | |
api = API["info"]["followers_unlive_info"] | |
params = { | |
"page": page, | |
"pagesize": page_size, | |
} | |
return await Api(**api, credential=credential).update_params(**params).result | |
async def create_live_reserve( | |
title: str, start_time: int, credential: Credential | |
) -> dict: | |
""" | |
创建直播预约 | |
Args: | |
title (str) : 直播间标题 | |
start_time (int) : 开播时间戳 | |
Returns: | |
dict: 调用 API 返回的结果 | |
""" | |
credential.raise_for_no_sessdata() | |
api = API["operate"]["create_reserve"] | |
data = { | |
"title": title, | |
"type": 2, | |
"live_plan_start_time": start_time, | |
"stime": None, | |
"from": 1, | |
} | |
return await Api(**api, credential=credential).update_data(**data).result | |