async_nexus package

Submodules

async_nexus.boundaries module

Classes for managing the interaction between event domains, each of which contains an async_nexus.AsyncEventNexus and its associated async_nexus.Event sources and handlers.

class async_nexus.boundaries.AsyncEventBoundary

Bases: EventProducer, EventConsumer

Forwards events between event domains. This is done by hooking into two different nexuses.

Acts as both a handler registered with async_nexus.AsyncEventNexus (nexus1.add_handler(b)) in one event domain and a producer (nexus2.add_producer(b)) in another.

TODO: Send reply events back to the queue of the sending nexus, so it can dispatch them.

QUEUE_MAXLEN = 50

close()

async handle(event: Event, queue: Queue) -> None

Handle an event.

Parameters:

queue – The caller’s queue to which any secondary events should be sent.

async start()

Start transfer_events() in the background.

Returns:

asyncio.Task – the task to be optionally awaited (in case it returns due to error or cancellation)

async transfer_events()

Continuously feed events from the queue to registered nexuses.
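The forwarding pattern described for AsyncEventBoundary can be sketched without the library. This is only an illustration under stated assumptions: SketchEvent, SketchNexus and SketchBoundary are hypothetical stand-ins, and treating QUEUE_MAXLEN as the size of an asyncio.Queue is a guess, not something this reference confirms.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchEvent:
    """Hypothetical stand-in for async_nexus.Event."""
    id: int
    type: str
    priority: int
    payload: str


class SketchNexus:
    """Hypothetical stand-in for a nexus: it only records what it ingests."""

    def __init__(self):
        self.seen = []

    async def ingest(self, event):
        self.seen.append(event)


class SketchBoundary:
    """Handler in one domain, producer in another, joined by a bounded queue."""

    QUEUE_MAXLEN = 50  # assumed here to bound the internal queue

    def __init__(self):
        self._queue = asyncio.Queue(maxsize=self.QUEUE_MAXLEN)
        self._targets = []

    def register_nexus(self, nexus):
        self._targets.append(nexus)

    async def handle(self, event, queue):
        # Called by the sending domain; the event is buffered, not dispatched.
        await self._queue.put(event)

    async def transfer_events(self):
        # Runs until cancelled, feeding buffered events to every target nexus.
        while True:
            event = await self._queue.get()
            for nexus in self._targets:
                await nexus.ingest(event)
            self._queue.task_done()


async def demo():
    boundary, receiving = SketchBoundary(), SketchNexus()
    boundary.register_nexus(receiving)
    task = asyncio.create_task(boundary.transfer_events())
    await boundary.handle(SketchEvent(1, "greeting", 0, "hello"), asyncio.Queue())
    await boundary._queue.join()  # wait until the event has crossed over
    task.cancel()
    return [event.payload for event in receiving.seen]
```

Running asyncio.run(demo()) in this sketch returns the payloads that reached the receiving side.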

async_nexus.errors module

exception async_nexus.errors.BadCall

Bases: Exception

exception async_nexus.errors.InvalidLoopState

Bases: BadCall

exception async_nexus.errors.LoopAlreadyStarted

Bases: BadCall

exception async_nexus.errors.LoopStopped

Bases: BadCall

exception async_nexus.errors.MisconfiguredEventConsumer

Bases: BadCall

exception async_nexus.errors.MisconfiguredEventProducer

Bases: BadCall

exception async_nexus.errors.MultipleStart

Bases: Exception

exception async_nexus.errors.NoMoreEvents

Bases: RuntimeError

exception async_nexus.errors.UnhandledEvent

Bases: Exception

async_nexus.zmq module

Wrapper/helper classes and functions for interfacing with 0mq (ZeroMQ).

class async_nexus.zmq.ZmqAbstractConverter(socket: Socket)

Bases: SimpleEventConverter

class async_nexus.zmq.ZmqEventConverter(socket: Socket, *, event_type: int | str | Type[NamedEventTypeVar], event_factory: EventFactory | None)

Bases: ZmqAbstractConverter

Event source used in pull mode. Blocks until a string has been received, then converts it to an async_nexus.Event of a specified type.

Can be used as a context manager (non-async), which closes the socket on exit.

Variables:

convert_to_event(s: str)

Make an event with the supplied string as its payload.

Override if not using self.event_factory.

async obtain_event() -> Event

Get a string from the socket then convert it to an event.

Override if needing to use multipart events (for example) or modify the string before conversion.

class async_nexus.zmq.ZmqEventReceiver(socket: Socket)

Bases: ZmqAbstractConverter

Event source used in pull mode. Blocks until a JSON string representing a full async_nexus.Event has been received, including ID, then converts it.

Can be used as a context manager (non-async), which closes the socket on exit.

async obtain_event() -> Event

Get a JSON string from the socket then convert it to an event.

class async_nexus.zmq.ZmqEventSender(socket: Socket)

Bases: EventConsumer

Send events over a ZMQ socket as JSON.

Registered as a handler with AsyncEventNexus.

async handle(event: Event, queue: Queue) -> None

Handle an async_nexus.Event.

Parameters:

queue – The caller’s queue to which any secondary events should be sent.

async_nexus.zmq.send_event(event: Event, socket: Socket)

Send an event over a ZMQ socket as JSON.
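As a rough illustration of what "an event as JSON" could look like, the following sketch round-trips an event through a JSON string using only the standard library. The field layout mirrors the Event constructor documented in this reference, but the actual wire format used by send_event() and ZmqEventReceiver is not specified here, so SketchEvent, encode_event() and decode_event() are hypothetical.

```python
import json
from dataclasses import asdict, dataclass
from typing import Union


@dataclass
class SketchEvent:
    """Hypothetical stand-in for async_nexus.Event."""
    id: int
    type: Union[int, str]
    priority: int
    payload: str


def encode_event(event):
    # Serialise every field, including the ID, into one JSON object.
    return json.dumps(asdict(event))


def decode_event(text):
    # Rebuild the event from the JSON object's keys.
    return SketchEvent(**json.loads(text))


original = SketchEvent(id=42, type="reading", priority=0, payload="21.5")
restored = decode_event(encode_event(original))
```

The string produced by encode_event() is what would be handed to the socket's send call in a real sender.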

Module contents

Top-level package for Async Nexus.

class async_nexus.AbstractNexus

Bases: object

abstractmethod async ingest(event: Event) -> None

class async_nexus.AsyncEventNexus(multiple: bool = False, bitmode: bool = False)

Bases: EventDispatcher, AbstractNexus, EventFactory

Distributes events to filters (see alias FilterFunc) and/or handlers (see alias GenericHandler). Both must also accept a second argument: the queue to which any secondary events are added.

Can act as a context manager (non-async), which calls cleanup().

Either loop_forever() must be awaited or start() called (in which case it runs the event loop in a separate task).

If stop() is called or a task (external or internal) running loop_forever() is cancelled, or an error occurs, cleanup() must then be run unless in a with block.

AsyncEventNexus objects can be chained, i.e. one nexus can indirectly be registered as a handler with another; see boundaries.AsyncEventBoundary (this is an experimental feature).

Variables:
  • converters – Sequence of objects with async obtain_event() methods

  • producers – Sequence of objects with register_nexus() methods

  • handlers – Sequence of GenericHandler objects/callables

  • filters – Sequence of FilterFunc objects

QUEUE_MAXLEN = 50

class States(*values)

Bases: Enum

READY = 1
STARTING = 2
LOOPING = 3
STOPPED = 4

add_converter(converter: SimpleEventConverter) -> None

Add an Event source used in pull mode. Its EventSource.start() coroutine is awaited by the loop, then the nexus continually awaits its SimpleEventConverter.obtain_event() coroutine in the background with all the others.

add_filter(handler: Callable[[Event, Queue], bool])

Add a filter, which is passed all events and returns True if a given event is considered consumed, i.e. no further processing by filters/handlers is required.

Parameters:

handler – A Callable to be awaited when an event is received

add_handler(type: int | str | Type[NamedEventTypeVar], handler: Callable[[Event, Queue], None] | EventConsumer)

Add a conditional event handler, where the type must match.

A handler is a coroutine function/method with the same signature as EventConsumer.handle or an object with an equivalent method.

Parameters:
  • type – The type of event to handle, or -1 for events with no dedicated handler

  • handler – If it’s a Callable, it will be called or if it’s a Coroutine it will be awaited
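The interplay of filters, typed handlers and the -1 fallback can be sketched as follows. This is a simplified model, not the library's dispatcher: SketchEvent and SketchDispatcher are hypothetical, and the ordering (filters first, then typed handlers, then the fallback) is an assumption drawn from the descriptions of add_filter() and add_handler().

```python
import asyncio
from dataclasses import dataclass

FALLBACK = -1  # type under which catch-all handlers are registered


@dataclass
class SketchEvent:
    """Hypothetical stand-in for async_nexus.Event."""
    type: object
    payload: str


class SketchDispatcher:
    def __init__(self):
        self.filters = []
        self.handlers = {}  # event type -> list of coroutine functions

    def add_filter(self, func):
        self.filters.append(func)

    def add_handler(self, type, handler):
        self.handlers.setdefault(type, []).append(handler)

    async def dispatch(self, event, queue):
        # Filters see every event; a True result means "consumed".
        for func in self.filters:
            if await func(event, queue):
                return
        # Typed handlers first, then the -1 fallback if none matched.
        matched = self.handlers.get(event.type) or self.handlers.get(FALLBACK, [])
        for handler in matched:
            await handler(event, queue)


async def demo():
    log = []

    async def drop_noise(event, queue):
        return event.payload == "noise"

    async def on_temp(event, queue):
        log.append(("temp", event.payload))

    async def on_other(event, queue):
        log.append(("fallback", event.payload))

    dispatcher = SketchDispatcher()
    dispatcher.add_filter(drop_noise)
    dispatcher.add_handler("temp", on_temp)
    dispatcher.add_handler(FALLBACK, on_other)
    queue = asyncio.Queue()
    for event in (SketchEvent("temp", "21C"), SketchEvent("temp", "noise"),
                  SketchEvent("door", "open")):
        await dispatcher.dispatch(event, queue)
    return log
```

In this model the "noise" event never reaches a handler, and the "door" event, which has no dedicated handler, lands in the fallback.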

add_producer(producer: EventProducer) -> None

Add an Event source used in push mode. Its EventProducer.start() coroutine is awaited by the loop, then the nexus takes no further action, as the producer is responsible for calling ingest().

cleanup()

Destroy all producers and converters, then forget all handlers and filters.

async ingest(event: Event) -> None

Handle an event, with queueing.

All events (including secondary) are sequentially handled, i.e. not as tasks. Any background tasks should be created as such by handlers.
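Sequential handling with secondary events can be pictured with a plain asyncio.Queue. The sketch below is an assumption-laden model rather than the nexus implementation: events are bare strings, and handler() and drain() are hypothetical names.

```python
import asyncio


async def handler(event, queue):
    # A handler reports follow-up work by queueing a secondary event.
    if event == "order":
        await queue.put("invoice")


async def drain(queue):
    handled = []
    while not queue.empty():
        event = queue.get_nowait()
        await handler(event, queue)  # awaited in place, not wrapped in a task
        handled.append(event)
    return handled


async def demo():
    queue = asyncio.Queue()
    for event in ("order", "ping"):
        await queue.put(event)
    return await drain(queue)
```

Because each handler is awaited in place, the secondary "invoice" event is processed only after the events already queued ahead of it. A handler that needs long-running work would create its own task (for example with asyncio.create_task()) instead of blocking this loop.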

async loop_forever() -> None

The main Async Nexus loop. Starts producers, then loops forever consuming and distributing events from converters. In parallel to this, any registered producers will feed events into the queue unprompted.

Any SimpleEventConverter object that raises errors.NoMoreEvents will be removed.

register(item: EventSource)

Add an Event source used in push or pull mode.

Calls add_converter() or add_producer() as appropriate.

start() -> Task

Start the event loop in the background.

Returns:

the task to be optionally awaited (in case it returns due to error or cancellation)

stop()

Stop the task running the loop. cleanup() must then be run unless in a with block.

class async_nexus.AsyncEventPriorityQueue

Bases: object

Replacement for asyncio.Queue supporting multiple sub-queues that operate on a priority basis.
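One way to picture "multiple sub-queues that operate on a priority basis" is a mapping from priority to a FIFO, scanned from the lowest priority number upwards. The sketch below is a hypothetical model (SketchPriorityQueue and its tuple-based items are invented for illustration); it assumes, as stated for create_event(), that lower integers mean higher priority.

```python
import asyncio
from collections import deque


class SketchPriorityQueue:
    def __init__(self):
        self._subqueues = {}  # priority -> FIFO of items with that priority

    async def put(self, item):
        priority, _payload = item
        self._subqueues.setdefault(priority, deque()).append(item)

    def get_nowait(self):
        # Serve the lowest priority number first; FIFO within a sub-queue.
        for priority in sorted(self._subqueues):
            subqueue = self._subqueues[priority]
            if subqueue:
                return subqueue.popleft()
        raise asyncio.QueueEmpty


async def demo():
    queue = SketchPriorityQueue()
    for item in ((5, "low-1"), (0, "urgent-1"), (5, "low-2"), (0, "urgent-2")):
        await queue.put(item)
    return [queue.get_nowait()[1] for _ in range(4)]
```

Within one priority level, arrival order is preserved; across levels, urgent items overtake those queued earlier.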

get_nowait() -> Event

async put(event: Event)

class async_nexus.Event(id: int, type: int | str, priority: int, payload: str)

Bases: object

A simple event dataclass.

id: int
payload: str
priority: int
type: int | str

class async_nexus.EventConsumer

Bases: object

Optional parent class for classes that are registered as handlers with AsyncEventNexus. Given that any coroutine function/method with the handle() signature can also be a handler, this class has a different name in order to avoid confusion.

Handlers must add secondary events (arising from the processing of events they receive) to the queue, so the nexus can dispatch them.

abstractmethod async handle(event: Event, queue: Queue) -> None

Handle an event.

Parameters:

queue – The caller’s queue to which any secondary events should be sent.

class async_nexus.EventConverter

Bases: SimpleEventConverter

Event source used in pull mode that uses an internal generator to produce a stream of events. Subclass this (instead of SimpleEventConverter) when more complicated processing is required, e.g. iterating over sequences.

close()

Stop the parent and the generator.

abstractmethod async event_generator()

async obtain_event() -> Event

Gets one event from the internal generator.

Raises:

errors.NoMoreEvents – If it is not possible to get an event
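The generator-backed pull pattern can be sketched as below. SketchConverter and the local NoMoreEvents class are hypothetical stand-ins; the sketch assumes that exhausting the generator is what makes it "not possible to get an event".

```python
import asyncio


class NoMoreEvents(RuntimeError):
    """Stand-in for async_nexus.errors.NoMoreEvents."""


class SketchConverter:
    def __init__(self, items):
        self._items = items
        self._generator = self.event_generator()

    async def event_generator(self):
        # More involved processing (e.g. iterating a sequence) lives here.
        for item in self._items:
            yield item

    async def obtain_event(self):
        # Pull exactly one item; translate exhaustion into NoMoreEvents.
        try:
            return await self._generator.__anext__()
        except StopAsyncIteration:
            raise NoMoreEvents("generator exhausted") from None


async def demo():
    converter = SketchConverter(["a", "b"])
    received = []
    try:
        while True:
            received.append(await converter.obtain_event())
    except NoMoreEvents:
        pass
    return received
```

A consumer in this model keeps calling obtain_event() until NoMoreEvents is raised, which matches the point at which loop_forever() is documented to remove a converter.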

class async_nexus.EventDiscarder

Bases: EventConsumer

Event consumer that discards each event.

async handle(event: Event, queue: Queue) -> None

Handle an event.

Parameters:

queue – The caller’s queue to which any secondary events should be sent.

class async_nexus.EventDispatcher

Bases: object

Dispatches events to registered handlers.

class async_nexus.EventFactory

Bases: object

Creates event IDs and/or whole Event objects, using convenience method(s).

Variables:

current_event_id – The ID to be returned by the next call to next_id()

AUTO = -1
RANDOM = -2

create_event(payload, *, event_type: int | str | Type[NamedEventTypeVar], id: int = -1, priority: int = 0) -> Event

Convenient wrapper around Event constructor. Also handles NamedEvent subclasses.

Hint: for integer event types, use non-overlapping enum.IntEnum subclasses. Then when creating log messages etc., you can cast the event's event_type back to the relevant subclass and use the .name attribute of the resulting object. To determine the event_type category (the group of types represented by a given subclass), attempt to cast the event_type to the first subclass; if you catch ValueError, try the next subclass, and so on.

Parameters:
  • payload – A generic payload to include in the event

  • event_type – Event type or class (subclass of NamedEvent); keyword-only parameter

  • id – Numeric event ID, or a sequential event ID if AUTO, or a random one if RANDOM

  • priority – Optional priority, with lower integers representing higher priorities
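The IntEnum hint given for create_event() can be sketched with the standard library alone. The enum classes and the describe() helper are hypothetical names chosen for illustration; only the cast-and-catch-ValueError technique comes from the hint.

```python
from enum import IntEnum


class SensorEvents(IntEnum):
    TEMPERATURE = 1
    HUMIDITY = 2


class ControlEvents(IntEnum):
    START = 100
    STOP = 101


# Value ranges must not overlap, or the first matching category would win.
CATEGORIES = (SensorEvents, ControlEvents)


def describe(event_type):
    """Return 'Category.NAME' for a known integer event type."""
    for category in CATEGORIES:
        try:
            member = category(event_type)  # the "cast" back to the enum
        except ValueError:
            continue  # not in this category; try the next one
        return f"{category.__name__}.{member.name}"
    return f"unknown({event_type})"
```

For example, describe(2) yields "SensorEvents.HUMIDITY" and describe(101) yields "ControlEvents.STOP", which is convenient for log messages.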

next_id() -> int

random_id() -> int

class async_nexus.EventFanout

Bases: EventConsumer

Event consumer that sends each event to every registered handler.

A handler is a coroutine function/method with the same signature as handle() or an object with an equivalent method.

deregister(handler: Callable[[Event, Queue], None] | EventConsumer)

Remove an unconditional event handler.

async handle(event: Event, queue: Queue) -> None

Process the event through all handlers.

register(handler: Callable[[Event, Queue], None] | EventConsumer)

Add an unconditional event handler.

class async_nexus.EventProducer(event_factory: EventFactory | None = None)

Bases: EventSource

Event source used in push mode. Sends each event to one or more nexus objects. Unlike SimpleEventConverter, EventProducer emits events whenever they are ready, without being asked. Each nexus will await start() and call EventSource.close().

It’s up to an object of each subclass to manage its own flow of control. They shouldn’t do anything other than allocate resources until start() is awaited.

Uses weak references to stop circular references from preventing garbage collection.
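The effect of holding weak references can be sketched as follows. This is a guess at the general technique, not the library's code: SketchProducer, SketchNexus and live_nexuses() are hypothetical, and the sketch assumes it is the producer that holds its nexuses weakly.

```python
import gc
import weakref


class SketchNexus:
    """Hypothetical stand-in for a nexus."""


class SketchProducer:
    def __init__(self):
        self._nexus_refs = []  # weak references only; no strong ownership

    def register_nexus(self, nexus):
        self._nexus_refs.append(weakref.ref(nexus))

    def live_nexuses(self):
        # Dereference each weakref, skipping nexuses that were collected.
        return [n for n in (ref() for ref in self._nexus_refs) if n is not None]


def demo():
    producer = SketchProducer()
    nexus = SketchNexus()
    producer.register_nexus(nexus)
    before = len(producer.live_nexuses())
    del nexus      # drop the only strong reference
    gc.collect()   # make collection explicit on non-refcounting runtimes
    return before, len(producer.live_nexuses())
```

Because the producer never owns the nexus, a nexus that also refers to the producer does not keep either object alive once the application lets go of them.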

Variables:
  • nexus_list (List[AsyncEventNexus]) – Maintains the list of AsyncEventNexus objects to send to

  • event_factory (Optional[EventFactory]) – Optional object that subclasses may use to create Event objects

async distribute_event(event: Event) -> None

Process the event through all nexuses.

register_nexus(nexus: AbstractNexus) -> None

Mandatory method that must be called for each nexus that this producer is to be associated with.

async start() -> None

Optionally, kicks off any actions the producer needs to do in order to start producing events. Subclasses must call await super().start().

class async_nexus.EventSource

Bases: object

Abstract base class for anything that creates events. Do not subclass.

close()

async start() -> None

Any subclass’s start() coroutine (if any) must call await super().start().

class async_nexus.NamedEvent(id: int, payload: str, *, priority: int = 0)

Bases: Event

The qualified name of any given subclass is now used as its type.

Note that the parameter order is different to Event.
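The idea of deriving an event's type from its class can be sketched like this. SketchEvent, SketchNamedEvent and DoorOpened are hypothetical, and using __qualname__ is an assumption: this reference does not say whether "qualified name" includes the module path.

```python
from dataclasses import dataclass


@dataclass
class SketchEvent:
    """Hypothetical stand-in for async_nexus.Event."""
    id: int
    type: object
    priority: int
    payload: str


class SketchNamedEvent(SketchEvent):
    def __init__(self, id, payload, *, priority=0):
        # The subclass name doubles as the event type, so callers never
        # pass a type; note the different parameter order from SketchEvent.
        super().__init__(id, type(self).__qualname__, priority, payload)


class DoorOpened(SketchNamedEvent):
    pass


event = DoorOpened(7, "front door")
```

A handler registered for the DoorOpened class could then be matched against event.type without maintaining a separate table of type constants.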

class async_nexus.SimpleEventConverter

Bases: EventSource

Event source used in pull mode. Blocks until an event is ready.

abstractmethod async obtain_event() -> Event

class async_nexus.Timer(interval: float, *, event_type: int | str | Type[NamedEventTypeVar], type: int = 4, count: int = 0, event_factory: EventFactory | None = None)

Bases: EventProducer

Represents one of several types of integer ticker, or a one-shot timer, that can be started/stopped and emits an event each time it fires.

Timer intervals might be longer than specified if other tasks block.

Don’t use this class for anything other than creating events.

Variables:
  • interval – How long (in seconds) the timer should run before emitting an event

  • type – What type of timer is being created (one of the below values)

  • starting_value – The value to start with

  • ending_value – The value to count up to or down from

  • direction_value – The value added each iteration

  • task – asyncio.Task

  • event_type – Event type or class (subclass of NamedEvent) to be used when an event is created

  • event_factory – Optional object used to create Event objects

COUNT_UP = 1
COUNT_DOWN = 2
ONGOING = 3
ONE_SHOT = 4

create_event(value: int) -> Event | None

Called each time the timer fires.

Don’t call super().create_event() if overriding.

Parameters:

value – The current count down or count up value

Returns:

An event (if one is to be emitted on this cycle)

async start() -> Task

Kicks off actions the producer needs to do in order to start producing events. Subclasses must call await super().start().

stop() -> bool

Cancel the timer’s task.

Returns:

True if the timer task was cancelled (or never run) or False if it had already run
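A count-down ticker with the stop() semantics described above can be sketched with asyncio alone. SketchTimer is hypothetical and much simpler than Timer: it assumes the count runs from count down to 1 and that emit is a coroutine function supplied by the caller.

```python
import asyncio


class SketchTimer:
    def __init__(self, interval, count, emit):
        self.interval = interval
        self.count = count
        self.emit = emit
        self.task = None

    async def _run(self):
        for value in range(self.count, 0, -1):
            # Other tasks that block the loop will stretch this interval.
            await asyncio.sleep(self.interval)
            await self.emit(value)

    def start(self):
        self.task = asyncio.create_task(self._run())
        return self.task

    def stop(self):
        if self.task is None:
            return True  # never run: nothing left that could fire
        return self.task.cancel()  # False once the task has finished


async def demo():
    fired = []

    async def emit(value):
        fired.append(value)

    timer = SketchTimer(0.01, 3, emit)
    await timer.start()  # wait for the ticker to run to completion
    unused = SketchTimer(0.01, 3, emit)
    return fired, timer.stop(), unused.stop()
```

In this sketch, stopping a finished timer reports False, while stopping one that was never started reports True.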

async async_nexus.anext(g)

async_nexus.create_handler_coro(handler: Callable[[Event, Queue], None] | EventConsumer | Callable[[Event, Queue], bool], event: Event, queue: Queue) -> Coroutine

Supports calling dynamic handlers that are either a coroutine function/method or an EventConsumer object. Note that this doesn’t actually call the handler.
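The distinction between building a handler's coroutine and running it can be sketched as follows. make_handler_coro() is a hypothetical re-creation written for this illustration, not the library function; it assumes that an object handler is recognised by its handle() method.

```python
import asyncio


def make_handler_coro(handler, event, queue):
    # Accept either an object with a handle() method or a coroutine function.
    target = handler.handle if hasattr(handler, "handle") else handler
    # Calling a coroutine function only creates the coroutine object;
    # the handler body does not run until the result is awaited.
    return target(event, queue)


async def demo():
    calls = []

    class Recorder:
        async def handle(self, event, queue):
            calls.append(("object", event))

    async def plain(event, queue):
        calls.append(("function", event))

    queue = asyncio.Queue()
    coros = [make_handler_coro(h, "evt", queue) for h in (Recorder(), plain)]
    ran_early = bool(calls)  # still False: nothing has been awaited yet
    for coro in coros:
        await coro
    return ran_early, calls
```

Separating creation from execution lets the caller decide whether to await each coroutine in sequence or wrap it in a task.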