Source code for asphalt.core.context

import logging
import re
from asyncio import get_event_loop, iscoroutinefunction, AbstractEventLoop
from concurrent.futures import Executor
from functools import wraps
from inspect import isawaitable, getattr_static
from traceback import format_exception
from typing import Optional, Callable, Any, Sequence, Dict, Tuple, Type, List, Union, Awaitable

import asyncio_extras
from async_generator import async_generator, isasyncgenfunction
from typeguard import check_argument_types

from asphalt.core.event import Signal, Event, wait_event
from asphalt.core.utils import qualified_name, callable_name

__all__ = ('ResourceEvent', 'ResourceConflict', 'ResourceNotFound', 'TeardownError', 'Context',
           'executor', 'context_teardown')

logger = logging.getLogger(__name__)
factory_callback_type = Callable[['Context'], Any]
resource_name_re = re.compile(r'\w+')


class ResourceContainer:
    """
    Contains the resource value or its factory callable, plus some metadata.

    :ivar value_or_factory: the resource value or the factory callback
    :ivar types: type names the resource was registered with
    :vartype types: Tuple[type, ...]
    :ivar str name: name of the resource
    :ivar str context_attr: the context attribute of the resource
    :ivar bool is_factory: ``True`` if ``value_or_factory`` if this is a resource factory
    """

    __slots__ = 'value_or_factory', 'types', 'name', 'context_attr', 'is_factory'

    def __init__(self, value_or_factory, types: Tuple[type, ...], name: str,
                 context_attr: Optional[str], is_factory: bool) -> None:
        self.value_or_factory = value_or_factory
        self.types = types
        self.name = name
        self.context_attr = context_attr
        self.is_factory = is_factory

    def generate_value(self, ctx: 'Context'):
        assert self.is_factory, 'generate_value() only works for resource factories'
        value = self.value_or_factory(ctx)

        container = ResourceContainer(value, self.types, self.name, self.context_attr, False)
        for type_ in self.types:
            ctx._resources[(type_, self.name)] = container

        if self.context_attr:
            setattr(ctx, self.context_attr, value)

        return value

    def __repr__(self):
        typenames = ', '.join(qualified_name(cls) for cls in self.types)
        value_repr = ('factory=%s' % callable_name(self.value_or_factory) if self.is_factory
                      else 'value=%r' % self.value_or_factory)
        return ('{self.__class__.__name__}({value_repr}, types=[{typenames}], name={self.name!r}, '
                'context_attr={self.context_attr!r})'.format(
                    self=self, value_repr=value_repr, typenames=typenames))


class ResourceEvent(Event):
    """
    Dispatched when a resource or resource factory has been added to a context.

    :ivar resource_types: types the resource was registered under
    :vartype resource_types: Tuple[type, ...]
    :ivar str name: name of the resource
    :ivar bool is_factory: ``True`` if a resource factory was added, ``False`` if a regular
        resource was added
    """

    __slots__ = 'resource_types', 'resource_name', 'is_factory'

    def __init__(self, source: 'Context', topic: str, types: Tuple[type, ...], name: str,
                 is_factory: bool) -> None:
        super().__init__(source, topic)
        self.resource_types = types
        self.resource_name = name
        self.is_factory = is_factory


class ResourceConflict(Exception):
    """
    Raised when a new resource that is being published conflicts with an existing resource or
    context variable.
    """


class ResourceNotFound(LookupError):
    """Raised when a resource request cannot be fulfilled within the allotted time."""

    def __init__(self, type: type, name: str) -> None:
        super().__init__(type, name)
        self.type = type
        self.name = name

    def __str__(self):
        return 'no matching resource was found for type={typename} name={self.name!r}'.\
            format(self=self, typename=qualified_name(self.type))


class TeardownError(Exception):
    """
    Raised after context teardown when one or more teardown callbacks raised an exception.

    :ivar exceptions: exceptions raised during context teardown, in the order in which they were
        raised
    :vartype exceptions: List[Exception]
    """

    def __init__(self, exceptions: List[Exception]) -> None:
        super().__init__(exceptions)
        self.exceptions = exceptions

    def __str__(self):
        separator = '----------------------------\n'
        tracebacks = separator.join('\n'.join(format_exception(type(exc), exc, exc.__traceback__))
                                    for exc in self.exceptions)
        return '{} exceptions(s) were raised by teardown callbacks:\n{}{}'.\
            format(len(self.exceptions), separator, tracebacks)


