# based on sqlalchemy's lib/sqlalchemy/util/_concurrency_py3k.py
from collections.abc import Callable
from contextvars import copy_context
import sys
from typing import Any
from typing import Awaitable
from typing import NoReturn
from typing import ParamSpec
from typing import TypeVar
_T = TypeVar("_T")
_P = ParamSpec("_P")
try:
import greenlet
# implementation based on snaury gist at
# https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
# Issue for context: https://github.com/python-greenlet/greenlet/issues/173
class _AsyncIoGreenlet(greenlet.greenlet):
    """Greenlet that remembers the greenlet driving it.

    On construction the instance stores ``driver`` so that code running
    inside this greenlet can switch back to it, and assigns a copy of the
    current :mod:`contextvars` context to ``gr_context``.
    """

    def __init__(self, fn: Callable[..., Any], driver: greenlet.greenlet | None) -> None:
        super().__init__(fn, driver)
        # copy of the context variables that are current at construction time
        self.gr_context = copy_context()
        # greenlet that control is handed back to from within ``fn``
        self.driver = driver
def await_(awaitable: Awaitable[_T]) -> _T:
    """Awaits an async function in a sync method.

    The sync method must be inside a :func:`greenlet_spawn` context.
    :func:`await_` calls cannot be nested.

    Args:
        awaitable (Awaitable): The awaitable to call.

    Raises:
        RuntimeError: If ``await_`` was called outside a :func:`greenlet_spawn` context or
            nested in another ``await_`` call.

    Returns:
        Any: The return value of ``awaitable`` or raises an exception if it raised one.
    """
    # this runs in the spawned greenlet, while ``fn`` is executing
    caller = greenlet.getcurrent()
    if isinstance(caller, _AsyncIoGreenlet):
        # hand control back to the driver greenlet together with the
        # awaitable to run. When the awaitable completes, the driver switches
        # back here with its result, which becomes the return value of this
        # call (or the error is raised here instead).
        return caller.driver.switch(awaitable)  # type: ignore[union-attr,no-any-return]
    raise RuntimeError(
        "Cannot use await_ outside of greenlet_spawn target "
        "or nested inside another await_ call"
    )
async def greenlet_spawn(fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> _T:
    """Runs a sync function ``fn`` in a new greenlet.

    The sync function can then use :func:`await_` to wait for async functions.
    Any exception raised while awaiting on behalf of ``fn`` (including
    ``BaseException`` subclasses such as ``asyncio.CancelledError``) is thrown
    into the greenlet at the point where it is suspended, so that ``fn`` can
    run its own error handling and cleanup.

    Args:
        fn (Callable): The sync callable to call.
        \\*args: Positional arguments to pass to the ``fn`` callable.
        \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.

    Returns:
        Any: The return value of ``fn`` or raises an exception if it raised one.
    """
    context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
    # runs the function synchronously in the ``context`` greenlet. If the
    # execution is interrupted by await_, context is not dead and result is
    # a coroutine to wait. If the context is dead the function has
    # returned, and its result can be returned.
    try:
        result = context.switch(*args, **kwargs)
        while not context.dead:
            try:
                # wait for a coroutine from await_ and then return its result
                # back to it.
                value = await result
            except BaseException:
                # this allows an exception to be raised within
                # the moderated greenlet so that it can continue
                # its expected flow. BaseException rather than Exception is
                # caught on purpose: task cancellation surfaces here as
                # asyncio.CancelledError, which is not an Exception subclass,
                # and the suspended greenlet must still be resumed so its
                # ``finally`` blocks and context managers run.
                result = context.throw(*sys.exc_info())
            else:
                result = context.switch(value)
    finally:
        # clean up to avoid cycle resolution by gc
        del context.driver
    return result  # type: ignore[no-any-return]
except ImportError: # pragma: no cover
greenlet = None # type: ignore[assignment]
def _not_implemented() -> NoReturn:
    """Raise the error reported when a greenlet-backed helper is unavailable.

    Raises:
        ValueError: Always.
    """
    message = "Greenlet is required to use this function"
    raise ValueError(message)
# Stand-in for ``await_`` when greenlet could not be imported.
def await_(awaitable: Awaitable[_T]) -> _T:
    """Placeholder for ``await_`` defined when ``greenlet`` cannot be imported.

    The ``awaitable`` argument is never awaited; the call only delegates to
    ``_not_implemented``.

    Args:
        awaitable (Awaitable): Ignored.

    Raises:
        ValueError: Always, because greenlet is required.
    """
    _not_implemented()
# Stand-in for ``greenlet_spawn`` when greenlet could not be imported.
async def greenlet_spawn(fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs) -> _T:
    """Placeholder for ``greenlet_spawn`` defined when ``greenlet`` cannot be imported.

    ``fn`` is never called; awaiting the returned coroutine only delegates to
    ``_not_implemented``.

    Args:
        fn (Callable): Ignored.
        \\*args: Ignored.
        \\*\\*kwargs: Ignored.

    Raises:
        ValueError: Always, because greenlet is required.
    """
    _not_implemented()