"""Top-level package for Async Nexus."""
__author__ = """Alastair Irvine"""
__email__ = 'alastair@plug.org.au'
__version__ = '1.0.0'
from typing import Set, Dict, Sequence, Tuple, List, Union, AnyStr, Iterable, Callable, Generator, Type, TypeVar, Optional, TextIO, IO, Coroutine
import time
import asyncio
import abc
import itertools
from dataclasses import dataclass
import random
from enum import Enum
import weakref
from . import errors
# *** SPECIAL ***
# anext() compatibility function for Python 3.9 and prior.
# Inside an imported module ``__builtins__`` is a plain dict on CPython, so
# ``hasattr(__builtins__, 'anext')`` is always False there and the fallback
# would always shadow the real builtin.  Probing the name itself works
# regardless of how this module is loaded.
try:
    # Re-bind the builtin as a module attribute so that importing ``anext``
    # from this package keeps working on every Python version.
    anext = anext
except NameError:
    async def anext(g):
        """
        Return the next item from the asynchronous iterator ``g``.

        :raises StopAsyncIteration: When ``g`` is exhausted
        """
        return await g.__anext__()
# *** CLASSES ***
[docs]
@dataclass
class Event:
    """
    A simple event dataclass.

    The positional constructor order is ``id, type, priority, payload``;
    :class:`NamedEvent` and :class:`EventFactory` rely on that order.
    """
    # Identifier of the event; :class:`EventFactory` can allocate these
    # sequentially or at random
    id: int
    # Key used to look up the handler for this event; :class:`NamedEvent`
    # subclasses store their qualified class name here
    type: Union[int, str]
    # Lower integers represent higher priorities
    priority: int
    # Data carried by the event
    # NOTE(review): annotated ``str`` but :meth:`Timer.create_event` passes an
    # integer counter value here -- confirm the intended payload type
    payload: str
[docs]
class NamedEvent(Event, metaclass=abc.ABCMeta):
    """
    Event whose ``type`` is filled in automatically with the qualified name
    of the concrete subclass.

    Note that the parameter order is different to :class:`Event`: ``payload``
    comes second and ``priority`` is keyword-only.
    """

    def __init__(self, id: int, payload: str, *, priority: int = 0):
        # The subclass' qualified name doubles as the event type
        type_name = type(self).__qualname__
        super().__init__(id=id, type=type_name, priority=priority, payload=payload)
[docs]
class EventDispatcher:
    """
    Dispatches events to registered handlers.

    NOTE(review): this class declares no ABC metaclass, so the
    ``@abc.abstractmethod`` marker below is documentation only and is not
    enforced at instantiation time.
    """

    @abc.abstractmethod
    async def _dispatch(self, event: Event) -> None:
        """Deliver ``event`` to the appropriate handler; implemented by subclasses."""
        return None
[docs]
class EventConsumer(metaclass=abc.ABCMeta):
    """
    Optional base class for objects registered as handlers with
    :class:`AsyncEventNexus`.

    Any coroutine function/method with the :meth:`handle` signature can also
    act as a handler, which is why this class deliberately carries a different
    name: it avoids confusion between the two.

    Handlers must add secondary events (arising from the processing of events
    they receive) to the queue, so the nexus can dispatch them.
    """

    @abc.abstractmethod
    async def handle(self, event: Event, queue: asyncio.Queue) -> None:
        """
        Handle an event.

        :param event: The event to process
        :param queue: The caller's queue to which any secondary events should be sent.
        """
# A handler is either a callable or an :class:`EventConsumer`.  If it is a
# Callable it should actually be a coroutine function; if an
# :class:`EventConsumer` is used, its ``handle`` method must be a coroutine
# function.
GenericHandler = Union[Callable[[Event, asyncio.Queue], None], EventConsumer]
# A filter takes the same arguments as a handler; the nexus awaits the result
# and treats a true value as "event consumed, stop processing".
FilterFunc = Callable[[Event, asyncio.Queue], bool]
# This is needed to satisfy the type checker; see `help(typing.Type)`
NamedEventTypeVar = TypeVar('NamedEventTypeVar', bound=NamedEvent)
# Anything accepted as an event type: a plain int/str value or a
# :class:`NamedEvent` subclass.
EventTypeID = Union[int, str, Type[NamedEventTypeVar]]
[docs]
class AsyncEventPriorityQueue(metaclass=abc.ABCMeta):
    """
    Replacement for asyncio.Queue supporting multiple sub-queues that operate
    on a priority basis.

    Not implemented yet: both methods below are stubs that do nothing.
    """
    # FIXME: After implementing, change GenericHandler and signatures on
    # EventConsumer, AsyncEventNexus and boundaries.AsyncEventBoundary methods

    def get_nowait(self) -> Event:
        """Stub for fetching the next event without waiting; currently returns ``None``."""
        return None

    async def put(self, event: Event):
        """Stub for enqueueing ``event``; currently does nothing."""
        return None