class Context:
    """
    Contexts give request handlers and callbacks access to resources.

    Contexts are stacked in a way that accessing an attribute that is not present in the current
    context causes the attribute to be looked up in the parent instance and so on, until the
    attribute is found (or :class:`AttributeError` is raised).

    :param parent: the parent context, if any

    :ivar Context parent: the parent context, if any
    :var Signal resource_added: a signal (:class:`ResourceEvent`) dispatched when a resource
        has been published in this context
    """

    resource_added = Signal(ResourceEvent)

    def __init__(self, parent: 'Context' = None) -> None:
        assert check_argument_types()
        self._parent = parent
        self._loop = getattr(parent, 'loop', None) or get_event_loop()
        self._closed = False
        self._resources = {}  # type: Dict[Tuple[type, str], ResourceContainer]
        self._resource_factories = {}  # type: Dict[Tuple[type, str], ResourceContainer]
        self._resource_factories_by_context_attr = {}  # type: Dict[str, ResourceContainer]
        self._teardown_callbacks = []  # type: List[Tuple[Callable, bool]]

    def __getattr__(self, name):
        # First look for a resource factory in the whole context chain
        for ctx in self.context_chain:
            factory = ctx._resource_factories_by_context_attr.get(name)
            if factory:
                return factory.generate_value(self)

        # When that fails, look directly for an attribute in the parents
        for ctx in self.context_chain[1:]:
            value = getattr_static(ctx, name, None)
            if value is not None:
                return getattr(ctx, name)

        raise AttributeError('no such context variable: {}'.format(name))

    @property
    def context_chain(self) -> List['Context']:
        """Return a list of contexts starting from this one, its parent and so on."""
        contexts = []
        ctx = self
        while ctx is not None:
            contexts.append(ctx)
            ctx = ctx.parent

        return contexts

    @property
    def loop(self) -> AbstractEventLoop:
        """Return the event loop associated with this context."""
        return self._loop

    @property
    def parent(self) -> Optional['Context']:
        """Return the parent context, or ``None`` if there is no parent."""
        return self._parent

    @property
    def closed(self) -> bool:
        """Return ``True`` if the context has been closed, ``False`` otherwise."""
        return self._closed

    def _check_closed(self):
        if self._closed:
            raise RuntimeError('this context has already been closed')

    def add_teardown_callback(self, callback: Callable, pass_exception: bool = False) -> None:
        """
        Add a callback to be called when this context closes.

        This is intended for cleanup of resources, and the list of callbacks is processed in the
        reverse order in which they were added, so the last added callback will be called first.

        The callback may return an awaitable. If it does, the awaitable is awaited on before
        calling any further callbacks.

        :param callback: a callable that is called with either no arguments or with the exception
            that ended this context, based on the value of ``pass_exception``
        :param pass_exception: ``True`` to pass the callback the exception that ended this context
            (or ``None`` if the context ended cleanly)

        """
        assert check_argument_types()
        self._check_closed()
        self._teardown_callbacks.append((callback, pass_exception))

    async def close(self, exception: BaseException = None) -> None:
        """
        Close this context and call any necessary resource teardown callbacks.

        If a teardown callback returns an awaitable, the return value is awaited on before calling
        any further teardown callbacks.

        All callbacks will be processed, even if some of them raise exceptions. If at least one
        callback raised an error, this method will raise a :exc:`~.TeardownError` at the end.

        After this method has been called, resources can no longer be requested or published on
        this context.

        :param exception: the exception, if any, that caused this context to be closed
        :raises .TeardownError: if one or more teardown callbacks raise an exception

        """
        self._check_closed()
        self._closed = True

        exceptions = []
        for callback, pass_exception in reversed(self._teardown_callbacks):
            try:
                retval = callback(exception) if pass_exception else callback()
                if isawaitable(retval):
                    await retval
            except Exception as e:
                exceptions.append(e)

        del self._teardown_callbacks
        if exceptions:
            raise TeardownError(exceptions)

    def __enter__(self):
        self._check_closed()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.loop.run_until_complete(self.close(exc_val))

    async def __aenter__(self):
        self._check_closed()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close(exc_val)

    def add_resource(self, value, name: str = 'default', context_attr: str = None,
                     types: Union[type, Sequence[type]] = ()) -> None:
        """
        Add a resource to this context.

        This will cause a ``resource_added`` event to be dispatched.

        :param value: the actual resource value
        :param name: name of this resource (unique among all its registered types within a single
            context)
        :param context_attr: name of the context attribute this resource will be accessible as
        :param types: type(s) to register the resource as (omit to use the type of ``value``)
        :raises asphalt.core.context.ResourceConflict: if the resource conflicts with an existing
            one in any way

        """
        assert check_argument_types()
        self._check_closed()
        if isinstance(types, type):
            types = (types,)
        elif not types:
            types = (type(value),)

        if value is None:
            raise ValueError('"value" must not be None')
        if not resource_name_re.fullmatch(name):
            raise ValueError('"name" must be a nonempty string consisting only of alphanumeric '
                             'characters and underscores')
        if context_attr and getattr_static(self, context_attr, None) is not None:
            raise ResourceConflict('this context already has an attribute {!r}'.format(
                context_attr))
        for resource_type in types:
            if (resource_type, name) in self._resources:
                raise ResourceConflict(
                    'this context already contains a resource of type {} using the name {!r}'.
                    format(qualified_name(resource_type), name))

        resource = ResourceContainer(value, tuple(types), name, context_attr, False)
        for type_ in resource.types:
            self._resources[(type_, name)] = resource

        if context_attr:
            setattr(self, context_attr, value)

        # Notify listeners that a new resource has been made available
        self.resource_added.dispatch(types, name, False)

    def add_resource_factory(self, factory_callback: factory_callback_type,
                             types: Union[type, Sequence[Type]], name: str = 'default',
                             context_attr: str = None) -> None:
        """
        Add a resource factory to this context.

        This will cause a ``resource_added`` event to be dispatched.

        A resource factory is a callable that generates a "contextual" resource when it is
        requested by either using any of the methods :meth:`get_resource`, :meth:`require_resource`
        or :meth:`request_resource` or its context attribute is accessed.

        When a new resource is created in this manner, it is always bound to the context through
        it was requested, regardless of where in the chain the factory itself was added to.

        :param factory_callback: a (non-coroutine) callable that takes a context instance as
            argument and returns a tuple of (resource object, teardown callback)
        :param types: one or more types to register the generated resource as on the target context
        :param name: name of the resource that will be created in the target context
        :param context_attr: name of the context attribute the created resource will be accessible
            as
        :raises asphalt.core.context.ResourceConflict: if there is an existing resource factory for
            the given type/name combinations or the given context variable

        """
        assert check_argument_types()
        self._check_closed()
        types = (types,) if isinstance(types, type) else types
        if not resource_name_re.fullmatch(name):
            raise ValueError('"name" must be a nonempty string consisting only of alphanumeric '
                             'characters and underscores')
        if iscoroutinefunction(factory_callback):
            raise TypeError('"factory_callback" must not be a coroutine function')
        if not types:
            raise ValueError('"types" must not be empty')

        # Check for a conflicting context attribute
        if context_attr in self._resource_factories_by_context_attr:
            raise ResourceConflict(
                'this context already contains a resource factory for the context attribute {!r}'.
                format(context_attr))

        # Check for conflicts with existing resource factories
        types = tuple(types)
        for type_ in types:
            if (type_, name) in self._resource_factories:
                raise ResourceConflict('this context already contains a resource factory for the '
                                       'type {}'.format(qualified_name(type_)))

        # Add the resource factory to the appropriate lookup tables
        resource = ResourceContainer(factory_callback, types, name, context_attr, True)
        for type_ in types:
            self._resource_factories[(type_, name)] = resource

        if context_attr:
            self._resource_factories_by_context_attr[context_attr] = resource

        # Notify listeners that a new resource has been made available
        self.resource_added.dispatch(types, name, True)

    def get_resource(self, type: type, name: str = 'default'):
        """
        Look up a resource in the chain of contexts.

        :param type: type of the requested resource
        :param name: name of the requested resource
        :return: the requested resource, or ``None`` if none was available

        """
        assert check_argument_types()
        self._check_closed()
        key = (type, name)

        # First check if there's already a matching resource in this context
        resource = self._resources.get(key)
        if resource is not None:
            return resource.value_or_factory

        # Next, check if there's a resource factory available on the context chain
        resource = next((ctx._resource_factories[key] for ctx in self.context_chain
                         if key in ctx._resource_factories), None)
        if resource is not None:
            return resource.generate_value(self)

        # Finally, check parents for a matching resource
        return next((ctx._resources[key].value_or_factory for ctx in self.context_chain
                     if key in ctx._resources), None)

    def require_resource(self, type: type, name: str = 'default'):
        """
        Look up a resource in the chain of contexts and raise an exception if it is not found.

        This is like :meth:`get_resource` except that instead of returning ``None`` when a resource
        is not found, it will raise :exc:`~asphalt.core.context.ResourceNotFound`.

        :param type: type of the requested resource
        :param name: name of the requested resource
        :return: the requested resource
        :raises asphalt.core.context.ResourceNotFound: if a resource of the given type and name was
            not found

        """
        resource = self.get_resource(type, name)
        if resource is None:
            raise ResourceNotFound(type, name)

        return resource

    async def request_resource(self, type: type, name: str = 'default'):
        """
        Look up a resource in the chain of contexts.

        This is like :meth:`get_resource` except that if the resource is not already available, it
        will wait for one to become available.

        :param type: type of the requested resource
        :param name: name of the requested resource
        :return: the requested resource

        """
        # First try to locate an existing resource in this context and its parents
        value = self.get_resource(type, name)
        if value is not None:
            return value

        # Wait until a matching resource or resource factory is available
        signals = [ctx.resource_added for ctx in self.context_chain]
        await wait_event(
            signals, lambda event: event.resource_name == name and type in event.resource_types)
        return self.get_resource(type, name)

    def call_async(self, func: Callable, *args, **kwargs):
        """
        Call the given callable in the event loop thread.

        This method lets you call asynchronous code from a worker thread.
        Do not use it from within the event loop thread.

        If the callable returns an awaitable, it is resolved before returning to the caller.

        :param func: a regular function or a coroutine function
        :param args: positional arguments to call the callable with
        :param kwargs: keyword arguments to call the callable with
        :return: the return value of the call

        """
        return asyncio_extras.call_async(self.loop, func, *args, **kwargs)

    def call_in_executor(self, func: Callable, *args, executor: Union[Executor, str] = None,
                         **kwargs) -> Awaitable:
        """
        Call the given callable in an executor.

        :param func: the callable to call
        :param args: positional arguments to call the callable with
        :param executor: either an :class:`~concurrent.futures.Executor` instance, the resource
            name of one or ``None`` to use the event loop's default executor
        :param kwargs: keyword arguments to call the callable with
        :return: an awaitable that resolves to the return value of the call

        """
        assert check_argument_types()
        if isinstance(executor, str):
            executor = self.require_resource(Executor, executor)

        return asyncio_extras.call_in_executor(func, *args, executor=executor, **kwargs)

    def threadpool(self, executor: Union[Executor, str] = None):
        """
        Return an asynchronous context manager that runs the block in a (thread pool) executor.

        :param executor: either an :class:`~concurrent.futures.Executor` instance, the resource
            name of one or ``None`` to use the event loop's default executor
        :return: an asynchronous context manager

        """
        assert check_argument_types()
        if isinstance(executor, str):
            executor = self.require_resource(Executor, executor)

        return asyncio_extras.threadpool(executor)


