Source code for async_nexus.boundaries
"""
Classes for managing the interaction between event domains, each of which
contains an :class:`async_nexus.AsyncEventNexus` and its associated
:class:`async_nexus.Event` sources and handlers.
"""
import asyncio
import async_nexus
[docs]
class AsyncEventBoundary(async_nexus.EventProducer, async_nexus.EventConsumer):
"""
Forwards events between event domains. This is done by hooking into two
different nexuses.
Acts as both a handler registered with :class:`async_nexus.AsyncEventNexus`
(``nexus1.add_handler(b)``) in one event domain and a producer
(``nexus2.add_producer(b)``) in another.
TODO: Send reply events back to the queue of the sending nexus, so it can dispatch them.
"""
QUEUE_MAXLEN = 50
def __init__(self):
super().__init__()
self.queue = asyncio.Queue(maxsize=self.QUEUE_MAXLEN)
self.ready: bool = True
[docs]
async def start(self):
"""
Start :meth:`transfer_events` in the background.
:return asyncio.Task: the task to be optionally awaited (in case it returns due to error or cancellation)
"""
await super().start()
self.task = asyncio.create_task(self.transfer_events())
return self.task
[docs]
async def transfer_events(self):
"""Continuously feed events from the queue to registered nexuses."""
while True:
event = await self.queue.get()
await self.distribute_event(event)
[docs]
async def handle(self, event: async_nexus.Event, queue: asyncio.Queue) -> None:
"""
Handle an event.
:param queue: The caller's queue to which any secondary events should be sent.
"""
if not self.ready:
raise asyncio.errors.BadCall("AsyncEventBoundary closed")
await self.queue.put(event)
# TODO: associate ``queue`` with event somehow, and use the latter to direct replies
[docs]
def close(self):
self.ready = False
asyncio.run(self.queue.join())