[docs]
class AbstractNexus:
    """
    Interface for objects that accept events from an :class:`EventProducer`.

    NOTE(review): no ABC metaclass is declared, so ``@abc.abstractmethod`` is
    not enforced when this class (or a subclass) is instantiated.
    """

    @abc.abstractmethod
    async def ingest(self, event: Event) -> None:
        """Accept ``event`` for processing; implemented by subclasses."""
        return None
[docs]
class EventFactory:
    """
    Creates event IDs and/or whole Event objects, using convenience method(s).

    :ivar current_event_id: The ID to be returned by the next call to next_id()
    """
    # Sentinel values for the ``id`` parameter of :meth:`create_event`
    AUTO = -1
    RANDOM = -2

    def __init__(self):
        self.current_event_id = 1

    def next_id(self) -> int:
        """Return the current sequential ID and advance the counter by one."""
        allocated = self.current_event_id
        self.current_event_id = allocated + 1
        return allocated

    def random_id(self) -> int:
        """Return a random ID between 1 and 1000 inclusive."""
        return random.randint(1, 1000)

    def create_event(self, payload, *, event_type: EventTypeID, id: int = AUTO, priority: int = 0) -> Event:
        """
        Convenient wrapper around :class:`Event` constructor.  Also handles
        NamedEvent subclasses.

        Hint: for integer event types, use non-overlapping :class:`enum.IntEnum`
        subclasses.  Then when creating log messages etc., you can cast the
        event_type back to the relevant subclass and use the ``.name``
        attribute of the resulting object.  To determine the event_type category
        (group of types represented by a given subclass), attempt to cast the
        event_type to the first subclass and if you catch :class:`ValueError` try the
        next subclass, and so on.

        :param payload: A generic payload to include in the event
        :param event_type: Event type or class (subclass of :class:`NamedEvent`); keyword only parameter
        :param id: Numeric event ID, or a sequential event ID if ``AUTO``, or a random one if ``RANDOM``
        :param priority: Optional priority, with lower integers representing higher priorities
        :raises TypeError: If ``event_type`` is a class that is not a :class:`NamedEvent` subclass
        """
        # Validate the requested type first so that no ID is consumed when
        # the request is rejected
        is_class = isinstance(event_type, type)
        if is_class and not issubclass(event_type, NamedEvent):
            raise TypeError("Invalid type for event object class: " + event_type.__name__)

        # Resolve the sentinel values into a concrete ID
        if id == self.AUTO:
            event_id = self.next_id()
        elif id == self.RANDOM:
            event_id = self.random_id()
        else:
            event_id = id

        if is_class:
            # A NamedEvent subclass supplies its own type
            return event_type(id=event_id, payload=payload, priority=priority)
        # event_type is a scalar
        return Event(event_id, event_type, priority, payload)
[docs]
class EventSource(metaclass=abc.ABCMeta):
    """
    Abstract base class for anything that creates events.  Do not subclass
    directly; this module's :class:`SimpleEventConverter` and
    :class:`EventProducer` are the intended extension points.
    """

    async def start(self) -> None:
        """
        Any subclass's :meth:`start` coroutine (if any) must call
        ``await super().start()``.
        """
        return None

    def close(self) -> None:
        """
        Release any resources held by the source.

        The default does nothing.  It exists so that subclasses can call
        ``super().close()`` unconditionally and so that a nexus can call
        ``close()`` on every registered source.
        """
        return None
