Spaces:
Sleeping
Sleeping
File size: 729 Bytes
0aee47a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
"""
bilibili_api.utils.sync
同步执行异步函数
"""
import asyncio
from typing import Any, TypeVar, Coroutine
T = TypeVar("T")
def __ensure_event_loop() -> None:
    """
    Make sure a usable event loop is installed as the current event loop.

    A new loop is created and installed when asyncio reports that no current
    loop is available, or when the current loop has already been closed
    (a closed loop cannot run anything).

    Returns:
        None. The effect is the call to ``asyncio.set_event_loop``.
    """
    try:
        loop = asyncio.get_event_loop()
    except (RuntimeError, DeprecationWarning):
        # RuntimeError: asyncio has no current loop for this thread.
        # DeprecationWarning: some interpreter versions only *warn* in that
        # situation; when warnings are promoted to errors the warning arrives
        # here as an exception. Anything else (KeyboardInterrupt, SystemExit,
        # unrelated bugs) is deliberately allowed to propagate.
        loop = None

    if loop is None or loop.is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
def sync(coroutine: Coroutine[Any, Any, T]) -> T:
    """
    Run a coroutine to completion from synchronous code and return its result.

    Usage notes: [Running async code synchronously](https://nemo2011.github.io/bilibili-api/#/sync-executor)

    Args:
        coroutine (Coroutine): The coroutine to execute.

    Returns:
        The value returned by the coroutine.
    """
    # Guarantee a current event loop exists before asking asyncio for it.
    __ensure_event_loop()
    return asyncio.get_event_loop().run_until_complete(coroutine)
|