Usage

To use Async Nexus in a project:

import async_nexus

The first thing you’ll need to do in your code is create a async_nexus.AsyncEventNexus object:

nexus = async_nexus.AsyncEventNexus()

The purpose of this object is direct events to where they need to go. However, it does nothing without sources of events that it will ingest and distribute. These sources are written independently of the handlers of the events, so they can all be developed in a decoupled manner.

Events are typically obtained from objects created from subclasses of async_nexus.EventSource. Most of these subclasses in async_nexus are abstract, because they exist for you to subclass. That is how you implement the logic that creates async_nexus.Event objects in the way that suits your application architecture.

(Note that Async Nexus currently does not make use of an event object’s priority field.)

The next step is to create one or more event sources, e.g.:

timer = async_nexus.Timer(interval=1.5, type=async_nexus.Timer.COUNT_DOWN, count=5, event_type=98, event_factory=nexus)
nexus.add_producer(timer)

This uses async_nexus.Timer, the only event source that is usable as-is, thanks to its constructor’s event_factory parameter: async_nexus.AsyncEventNexus is able to create events with the event_type given. Each event’s payload will be the current countdown value and the id will be auto-generated.

A more general example:

async def teapot(nexus: async_nexus.AsyncEventNexus) -> None:
    await asyncio.sleep(3)
    event = async_nexus.Event(1001, 99, 0, "TEAPOT!")
    await nexus.ingest(event)

teapot_task = asyncio.create_task(teapot(nexus))

This is the simplest kind of event source, one that doesn’t use async_nexus.EventSource at all. It just creates a task that will send events to the nexus, e.g. by using a loop. However, it is not the most convenient, as we will see. (async_nexus.EventSource subclasses are detailed in “Event Sources” below.)

Then, a way to consume the events must be written, for example:

async def teapot_handler(event: async_nexus.Event, queue: asyncio.Queue):
    print("teapot ID: ", event.id)

nexus.add_handler(99, teapot_handler)
nexus.add_handler(-1, lambda e, q: print(e))

See “Event Handlers” below for more information on how events are handled by async_nexus.AsyncEventNexus.

Lastly, either start the nexus in blocking mode:

nexus.loop_forever()

Or, create a task for it to run in the background:

nexus_task = nexus.start()
...

Event Sources

To create a simple event source, subclass async_nexus.SimpleEventConverter and override its obtain_event() method, e.g.:

class MyEventConverter(async_nexus.SimpleEventConverter):
    """async_nexus.Event source used in pull mode."""

    id = 1000

    async def obtain_event(self) -> async_nexus.Event:
        await asyncio.sleep(0.2)
        self.id += 1
        return async_nexus.Event(self.id, 50, 0, "hi")

nexus.register(MyEventConverter())

Note that a sleep mimics some amount of processing rather than creating wall-to-wall events as fast as possible.

For a way to use a generator (and thus not have to use object properties to track state), subclass async_nexus.EventConverter and just override its event_generator() method.

See DemoEventConverter in demo/async_events.py for an example of how to use async_nexus.AsyncEventNexus.next_id().

Another kind of event source is an async_nexus.EventProducer subclass. These must have their own “push mode” interface or some kind of queue logic, which calls the supplied async_nexus.EventProducer.distribute_event() method. Subclasses should override the async_nexus.EventProducer.start() coroutine and async_nexus.EventSource.close() method.

To register a producer:

nexus.add_producer(MyEventProducer(event_factory=nexus))

Because it uses the parent constructor’s optional event_factory parameter, it can call self.event_factory.create_event(..., event_type=...) to create events.

The constructor can allocate resources, but the object shouldn’t begin creating events until async_nexus.EventProducer.start() has been awaited.

Event Handlers

If any event is ingested by a nexus and not consumed, this is a logic error and a async_nexus.errors.UnhandledEvent exception will be raised. To consume events, register filters (callables that take a async_nexus.Event and a asyncio.Queue parameter and return a bool) and/or handlers (either callables that take a async_nexus.Event and a asyncio.Queue parameter, or async_nexus.EventConsumer subclasses) with a nexus. These operate as follows, in this order:

  1. A filter function indicates that an event has been consumed, i.e. processing should stop, by returning True. Otherwise processing continues. Use async_nexus.AsyncEventNexus.add_filter() to register these. Filter functions are run in the order they were registered.

  2. A regular handler object (of a async_nexus.EventConsumer subclass) or function will only handle events of a given type. Use async_nexus.AsyncEventNexus.add_handler() to register these.

  3. A default handler object or function will handle all events not already consumed. Use async_nexus.AsyncEventNexus.add_handler() with event type -1 to register these.

async_nexus.EventDiscarder is provided for use as a handler for useless events.

Ideally, use of filter functions should be kept to a minimum, because they decide which events to consume based on code rather than letting async_nexus.AsyncEventNexus check the event type as is the case for handlers. A bad use case for a filter would be checking if the event type is in a list; instead, this could be done by registering the same handler for each event type. A good use case would be checking if an event type is in a range*. Another good use case would be to check if the event’s payload is invalid and discarding it by returning True; valid events will then be consumed by subsequent handlers.

* Although an event type range should be handled by modifying async_nexus.AsyncEventNexus.add_handler() to allow a (start, end) tuple specifying a range

Context Managers

E.g.:

with async_nexus.AsyncEventNexus() as nexus:
    ...
    nexus.loop_forever()

This will correctly call async_nexus.AsyncEventNexus.cleanup() even if an exception is thrown or async_nexus.AsyncEventNexus.stop() is called.

Event buses

There are several ways to use “publish-subscribe” architecture within an async_nexus-based program. (Of course, message brokers can be used outside of it or even within, alongside async_nexus.) One is to use an EventProducer subclass that is registered with multiple async_nexus.AsyncEventNexus objects. This approach is probably only useful when writing plugins, however, because usually a single nexus is sufficient for a given program.

Another way is to use a special handler called async_nexus.EventFanout, which sequentially sends copies of all received events to multiple handlers. This is more simple than creating a new async_nexus.AsyncEventNexus, because it doesn’t use queuing or asyncio.Task, for example.

The most complex approach is to use async_nexus.AsyncEventBoundary, which introduces the topic of event domains. This class is ready to use and acts as both a handler (registered with a nexus in one event domain) and a producer (for a nexus in another event domain). This allows only some events to be distributed from one program component to another. This approach would probably be used when doing something like creating an interchangeable logging (or alerting) backend, which has its own nexus that can receive log events from multiple sources. This component would expose an async_nexus.AsyncEventBoundary object, which other components could use as a handler. That way the component’s nexus would be kept internal.