[docs]
class SimpleEventConverter(EventSource):
    """
    Event source used in pull mode: the consumer awaits :meth:`obtain_event`,
    which blocks until an event is ready.
    """

    @abc.abstractmethod
    async def obtain_event(self) -> Event:
        """Wait for and return the next event; implemented by subclasses."""
        return None
[docs]
class EventConverter(SimpleEventConverter):
    """
    Event source used in pull mode that uses an internal generator to produce a
    stream of events.  Subclass this (instead of :class:`SimpleEventConverter`)
    when more complicated processing is required, e.g. iterating over sequences.

    :ivar gen: The asynchronous generator returned by :meth:`event_generator`
    """

    def __init__(self):
        # Calling an async generator function only creates the generator
        # object; nothing runs until the first event is requested
        self.gen = self.event_generator()

    @abc.abstractmethod
    async def event_generator(self):
        """Asynchronous generator yielding events; implemented by subclasses."""
        yield None

    def close(self):
        """
        Stop the parent and the generator.

        :returns: The awaitable produced by the generator's ``aclose()``;
            the caller has to await it for the generator to be finalised
        """
        # The parent classes may not define close(), so only delegate when
        # one is actually available
        parent_close = getattr(super(), "close", None)
        if parent_close is not None:
            parent_close()
        # aclose() is a method of the generator object, not a free function
        return self.gen.aclose()

    async def obtain_event(self) -> Event:
        """
        Gets one event from the internal generator.

        :raises errors.NoMoreEvents: If it is not possible to get an event
        """
        try:
            return await anext(self.gen)
        except StopAsyncIteration:
            raise errors.NoMoreEvents
[docs]
class EventProducer(EventSource, metaclass=abc.ABCMeta):
    """
    Event source used in push mode.  Sends each event to one or more nexus
    objects.  Unlike :class:`SimpleEventConverter`, :class:`EventProducer`
    emits events whenever they are ready, without being asked.  Each nexus will
    await :meth:`start` and call :meth:`EventSource.close`.

    It's up to an object of each subclass to manage its own flow of control.
    They shouldn't do anything other than allocate resources until
    :meth:`start` is awaited.

    Uses weak references to stop circular references from preventing garbage collection.

    :ivar nexus_set: Weakly-referenced set of nexus objects to send events to
    :type nexus_set: Set[AbstractNexus]
    :ivar event_factory: Optional object that subclasses may use to create :class:`Event` objects
    :type event_factory: Optional[EventFactory]
    """

    def __init__(self, event_factory: Optional[EventFactory] = None):
        """
        :param event_factory: Optional object that subclasses may use to create :class:`Event` objects.
            Only a weak proxy is kept, so the caller must keep the factory alive.
        """
        self.nexus_set: Set[AbstractNexus] = weakref.WeakSet()
        if event_factory:
            self.event_factory = weakref.proxy(event_factory)
        else:
            self.event_factory = None

    def register_nexus(self, nexus: AbstractNexus) -> None:
        """
        Mandatory method that must be called for each nexus that this producer
        is to be associated with.
        """
        self.nexus_set.add(nexus)

    async def start(self) -> None:
        """
        Optionally, kicks off any actions the producer needs to do in order to
        start producing events.  Subclasses must call ``await super().start()``.

        :raises errors.MisconfiguredEventProducer: If no nexus has been registered
        """
        if not self.nexus_set:
            raise errors.MisconfiguredEventProducer("No nexus objects registered")

    async def distribute_event(self, event: Event) -> None:
        """Process the event through all nexuses."""
        # Iterate over a snapshot: each ingest() is an await point, and a
        # nexus registered meanwhile would otherwise make the live WeakSet
        # raise "Set changed size during iteration".  The snapshot also keeps
        # the nexuses alive until distribution has finished.
        for nexus in list(self.nexus_set):
            await nexus.ingest(event)
