Spaces:
Sleeping
Sleeping
""" | |
bilibili_api.utils.sync | |
同步执行异步函数 | |
""" | |
import asyncio | |
from typing import Any, TypeVar, Coroutine | |
T = TypeVar("T") | |
def __ensure_event_loop() -> None: | |
try: | |
asyncio.get_event_loop() | |
except: | |
asyncio.set_event_loop(asyncio.new_event_loop()) | |
def sync(coroutine: Coroutine[Any, Any, T]) -> T: | |
""" | |
同步执行异步函数,使用可参考 [同步执行异步代码](https://nemo2011.github.io/bilibili-api/#/sync-executor) | |
Args: | |
coroutine (Coroutine): 异步函数 | |
Returns: | |
该异步函数的返回值 | |
""" | |
__ensure_event_loop() | |
loop = asyncio.get_event_loop() | |
return loop.run_until_complete(coroutine) | |