Spaces:
Sleeping
Sleeping
File size: 5,856 Bytes
0aee47a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
"""
from bilibili_api import Credential
凭据操作类
"""
import re
import time
import uuid
import binascii
from typing import Union
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_OAEP
from .credential import Credential as _Credential
from .network import Api, get_api, get_session, HEADERS
key = RSA.importKey(
"""\
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDLgd2OAkcGVtoE3ThUREbio0Eg
Uc/prcajMKXvkCKFCWhJYJcLkcM2DKKcSeFpD/j6Boy538YXnR6VhcuUJOhH2x71
nzPjfdTcqMz7djHum0qSZA0AyCBDABUqCrfNgCiJ00Ra7GmRj+YCK1NJEuewlb40
JNrRuoEUXpabUzGB8QIDAQAB
-----END PUBLIC KEY-----"""
)
API = get_api("credential")
class Credential(_Credential):
"""
凭据操作类,用于各种请求操作。
"""
async def check_refresh(self) -> bool:
"""
检查是否需要刷新 cookies
Returns:
bool: cookies 是否需要刷新
"""
return await check_cookies(self)
async def refresh(self) -> None:
"""
刷新 cookies
"""
new_cred: Credential = await refresh_cookies(self)
self.sessdata = new_cred.sessdata
self.bili_jct = new_cred.bili_jct
self.dedeuserid = new_cred.dedeuserid
self.ac_time_value = new_cred.ac_time_value
async def check_valid(self) -> bool:
"""
检查 cookies 是否有效
Returns:
bool: cookies 是否有效
"""
data = await Api(
credential=self, **get_api("credential")["info"]["valid"]
).result
return data["isLogin"]
@staticmethod
def from_cookies(cookies: dict={}) -> "Credential":
"""
从 cookies 新建 Credential
Args:
cookies (dict, optional): Cookies. Defaults to {}.
Returns:
Credential: 凭据类
"""
c = Credential()
c.sessdata = cookies.get("SESSDATA")
c.bili_jct = cookies.get("bili_jct")
c.buvid3 = cookies.get("buvid3")
c.dedeuserid = cookies.get("DedeUserID")
c.ac_time_value = cookies.get("ac_time_value")
return c
"""
Cookies 刷新相关
感谢 bilibili-API-collect 提供的刷新 Cookies 的思路
https://socialsisteryi.github.io/bilibili-API-collect/docs/login/cookie_refresh.html
"""
async def check_cookies(credential: Credential) -> bool:
"""
检查是否需要刷新 Cookies
Args:
credential (Credential): 用户凭证
Return:
bool: 是否需要刷新 Cookies
"""
api = API["info"]["check_cookies"]
return (await Api(**api, credential=credential).result)["refresh"]
def getCorrespondPath() -> str:
"""
根据时间生成 CorrespondPath
Return:
str: CorrespondPath
"""
ts = round(time.time() * 1000)
cipher = PKCS1_OAEP.new(key, SHA256)
encrypted = cipher.encrypt(f"refresh_{ts}".encode())
return binascii.b2a_hex(encrypted).decode()
async def get_refresh_csrf(credential: Credential) -> str:
"""
获取刷新 Cookies 的 csrf
Return:
str: csrf
"""
correspond_path = getCorrespondPath()
api = API["operate"]["get_refresh_csrf"]
cookies = credential.get_cookies()
cookies["buvid3"] = str(uuid.uuid1())
cookies["Domain"] = ".bilibili.com"
resp = await get_session().request(
"GET",
api["url"].replace("{correspondPath}", correspond_path),
cookies=cookies,
headers=HEADERS.copy(),
)
if resp.status_code == 404:
raise Exception("correspondPath 过期或错误。")
elif resp.status_code == 200:
text = resp.text
refresh_csrf = re.findall('<div id="1-name">(.+?)</div>', text)[0]
return refresh_csrf
elif resp.status_code != 200:
raise Exception("获取刷新 Cookies 的 csrf 失败。")
async def refresh_cookies(credential: Credential) -> Credential:
"""
刷新 Cookies
Args:
credential (Credential): 用户凭证
Return:
Credential: 新的用户凭证
"""
api = API["operate"]["refresh_cookies"]
credential.raise_for_no_bili_jct()
credential.raise_for_no_ac_time_value()
refresh_csrf = await get_refresh_csrf(credential)
data = {
"csrf": credential.bili_jct,
"refresh_csrf": refresh_csrf,
"refresh_token": credential.ac_time_value,
"source": "main_web",
}
cookies = credential.get_cookies()
cookies["buvid3"] = str(uuid.uuid1())
cookies["Domain"] = ".bilibili.com"
resp = await get_session().request(
"POST", api["url"], cookies=cookies, data=data, headers=HEADERS.copy()
)
if resp.status_code != 200 or resp.json()["code"] != 0:
raise Exception("刷新 Cookies 失败")
new_credential = Credential(
sessdata=resp.cookies["SESSDATA"],
bili_jct=resp.cookies["bili_jct"],
dedeuserid=resp.cookies["DedeUserID"],
ac_time_value=resp.json()["data"]["refresh_token"],
)
await confirm_refresh(credential, new_credential)
return new_credential
async def confirm_refresh(
old_credential: Credential, new_credential: Credential
) -> None:
"""
让旧的refresh_token对应的 Cookie 失效
Args:
old_credential (Credential): 旧的用户凭证
new_credential (Credential): 新的用户凭证
"""
api = API["operate"]["confirm_refresh"]
data = {
"csrf": new_credential.bili_jct,
"refresh_token": old_credential.ac_time_value,
}
await Api(**api, credential=new_credential).update_data(**data).result
|