[docs]
class Timer(EventProducer):
    """
    Represents one of several types of integer ticker, or a one-shot, timer
    that can be started/stopped and emits an event each time it fires.

    Timer intervals might be longer than specified if other tasks block.

    Don't use this class for anything other than creating events.

    :ivar interval: How long (in seconds) the timer should run before emitting an event
    :ivar type: What type of timer is being created (one of the below values)
    :ivar starting_value: The value to start with
    :ivar ending_value: The value to count up to or down from
    :ivar direction_value: The value added each iteration
    :ivar task: asyncio.Task
    :ivar event_type: Event type or class (subclass of :class:`NamedEvent`) to be used when an event is created
    :ivar event_factory: Optional object used to create :class:`Event` objects
    """
    # Timer type values
    COUNT_UP = 1
    COUNT_DOWN = 2
    ONGOING = 3     # Like COUNT_UP but repeats forever
    ONE_SHOT = 4    # Equivalent to COUNT_UP with count=1

    def __init__(self, interval: float, *, event_type: EventTypeID, type: int = ONE_SHOT, count: int = 0, event_factory: Optional[EventFactory] = None):
        """
        :param interval: Seconds to sleep before each firing
        :param event_type: Event type or class used for the emitted events
        :param type: One of ``COUNT_UP``, ``COUNT_DOWN``, ``ONGOING`` or ``ONE_SHOT``
        :param count: Number of firings for the counting types
        :param event_factory: Required unless a subclass overrides :meth:`create_event` to create events
        :raises ValueError: If ``type`` is unknown or ``count`` does not suit it
        """
        super().__init__(event_factory)

        # Work out (start, end, step) for the counter according to the type
        if type == self.COUNT_DOWN:
            if count <= 0:
                raise ValueError("Countdown value invalid")
            counter_plan = (count, 0, -1)
        elif type == self.COUNT_UP:
            if count <= 0:
                raise ValueError("Countup value invalid")
            counter_plan = (0, count, 1)
        elif type == self.ONGOING:
            if count != 0:
                raise ValueError("Counter value supplied when irrelevant")
            # Counting upwards from 0 never reaches -1, so this never ends
            counter_plan = (0, -1, 1)
        elif type == self.ONE_SHOT:
            if not 0 <= count <= 1:
                raise ValueError("Oneshot value invalid")
            counter_plan = (0, 1, 1)
        else:
            raise ValueError("Invalid timer type " + str(type))

        self.starting_value, self.ending_value, self.direction_value = counter_plan
        self.task = None
        self.type = type
        self.event_type = event_type
        self.interval = interval

    async def start(self) -> asyncio.Task:
        """
        Kicks off actions the producer needs to do in order to
        start producing events.  Subclasses must call ``await super().start()``.

        :returns: The task running the timer loop
        :raises errors.MultipleStart: If the timer is already running
        """
        await super().start()
        if self.task:
            raise errors.MultipleStart("Timer already started")
        loop_task = asyncio.create_task(self._loop())
        self.task = loop_task
        # Yield control once to give the task a chance to start
        await asyncio.sleep(0)
        return self.task

    def stop(self) -> bool:
        """
        Cancel the timer's task.

        :returns: ``True`` if the timer task was cancelled (or never run) or ``False`` if it had already run
        """
        if not self.task:
            return True
        ran_to_completion = self.task.done() and not self.task.cancelled()
        self.task.cancel()
        self.task = None
        return not ran_to_completion

    async def _loop(self):
        """Emit an event after each timed interval."""
        counter = self.starting_value
        while counter != self.ending_value:
            await asyncio.sleep(self.interval)
            counter += self.direction_value
            # Timer has fired
            fired = self.create_event(counter)
            if fired:
                await self.distribute_event(fired)
        # Not reached if self.type == ONGOING
        self.task = None

    def create_event(self, value: int) -> Optional[Event]:
        """
        Called each time the timer fires.

        Don't call ``super().create_event()`` if overriding.

        :param value: The current count down or count up value
        :returns: An event (if one is to be emitted on this cycle)
        """
        factory = self.event_factory
        return factory.create_event(value, event_type=self.event_type)
