Source code for falcon_auth2.utils.asyncio_compat

# based on sqlalchemy's lib/sqlalchemy/util/_concurrency_py3k.py
import sys
from typing import Any
from typing import Callable
from typing import Coroutine

try:
    import greenlet

    try:
        from contextvars import copy_context as _copy_context

        # If greenlet.gr_context is present in current version of greenlet,
        # it will be set with a copy of the current context on creation.
        # Refs: https://github.com/python-greenlet/greenlet/pull/198
        getattr(greenlet.greenlet, "gr_context")
    except (ImportError, AttributeError):
        _copy_context = None

    # implementation based on snaury gist at
    # https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
    # Issue for context: https://github.com/python-greenlet/greenlet/issues/173
    class _AsyncIoGreenlet(greenlet.greenlet):
        def __init__(self, fn, driver):
            greenlet.greenlet.__init__(self, fn, driver)
            self.driver = driver
            if _copy_context is not None:
                self.gr_context = _copy_context()

    def await_(awaitable: Coroutine) -> Any:
        """Awaits an async function in a sync method.

        The sync method must be insice a :func:`greenlet_spawn` context.
        :func:`await_` calls cannot be nested.

        Args:
            awaitable (Coroutine): The coroutine to call.

        Raises:
            RuntimeError: If ``await_`` was called outside a :func:`greenlet_spawn` context or
                nested in another ``await_`` call.

        Returns:
            Any: The return value of ``awaitable`` or raises an exception if it raised one.
        """
        # this is called in the context greenlet while running fn
        current = greenlet.getcurrent()
        if not isinstance(current, _AsyncIoGreenlet):
            raise RuntimeError(
                "Cannot use await_ outside of greenlet_spawn target "
                "or nested inside another await_ call"
            )
        # returns the control to the driver greenlet passing it
        # a coroutine to run. Once the awaitable is done, the driver greenlet
        # switches back to this greenlet with the result of awaitable that is
        # then returned to the caller (or raised as error)
        return current.driver.switch(awaitable)

    async def greenlet_spawn(fn: Callable, *args, **kwargs) -> Any:
        """Runs a sync function ``fn`` in a new greenlet.

        The sync function can then use :func:`await_` to wait for async functions.

        Args:
            fn (Callable): The sync callable to call.
            \\*args: Positional arguments to pass to the ``fn`` callable.
            \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.

        Returns:
            Any: The return value of ``fn`` or raises an exception if it raised one.
        """
        context = _AsyncIoGreenlet(fn, greenlet.getcurrent())
        # runs the function synchronously in gl greenlet. If the execution
        # is interrupted by await_, context is not dead and result is a
        # coroutine to wait. If the context is dead the function has
        # returned, and its result can be returned.
        try:
            result = context.switch(*args, **kwargs)
            while not context.dead:
                try:
                    # wait for a coroutine from await_ and then return its result
                    # back to it.
                    value = await result
                except Exception:
                    # this allows an exception to be raised within
                    # the moderated greenlet so that it can continue
                    # its expected flow.
                    result = context.throw(*sys.exc_info())
                else:
                    result = context.switch(value)
        finally:
            # clean up to avoid cycle resolution by gc
            del context.driver
        return result

except ImportError:  # pragma: no cover
    greenlet = None

    def _not_implemented():
        # this conditional is to prevent pylance from considering
        # greenlet_spawn() etc as "no return" and dimming out code below it
        if greenlet:
            return None
        raise ValueError("Greenlet is required to use this function")

[docs] def await_(awaitable): _not_implemented()
[docs] async def greenlet_spawn(fn, *args, **kw): _not_implemented()