def executor(arg: Union[Executor, str, Callable] = None):
    """
    Decorate a function so that it runs in an :class:`~concurrent.futures.Executor`.

    If a resource name is given, the first argument must be a :class:`~.Context`.

    Usage::

        @executor
        def should_run_in_executor():
            ...

    With a resource name::

        @executor('resourcename')
        def should_run_in_executor(ctx):
            ...

    :param arg: a callable to decorate, an :class:`~concurrent.futures.Executor` instance, the
        resource name of one or ``None`` to use the event loop's default executor
    :return: the wrapped function

    """
    def outer_wrapper(func: Callable):
        @wraps(func)
        def inner_wrapper(*args, **kwargs):
            try:
                ctx = next(arg for arg in args[:2] if isinstance(arg, Context))
            except StopIteration:
                raise RuntimeError('the first positional argument to {}() has to be a Context '
                                   'instance'.format(callable_name(func))) from None

            executor = ctx.require_resource(Executor, resource_name)
            return asyncio_extras.call_in_executor(func, *args, executor=executor, **kwargs)

        return inner_wrapper

    if isinstance(arg, str):
        resource_name = arg
        return outer_wrapper

    return asyncio_extras.threadpool(arg)


def context_teardown(func: Callable):
    """
    Wrap an async generator function to execute the rest of the function at context teardown.

    This function returns an async function, which, when called, starts the wrapped async
    generator. The wrapped async function is run until the first ``yield`` statement
    (``await async_generator.yield_()`` on Python 3.5). When the context is being torn down, the
    exception that ended the context, if any, is sent to the generator.

    For example::

        class SomeComponent(Component):
            @context_teardown
            async def start(self, ctx: Context):
                service = SomeService()
                ctx.add_resource(service)
                exception = yield
                service.stop()

    :param func: an async generator function
    :return: an async function

    """
    @wraps(func)
    async def wrapper(*args, **kwargs) -> None:
        async def teardown_callback(exception: Optional[Exception]):
            try:
                await generator.asend(exception)
            except StopAsyncIteration:
                pass
            finally:
                await generator.aclose()

        try:
            ctx = next(arg for arg in args[:2] if isinstance(arg, Context))
        except StopIteration:
            raise RuntimeError('the first positional argument to {}() has to be a Context '
                               'instance'.format(callable_name(func))) from None

        generator = func(*args, **kwargs)
        try:
            await generator.asend(None)
        except StopAsyncIteration:
            raise RuntimeError(
                '{} did not do "await yield_()"'.format(callable_name(func))) from None
        except BaseException:
            await generator.aclose()
            raise
        else:
            ctx.add_teardown_callback(teardown_callback, True)

    if iscoroutinefunction(func):
        func = async_generator(func)
    elif not isasyncgenfunction(func):
        raise TypeError('{} must be an async generator function'.format(callable_name(func)))

    return wrapper