[docs]
class AsyncEventNexus(EventDispatcher, AbstractNexus, EventFactory):
    """
    Distributes events to filters (see alias :class:`FilterFunc`) and/or
    handlers (see alias :class:`GenericHandler`) which must also accept a second
    argument, being the queue into which any secondary events are added.

    Can act as a context manager (non-async), which calls :meth:`cleanup`.

    Either :meth:`loop_forever` must be awaited or :meth:`start` called (in
    which case it runs the event loop in a separate task).

    If :meth:`stop` is called or a task (external or internal) running
    :meth:`loop_forever` is cancelled, or an error occurs, :meth:`cleanup` must
    then be run unless in a ``with`` block.

    AsyncEventNexus objects can be chained, i.e. one nexus
    can indirectly be registered as a handler with another; see
    :class:`boundaries.AsyncEventBoundary` (this is an experimental feature).

    :ivar converters: Sequence of objects with async obtain_event() methods
    :ivar producers: Sequence of objects with register_nexus() methods
    :ivar handlers: Mapping of event type (or -1 for "any") to :class:`GenericHandler` objects/callables
    :ivar filters: Sequence of :class:`FilterFunc` objects
    :ivar queue: Bounded :class:`asyncio.Queue` holding events awaiting dispatch
    :ivar state: One of :attr:`States`
    :ivar loop_task: Task created by :meth:`start`, if any
    """
    QUEUE_MAXLEN = 50
    States = Enum('States', "READY STARTING LOOPING STOPPED")

    def __init__(self, multiple: bool = False, bitmode: bool = False):
        """
        :param multiple: Allow multiple handlers per event type
        :param bitmode: if True, means that event types can only be powers of 2 and handlers can be associated with a bitmask
        :raises NotImplementedError: If ``bitmode`` is requested
        """
        super().__init__()
        if bitmode:
            raise NotImplementedError
        # A list of predicates that can choose to accept an event or pass it on
        # (and if none accept it it will be given to the handler(s))
        self.filters: List[FilterFunc] = []
        # Either a mapping of each event type (or -1 for any) to a handler, OR
        # if self.multiple is True, a mapping of event type / -1 to a list of handlers.
        self.handlers: Union[Dict[Union[int, str], GenericHandler], Dict[Union[int, str], List[GenericHandler]]] = {}
        self.queue = asyncio.Queue(maxsize=self.QUEUE_MAXLEN)
        self.multiple = multiple
        self.converters = []
        self.producers = []
        self.state = self.States.READY
        self.loop_task: Optional[asyncio.Task] = None

    @staticmethod
    def _handler_key(event_type):
        """Map an event type identifier onto the value stored in ``Event.type``."""
        # NamedEvent instances carry their class' qualified name as the type,
        # so a handler registered by class has to be keyed the same way
        if isinstance(event_type, type) and issubclass(event_type, NamedEvent):
            return event_type.__qualname__
        return event_type

    def add_handler(self, type: EventTypeID, handler: GenericHandler):
        """
        Add a conditional event handler, where the type must match.

        A handler is a coroutine function/method with the same signature as
        :class:`EventConsumer.handle` or an object with an equivalent method.

        :param type: The type of event to handle (a :class:`NamedEvent` subclass is
            registered under its qualified name), or -1 for events with no dedicated handler
        :param handler: If it's a Callable, it will be called or if it's a Coroutine it will be awaited
        :raises LookupError: If a handler for the type already exists and multiple handlers are not allowed
        """
        key = self._handler_key(type)
        if self.multiple:
            self.handlers.setdefault(key, []).append(handler)
        elif key not in self.handlers:
            self.handlers[key] = handler
        else:
            raise LookupError("Handler for type %s already present" % (key,))

    def add_filter(self, handler: FilterFunc):
        """
        Add a filter, which is passed all events and returns ``True`` if a given
        event is considered consumed, i.e. no further processing by
        filters/handlers is required.

        :param handler: A Callable to be awaited when an event is received
        """
        self.filters.append(handler)

    def add_converter(self, converter: SimpleEventConverter) -> None:
        """
        Add an Event source used in pull mode.  Its :meth:`EventSource.start`
        coroutine is awaited by the loop, then the nexus continually awaits its
        :meth:`SimpleEventConverter.obtain_event` coroutine in the background
        with all the others.
        """
        self.converters.append(converter)

    def add_producer(self, producer: EventProducer) -> None:
        """
        Add an Event source used in push mode.  Its :meth:`EventProducer.start`
        coroutine is awaited by the loop, then the nexus takes no further
        action, as the producer is responsible for calling :meth:`ingest`.
        """
        self.producers.append(producer)
        producer.register_nexus(self)

    def register(self, item: EventSource):
        """
        Add an Event source used in push or pull mode.

        Calls :meth:`add_converter` or :meth:`add_producer` as appropriate.
        """
        if isinstance(item, SimpleEventConverter):
            self.add_converter(item)
        else:
            # Assume it has a start() method, which is the only requirement
            self.add_producer(item)

    async def ingest(self, event: Event) -> None:
        """
        Handle an event, with queueing.

        All events (including secondary) are sequentially handled, i.e. not as
        tasks.  Any background tasks should be created as such by handlers.

        :raises errors.BadCall: If the nexus is neither starting nor looping
        """
        if self.state not in (self.States.STARTING, self.States.LOOPING):
            raise errors.BadCall("AsyncEventNexus event loop not running")
        await self.queue.put(event)
        # TODO: Wrap the dispatch loop in a critical section and skip it if
        # another :meth:`ingest` invocation is in progress on this object.
        # This preserves sequential queue processing.

        # Process all events, including those generated by handlers
        while True:
            # Another coroutine may have drained the queue in the meantime,
            # so an empty queue simply ends the loop.  Only the fetch is
            # guarded: a QueueEmpty raised by a handler must not be mistaken
            # for this condition.
            try:
                queued = self.queue.get_nowait()
            except asyncio.QueueEmpty:
                # TODO: release critical section here
                break
            await self._dispatch(queued)

    # This has to be async so handler coroutines can add secondary events to the queue
    async def _dispatch(self, event: Event) -> None:
        """Process the event through the filters and handlers.

        :raises errors.UnhandledEvent: If no match for the event is found.
        :raises NotImplementedError: If the nexus was created with ``multiple=True``
        """
        if self.multiple:
            # Dispatching to several handlers per type would need handlers to
            # report whether they consumed the event (i.e. be FilterFunc-like)
            raise NotImplementedError

        # Try filters until one accepts the event and if so, stop processing
        for candidate in self.filters:
            if await create_handler_coro(candidate, event, self.queue):
                return

        # Only the lookups are guarded, so that a KeyError raised inside a
        # filter or handler is not misreported as a missing handler
        try:
            # Check for a type-specific handler
            handler = self.handlers[event.type]
        except KeyError:
            try:
                # Failing that, check for a generic handler
                handler = self.handlers[-1]
            except KeyError:
                raise errors.UnhandledEvent("No available event handler for event with ID=%d and type=%s" % (event.id, event.type), event) from None
        await create_handler_coro(handler, event, self.queue)

    async def loop_forever(self) -> None:
        """
        The main Async Nexus loop.  Starts producers, then loops forever
        consuming and distributing events from converters.  In parallel to
        this, any registered producers will feed events into the queue
        unprompted.

        Any :class:`SimpleEventConverter` object that raises
        :class:`errors.NoMoreEvents` will be removed.  Once no converters are
        left the loop idles until cancelled if producers are registered, and
        returns otherwise (nothing could generate further events).

        :raises errors.LoopAlreadyStarted: If the loop is starting or running
        :raises errors.LoopStopped: If the loop has already been stopped
        """
        if self.state in (self.States.STARTING, self.States.LOOPING):
            raise errors.LoopAlreadyStarted("Spurious call to loop_forever()")
        elif self.state is self.States.STOPPED:
            raise errors.LoopStopped("No longer looping (spurious call to loop_forever())")
        elif self.state is self.States.READY:
            self.state = self.States.STARTING

        # Start all event source objects
        await asyncio.gather(*(source.start() for source in itertools.chain(self.producers, self.converters)))

        # No need for self.States.STARTED because it transitions to
        # self.States.LOOPING immediately
        # TO-DO: task cancellation with event arising
        self.state = self.States.LOOPING

        # This is not really an event loop, because events can be ingested
        # and dispatched before this.  Converters won't be queried without it
        # though.
        pending: Dict[asyncio.Task, SimpleEventConverter] = {
            asyncio.create_task(converter.obtain_event()): converter
            for converter in self.converters
        }
        try:
            while True:
                if not pending:
                    # asyncio.wait() rejects an empty collection
                    if not self.producers:
                        break
                    # Producers push events by themselves; just stay alive
                    # until this coroutine is cancelled
                    await asyncio.get_running_loop().create_future()
                done, _ = await asyncio.wait(set(pending), return_when=asyncio.FIRST_COMPLETED)
                for finished in done:
                    converter = pending.pop(finished)
                    try:
                        # Task::result() might raise
                        await self.ingest(finished.result())
                    except errors.NoMoreEvents:
                        # The converter is exhausted; forget about it
                        self.converters.remove(converter)
                    else:
                        # Ask the same converter for its next event
                        pending[asyncio.create_task(converter.obtain_event())] = converter
        except (KeyboardInterrupt, asyncio.CancelledError):
            pass
        finally:
            # Don't leave converter queries running in the background
            for leftover in pending:
                leftover.cancel()
        self.state = self.States.STOPPED

    def start(self) -> asyncio.Task:
        """
        Start the event loop in the background.

        :returns: the task to be optionally awaited (in case it returns due to error or cancellation)
        :raises errors.InvalidLoopState: If the nexus is not in the ``READY`` state
        :raises errors.BadCall: If a loop task already exists
        """
        if self.state is not self.States.READY:
            raise errors.InvalidLoopState("start() called when state is " + self.state.name)
        if self.loop_task:
            raise errors.BadCall("AsyncEventNexus loop task already running")
        self.loop_task = asyncio.create_task(self.loop_forever())
        return self.loop_task

    def stop(self):
        """
        Stop the task running the loop.  cleanup() must then be run unless in a ``with`` block.

        :raises errors.BadCall: If no loop task was created by :meth:`start`
        """
        if self.loop_task is None:
            raise errors.BadCall("AsyncEventNexus loop task not running")
        self.loop_task.cancel()

    def cleanup(self):
        """Destroy all producers and converters, then forget all handlers and filters."""
        for sources in (self.producers, self.converters):
            while sources:
                source = sources.pop()
                try:
                    # Whatever close() returns (possibly an awaitable) is ignored
                    source.close()
                except Exception:
                    # Deliberately best-effort: one misbehaving source must
                    # not prevent the remaining ones from being released
                    pass
        self.handlers.clear()
        self.filters.clear()

    def __enter__(self):
        if self.state is not self.States.READY or any((self.producers, self.converters)):
            raise errors.BadCall("ContextManager entered for non-pristine nexus")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.cleanup()
        return False    # ensure the exception, if any, is re-raised
