async_nexus package¶
Submodules¶
async_nexus.boundaries module¶
Classes for managing the interaction between event domains, each of which
contains an async_nexus.AsyncEventNexus and its associated
async_nexus.Event sources and handlers.
- class async_nexus.boundaries.AsyncEventBoundary[source]¶
Bases:
EventProducer,EventConsumerForwards events between event domains. This is done by hooking into two different nexuses.
Acts as both a handler registered with
async_nexus.AsyncEventNexus(nexus1.add_handler(b)) in one event domain and a producer (nexus2.add_producer(b)) in another.TODO: Send reply events back to the queue of the sending nexus, so it can dispatch them.
- QUEUE_MAXLEN = 50¶
- async handle(event: Event, queue: Queue) None[source]¶
Handle an event.
- Parameters:
queue – The caller’s queue to which any secondary events should be sent.
- async start()[source]¶
Start
transfer_events()in the background.- Return asyncio.Task:
the task to be optionally awaited (in case it returns due to error or cancellation)
async_nexus.errors module¶
async_nexus.zmq module¶
Wrapper/helper classes and functions for interfacing with 0mq (ZeroMQ).
- class async_nexus.zmq.ZmqAbstractConverter(socket: Socket)[source]¶
Bases:
SimpleEventConverter
- class async_nexus.zmq.ZmqEventConverter(socket: Socket, *, event_type: int | str | Type[NamedEventTypeVar], event_factory: EventFactory | None)[source]¶
Bases:
ZmqAbstractConverterEvent source used in pull mode. Blocks until a string has been received, then converts it to a
async_nexus.Eventof a specified type.Can be used as a context manager (non-async), which closes the socket on exit.
- Variables:
event_type – Event type or class (subclass of
async_nexus.NamedEvent) to be used when an event is createdevent_factory – Object used to create
async_nexus.Eventobjects
- class async_nexus.zmq.ZmqEventReceiver(socket: Socket)[source]¶
Bases:
ZmqAbstractConverterEvent source used in pull mode. Blocks until a JSON string representing a full
async_nexus.Eventhas been received, including ID, then converts it.Can be used as a context manager (non-async), which closes the socket on exit.
- class async_nexus.zmq.ZmqEventSender(socket: Socket)[source]¶
Bases:
EventConsumerSend events over a ZMQ socket as JSON.
Registered as a handler with
AsyncEventNexus.- async handle(event: Event, queue: Queue) None[source]¶
Handle an
async_nexus.Event.- Parameters:
queue – The caller’s queue to which any secondary events should be sent.
Module contents¶
Top-level package for Async Nexus.
- class async_nexus.AsyncEventNexus(multiple: bool = False, bitmode: bool = False)[source]¶
Bases:
EventDispatcher,AbstractNexus,EventFactoryDistributes events to filters (see alias
FilterFunc) and/or handlers (see aliasGenericHandler) which must also accept a second argument, being the queue into which any secondary events are added.Can act as a context manager (non-async), which calls
cleanup().Either
loop_forever()must be awaited orstart()called (in which case it runs the event loop in a separate task).If
stop()is called or a task (external or internal) runningloop_forever()is cancelled, or an error occurs,cleanup()must then be run unless in awithblock.AsyncEventNexus objects can be chained, i.e. one nexus can indirectly be registered as a handler with another; see
boundaries.AsyncEventBoundary(this is an experimental feature).- Variables:
converters – Sequence of objects with async obtain_event() methods
producers – Sequence of objects with register_nexus() methods
handlers – Sequence of
GenericHandlerobjects/callablesfilters – Sequence of
FilterFuncobjects
- QUEUE_MAXLEN = 50¶
- add_converter(converter: SimpleEventConverter) None[source]¶
Add an Event source used in pull mode. Its
EventSource.start()coroutine is awaited by the loop, then the nexus continually awaits itsSimpleEventConverter.obtain_event()coroutine in the background with all the others.
- add_filter(handler: Callable[[Event, Queue], bool])[source]¶
Add a filter, which is passed all events and returns
Trueif a given event is considered consumed, i.e. no further processing by filters/handlers is required.- Parameters:
handler – A Callable to be awaited when an event is received
- add_handler(type: int | str | Type[NamedEventTypeVar], handler: Callable[[Event, Queue], None] | EventConsumer)[source]¶
Add a conditional event handler, where the type must match.
A handler is a coroutine function/method with the same signature as
EventConsumer.handleor an object with an equivalent method.- Parameters:
type – The type of event to handle, or -1 for events with no dedicated handler
handler – If it’s a Callable, it will be called or if it’s a Coroutine it will be awaited
- add_producer(producer: EventProducer) None[source]¶
Add an Event source used in push mode. Its
EventProducer.start()coroutine is awaited by the loop, then the nexus takes no further action, as the producer is responsible for callingingest().
- async ingest(event: Event) None[source]¶
Handle an event, with queueing.
All events (including secondary) are sequentially handled, i.e. not as tasks. Any background tasks should be created as such by handlers.
- async loop_forever() None[source]¶
The main Async Nexus loop. Starts producers, then loops forever consuming and distributing events from converters. In parallel to this, any registered producers will feed events into the queue unprompted.
Any
SimpleEventConverterobject that raiseserrors.NoMoreEventswill be removed.
- register(item: EventSource)[source]¶
Add an Event source used in push or pull mode.
Calls
add_converter()oradd_producer()as appropriate.
- class async_nexus.AsyncEventPriorityQueue[source]¶
Bases:
objectReplacement for asyncio.Queue supporting multiple sub-queues that operate on a priority basis.
- class async_nexus.Event(id: int, type: int | str, priority: int, payload: str)[source]¶
Bases:
objectA simple event dataclass.
- id: int¶
- payload: str¶
- priority: int¶
- type: int | str¶
- class async_nexus.EventConsumer[source]¶
Bases:
objectOptional parent class for classes that are registered as handlers with
AsyncEventNexus. Given that any coroutine function/method with thehandle()signature can also be a handler, this class has a different name in order to avoid confusion.Handlers must add secondary events (arising from the processing of events they receive) to the queue, so the nexus can dispatch them.
- class async_nexus.EventConverter[source]¶
Bases:
SimpleEventConverterEvent source used in pull mode that uses an internal generator to produce a stream of events. Subclass this (instead of
SimpleEventConverter) when more complicated processing is required, e.g. iterating over sequences.- async obtain_event() Event[source]¶
Gets one event from the internal generator.
- Raises:
errors.NoMoreEvents – If it is not possible to get an event
- class async_nexus.EventDiscarder[source]¶
Bases:
EventConsumerEvent consumer that discards each event.
- class async_nexus.EventFactory[source]¶
Bases:
objectCreates event IDs and/or whole Event objects, using convenience method(s).
- Variables:
current_event_id – The ID to be returned by the next call to next_id()
- AUTO = -1¶
- RANDOM = -2¶
- create_event(payload, *, event_type: int | str | Type[NamedEventTypeVar], id: int = -1, priority: int = 0) Event[source]¶
Convenient wrapper around
Eventconstructor. Also handles NamedEvent subclasses.Hint: for integer event types, use non-overlapping
enum.IntEnumsubclasses. Then when creating log messages etc., you can cast the event event_type back to the relevant subclass and use the.nameattribute of the resulting object. To determine the event_type category (group of types represented by a given subclass), attempt to cast the event_type to the first subclass and if you catchValueErrortry the next subclass, and so on.- Parameters:
payload – A generic payload to include in the event
event_type – Event type or class (subclass of
NamedEvent); keyword only parameterid – Numeric event ID, or a sequential event ID if
AUTO, or a random one ifRANDOMpriority – Optional priority, with lower integers representing higher priorities
- class async_nexus.EventFanout[source]¶
Bases:
EventConsumerEvent consumer that sends each event to every registered handler.
A handler is a coroutine function/method with the same signature as
handle()or an object with an equivalent method.- deregister(handler: Callable[[Event, Queue], None] | EventConsumer)[source]¶
Remove an unconditional event handler.
- register(handler: Callable[[Event, Queue], None] | EventConsumer)[source]¶
Add an unconditional event handler.
- class async_nexus.EventProducer(event_factory: EventFactory | None = None)[source]¶
Bases:
EventSourceEvent source used in push mode. Sends each event to one or more nexus objects. Unlike
SimpleEventConverter,EventProduceremits events whenever they are ready, without being asked. Each nexus will awaitstart()and callEventSource.close().It’s up to an object of each subclass to manage its own flow of control. They shouldn’t do anything other than allocate resources until
start()is awaited.Uses weak references to stop circular references from preventing garbage collection.
- Variables:
nexus_list – Maintains the list of AsyncEventNexus objects to send to :type nexus_list: List[AsyncEventNexus]
event_factory – Optional object that subclasses may use to create
Eventobjects :type event_factory: Optional[EventFactory]
- register_nexus(nexus: AbstractNexus) None[source]¶
Mandatory method that must be called for each nexus that this producer is to be associated with.
- class async_nexus.EventSource[source]¶
Bases:
objectAbstract base class for anything that creates events. Do not subclass.
- class async_nexus.NamedEvent(id: int, payload: str, *, priority: int = 0)[source]¶
Bases:
EventThe qualified name of any given subclass is now used as its type.
Note that the parameter order is different to
Event.
- class async_nexus.SimpleEventConverter[source]¶
Bases:
EventSourceEvent source used in pull mode. Blocks until an event is ready.
- class async_nexus.Timer(interval: float, *, event_type: int | str | Type[NamedEventTypeVar], type: int = 4, count: int = 0, event_factory: EventFactory | None = None)[source]¶
Bases:
EventProducerRepresents one of several types of integer ticker, or a one-shot, timer that can be started/stopped and emits an event each time it fires.
Timer intervals might be longer than specified if other tasks block.
Don’t use this class for anything other than creating events.
- Variables:
interval – How long (in seconds) the timer should run before emitting an event
type – What type of timer is being created (one of the below values)
starting_value – The value to start with
ending_value – The value to count up to or down from
direction_value – The value added each iteration
task – asyncio.Task
event_type – Event type or class (subclass of
NamedEvent) to be used when an event is createdevent_factory – Optional object used to create
Eventobjects
- COUNT_DOWN = 2¶
- COUNT_UP = 1¶
- ONE_SHOT = 4¶
- ONGOING = 3¶
- create_event(value: int) Event | None[source]¶
Called each time the timer fires.
Don’t call
super().create_event()if overriding.- Parameters:
value – The current count down or count up value
- Returns:
An event (if one is to be emitted on this cycle)
- async_nexus.create_handler_coro(handler: Callable[[Event, Queue], None] | EventConsumer | Callable[[Event, Queue], bool], event: Event, queue: Queue) Coroutine[source]¶
Supports calling dynamic handlers that are either a coroutine function/method, or a
EventConsumerobject. Note that this doesn’t actually call the handler.