[docs]
class EventFanout(EventConsumer):
    """
    Event consumer that sends each event to every registered handler.

    A handler is a coroutine function/method with the same signature as
    :meth:`handle` or an object with an equivalent method.

    :ivar handlers: The set of registered handlers
    """

    def __init__(self):
        self.handlers: Set[GenericHandler] = set()

    def register(self, handler: GenericHandler):
        """Add an unconditional event handler."""
        self.handlers.add(handler)

    def deregister(self, handler: GenericHandler):
        """Remove an unconditional event handler."""
        self.handlers.remove(handler)

    async def handle(self, event: Event, queue: asyncio.Queue) -> None:
        """
        Process the event through all handlers.

        :raises errors.MisconfiguredEventConsumer: If no handler is registered
        """
        if not self.handlers:
            raise errors.MisconfiguredEventConsumer("No handler objects registered")
        # Build one awaitable per handler and run them all concurrently
        pending_calls = (create_handler_coro(target, event, queue) for target in self.handlers)
        await asyncio.gather(*pending_calls)
[docs]
class EventDiscarder(EventConsumer):
    """
    Event consumer that discards each event.
    """

    async def handle(self, event: Event, queue: asyncio.Queue) -> None:
        """Ignore ``event``; nothing is added to ``queue``."""
        return None
# *** FUNCTIONS ***
[docs]
def create_handler_coro(handler: Union[GenericHandler, FilterFunc], event: Event, queue: asyncio.Queue) -> Coroutine:
    """
    Build the awaitable for a dynamic handler, which is either a coroutine
    function/method or an :class:`EventConsumer` object.

    Note that the result still has to be awaited by the caller; when the
    handler is a coroutine function nothing has run yet on return.

    :param handler: The callable, or object with a ``handle`` method, to invoke
    :param event: The event to pass on
    :param queue: The queue for secondary events
    :returns: Whatever the handler call produced (normally a coroutine)
    """
    # Objects that can't be called directly are expected to expose handle()
    if not callable(handler):
        return handler.handle(event, queue)
    return handler(event, queue)