Asynchronous Capabilities in Mojentic

Mojentic provides robust asynchronous capabilities through its AsyncDispatcher and asynchronous agent classes. These features enable you to build complex, high-performance agent systems that can process multiple events concurrently.

Small Units of Computation

Agents are the smallest unit of computation in Mojentic. They are responsible for processing events and emitting new events. Agents can be combined to form complex systems.

Agents have a single public method, receive_event, which receives an event and returns a list of events. (Asynchronous agents implement receive_event_async instead.)

Each Agent has its own state. Use its constructor to configure it and to hand it simple mechanisms, such as shared memory or access to knowledge or data, that it can use in its computations.

That one method is where the Agent does its work, alone.

This requires you to break the work you want to do into these small units of computation. Incoming events are like commands: think tell-don't-ask in the extreme.
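
As a rough illustration of the "one method, one small unit of work" shape described above, here is a minimal sketch. It does not import Mojentic: the `Event`, `GreetRequested`, `GreetingProduced`, and `GreeterAgent` classes are hypothetical stand-ins written with plain dataclasses, so only the overall pattern should be read as matching the library.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Event:
    """Stand-in for an event base class; only the fields this sketch needs."""
    source: type
    correlation_id: str


@dataclass(frozen=True)
class GreetRequested(Event):  # hypothetical event
    name: str


@dataclass(frozen=True)
class GreetingProduced(Event):  # hypothetical event
    greeting: str


class GreeterAgent:
    """One small unit of work: turn a GreetRequested into a GreetingProduced."""

    def __init__(self, salutation: str = "Hello"):
        # Configuration and shared resources arrive through the constructor.
        self.salutation = salutation

    def receive_event(self, event: Event) -> List[Event]:
        # The single public method: take one event, return zero or more new events.
        if isinstance(event, GreetRequested):
            return [GreetingProduced(
                source=type(self),
                correlation_id=event.correlation_id,
                greeting=f"{self.salutation}, {event.name}!",
            )]
        return []  # Events this agent does not handle produce nothing.


agent = GreeterAgent()
produced = agent.receive_event(
    GreetRequested(source=str, correlation_id="c-1", name="Ada")
)
print(produced[0].greeting)
```

The agent never asks the caller for more information; everything it needs is either in the event or was supplied at construction time.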

Event-Driven Architecture

The event-driven nature of Mojentic is also very much in flux. Early thread-based agents led to more complex synchronization than I wanted, so I'm using different agent formations to explore eventing in a more practical way.

The key purpose of events will be to support traceability and debugging of agentic flows.

Mojentic provides both synchronous and asynchronous event processing capabilities. The AsyncDispatcher enables concurrent event processing, allowing multiple agents to work simultaneously for improved performance and responsiveness. This is particularly valuable for complex workflows involving multiple LLM calls or other high-latency operations.

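Events are the only way agents communicate with each other. Events are simple data classes that are passed around the system. Treat events as immutable: rather than modifying an event you received, emit a new one. (The one exception visible in the dispatcher source is that dispatch() assigns a correlation_id to an event that does not have one yet.)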
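
To make the immutability discipline concrete, the following sketch uses a frozen dataclass. This is an assumption for illustration only: `OrderPlaced` is a hypothetical event and the sketch does not use Mojentic's own Event class, which may enforce (or not enforce) immutability differently.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from uuid import uuid4


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical event; a frozen dataclass stands in for a real event class."""
    order_id: str
    correlation_id: str


original = OrderPlaced(order_id="A-17", correlation_id=str(uuid4()))

try:
    original.order_id = "B-99"  # Mutation is rejected on a frozen dataclass.
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" an event, build a new one and carry the correlation_id forward.
corrected = replace(original, order_id="B-99")
```

Carrying the same correlation_id onto the new event is what keeps related events traceable as one flow.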

Here's a high-level overview of how events flow through the system:

flowchart LR
    A1[Agent 1] -->|Event 1| A2[Agent 2]
    A2 -->|Event 2| A3[Agent 3]
    A2 -->|Event 3| A4[Agent 4]
    A3 -->|Event 4| A1

    subgraph Shared Memory
        M[(Working Memory)]
    end

    A1 -.-> M
    A2 -.-> M
    A3 -.-> M
    A4 -.-> M

The diagram above shows how agents communicate through events (solid lines) and can access shared working memory (dotted lines) when needed.
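
The routing behind those solid lines can be pictured as a table from event type to subscribed agents. The sketch below is a hypothetical stand-in, not Mojentic's Router: it borrows the method names add_route and get_agents that appear elsewhere on this page, but the implementation, the `SketchRouter` name, and the event classes are assumptions.

```python
from collections import defaultdict
from typing import Dict, List, Type


class SketchRouter:
    """Illustrative stand-in for a router; not the library's implementation."""

    def __init__(self) -> None:
        self._routes: Dict[Type, List[object]] = defaultdict(list)

    def add_route(self, event_type: Type, agent: object) -> None:
        # Several agents may subscribe to the same event type (fan-out).
        self._routes[event_type].append(agent)

    def get_agents(self, event: object) -> List[object]:
        # Look up subscribers by the concrete type of the event.
        return list(self._routes.get(type(event), []))


class Event1: ...
class Event2: ...
class Unrouted: ...


router = SketchRouter()
router.add_route(Event1, "agent-2")   # strings stand in for agent instances
router.add_route(Event2, "agent-3")
router.add_route(Event2, "agent-4")   # a second subscriber for the same type

print(router.get_agents(Event2()))
```

An event type with no route simply yields an empty list, so emitting an event nobody listens to is harmless in this sketch.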

Introduction to Asynchronous Processing

Asynchronous processing is particularly valuable in agent-based systems where:

  • Multiple agents need to process events concurrently
  • Some operations (like LLM calls) have significant latency
  • You need to maintain responsiveness while handling complex workflows
  • You want to maximize throughput in multi-agent systems

Mojentic's asynchronous capabilities are built on Python's asyncio library and provide a clean, event-driven architecture for building sophisticated agent systems.
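
The latency argument above can be seen with plain asyncio, independent of Mojentic. In this sketch `slow_call` is a hypothetical coroutine in which `asyncio.sleep` stands in for a high-latency operation such as an LLM request; two such calls run together take roughly as long as one.

```python
import asyncio
import time


async def slow_call(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a high-latency operation such as an LLM request.
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently():
    started = time.perf_counter()
    # gather schedules both coroutines at once and returns results in call order.
    results = await asyncio.gather(
        slow_call("analysis", 0.3),
        slow_call("summary", 0.3),
    )
    elapsed = time.perf_counter() - started
    return results, elapsed


results, elapsed = asyncio.run(run_concurrently())
print(results, f"{elapsed:.2f}s")
```

Run one after the other, the two calls would need about 0.6 seconds; overlapped, the total stays close to the slower of the two.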

Key Components

AsyncDispatcher

The AsyncDispatcher is the core component for asynchronous event processing in Mojentic. It manages an event queue and dispatches events to appropriate agents based on routing rules.

import asyncio

from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.router import Router

async def main():
    # Create a router and register agents
    router = Router()
    # ... register agents with router ...

    # Create and start the dispatcher; start() returns the dispatcher itself
    dispatcher = await AsyncDispatcher(router).start()

    # Dispatch events (my_event is a placeholder for an Event instance)
    dispatcher.dispatch(my_event)

    # Wait for all events to be processed
    await dispatcher.wait_for_empty_queue()

    # Stop the dispatcher when done
    await dispatcher.stop()

asyncio.run(main())

Key features of the AsyncDispatcher:

  • Event Queue Management: Efficiently manages a queue of events to be processed
  • Batch Processing: Handles up to batch_size events (default 5) per loop iteration, then yields to the event loop before the next batch
  • Asynchronous Dispatch: Uses asyncio for non-blocking event processing
  • Graceful Shutdown: stop() signals the dispatch task and waits for it to finish; wait_for_empty_queue() waits until the queue is empty
  • Tracing Support: Integrates with Mojentic's tracing system for observability
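
The queue, batch, and shutdown features listed above fit together roughly as in the following sketch. It is a simplified stand-in modelled on the source listing in the API Reference, not the library's implementation: `SketchDispatcher`, its `handlers` mapping, and the `idle_sleep` parameter are assumptions, and tracing is omitted. As in that listing, the handlers for one event are awaited one after another.

```python
import asyncio
from collections import deque


class SketchDispatcher:
    """Simplified stand-in for a batch dispatch loop; not Mojentic's implementation."""

    def __init__(self, handlers, batch_size=5, idle_sleep=0.01):
        self.handlers = handlers      # maps event type -> list of async callables
        self.batch_size = batch_size
        self.idle_sleep = idle_sleep
        self.queue = deque()
        self._stop = asyncio.Event()
        self._task = None

    async def start(self):
        self._task = asyncio.create_task(self._run())
        return self

    def dispatch(self, event):
        self.queue.append(event)

    async def stop(self):
        self._stop.set()
        if self._task:
            await self._task

    async def _run(self):
        while not self._stop.is_set():
            # Handle at most batch_size events, then yield to the event loop.
            for _ in range(self.batch_size):
                if not self.queue:
                    break
                event = self.queue.popleft()
                for handler in self.handlers.get(type(event), []):
                    for follow_up in await handler(event):
                        self.dispatch(follow_up)
            await asyncio.sleep(self.idle_sleep)


async def main():
    seen = []

    async def double(event):
        # Emit one follow-up string event for each integer received.
        return [f"doubled:{event * 2}"]

    async def record(event):
        seen.append(event)
        return []

    dispatcher = await SketchDispatcher(
        {int: [double], str: [record]}, batch_size=2
    ).start()
    for n in (1, 2, 3):
        dispatcher.dispatch(n)
    while len(seen) < 3:          # wait until every follow-up has been recorded
        await asyncio.sleep(0.01)
    await dispatcher.stop()
    return seen


result = asyncio.run(main())
print(result)
```

The sketch waits on an observable outcome (three recorded events) rather than on queue length alone, because an event that has been popped but is still being handled no longer counts toward the queue.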

Asynchronous Agents

Mojentic provides base classes for creating asynchronous agents:

BaseAsyncAgent

The BaseAsyncAgent class is the foundation for all asynchronous agents in Mojentic. It defines the receive_event_async method that asynchronous agents must implement.

from mojentic.agents.base_async_agent import BaseAsyncAgent
from mojentic.event import Event
from typing import List

class MyAsyncAgent(BaseAsyncAgent):
    async def receive_event_async(self, event: Event) -> List[Event]:
        # Process the event asynchronously
        # ...
        return [new_event]  # Return any new events to be dispatched

BaseAsyncLLMAgent

The BaseAsyncLLMAgent extends BaseAsyncAgent with LLM capabilities, allowing you to create agents that use LLMs asynchronously.

from mojentic.agents.async_llm_agent import BaseAsyncLLMAgent
from mojentic.event import Event
from typing import List

class MyAsyncLLMAgent(BaseAsyncLLMAgent):
    def __init__(self, llm):
        super().__init__(
            llm=llm,
            behaviour="You are a helpful assistant.",
            response_model=MyResponseModel  # placeholder: a pydantic model you define
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        # Use the LLM asynchronously; the response is an instance of response_model
        response = await self.generate_response("Your prompt here")
        # Build new_event (placeholder) from the response, then return it for dispatch
        return [new_event]

AsyncAggregatorAgent

The AsyncAggregatorAgent is a specialized asynchronous agent that collects multiple events before processing them together. This is useful for tasks that require combining information from multiple sources.

from mojentic.agents.async_aggregator_agent import AsyncAggregatorAgent
from mojentic.event import Event
from typing import List

class MyAggregatorAgent(AsyncAggregatorAgent):
    def __init__(self):
        super().__init__(event_types_needed=[EventType1, EventType2, EventType3])

    async def process_events(self, events):
        # Process the collected events
        # ...
        return [new_event]  # Return any new events to be dispatched
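
The idea behind aggregation is to group incoming events by correlation_id and release the group once every needed event type has arrived. The sketch below shows only that bookkeeping, under stated assumptions: `SketchAggregator`, `PriceQuoted`, and `StockChecked` are hypothetical names, the code is synchronous for brevity, and it is not the library's AsyncAggregatorAgent.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class PriceQuoted:       # hypothetical event type
    correlation_id: str
    price: float


@dataclass(frozen=True)
class StockChecked:      # hypothetical event type
    correlation_id: str
    in_stock: bool


class SketchAggregator:
    """Illustrative only: collects events per correlation_id until all types are seen."""

    def __init__(self, event_types_needed):
        self.event_types_needed = list(event_types_needed)
        self._pending = defaultdict(list)

    def receive(self, event):
        bucket = self._pending[event.correlation_id]
        bucket.append(event)
        captured = {type(e) for e in bucket}
        if all(t in captured for t in self.event_types_needed):
            # Complete set: hand it over and forget it so memory does not grow.
            return self._pending.pop(event.correlation_id)
        return None  # Still waiting for at least one needed type.


agg = SketchAggregator([PriceQuoted, StockChecked])
first = agg.receive(PriceQuoted("order-1", 9.5))
other = agg.receive(StockChecked("order-2", True))    # different correlation_id
second = agg.receive(StockChecked("order-1", False))
print(second)
```

Events from different flows never mix, because each correlation_id has its own bucket.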

Example: Asynchronous Text Analysis System

The following example demonstrates how to build an asynchronous text analysis system using Mojentic's asynchronous capabilities. This system analyzes and summarizes text concurrently, then combines the results.

import asyncio
from typing import List
from pydantic import BaseModel, Field

from mojentic.agents.async_aggregator_agent import AsyncAggregatorAgent
from mojentic.agents.async_llm_agent import BaseAsyncLLMAgent
from mojentic.agents.base_async_agent import BaseAsyncAgent
from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.event import Event, TerminateEvent
from mojentic.llm import LLMBroker
from mojentic.router import Router

# Define events
class TextEvent(Event):
    text: str

class AnalysisEvent(Event):
    analysis: str

class SummaryEvent(Event):
    summary: str

class CombinedResultEvent(Event):
    text: str
    analysis: str
    summary: str
    combined: str

# Define response models
class AnalysisResponse(BaseModel):
    analysis: str

class SummaryResponse(BaseModel):
    summary: str

class CombinedResponse(BaseModel):
    combined: str

# Define agents
class TextAnalyzerAgent(BaseAsyncLLMAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(
            llm=llm,
            behaviour="You are a text analysis assistant.",
            response_model=AnalysisResponse
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, TextEvent):
            prompt = f"Analyze this text: {event.text}"
            response = await self.generate_response(prompt)
            return [AnalysisEvent(
                source=type(self), 
                correlation_id=event.correlation_id, 
                analysis=response.analysis
            )]
        return []

class TextSummarizerAgent(BaseAsyncLLMAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(
            llm=llm,
            behaviour="You are a text summarization assistant.",
            response_model=SummaryResponse
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, TextEvent):
            prompt = f"Summarize this text: {event.text}"
            response = await self.generate_response(prompt)
            return [SummaryEvent(
                source=type(self), 
                correlation_id=event.correlation_id, 
                summary=response.summary
            )]
        return []

class ResultCombinerAgent(AsyncAggregatorAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(event_types_needed=[TextEvent, AnalysisEvent, SummaryEvent])
        # Stored for a possible LLM-based combination step; process_events below does not use them
        self.llm = llm
        self.response_model = CombinedResponse

    async def process_events(self, events):
        text_event = next((e for e in events if isinstance(e, TextEvent)), None)
        analysis_event = next((e for e in events if isinstance(e, AnalysisEvent)), None)
        summary_event = next((e for e in events if isinstance(e, SummaryEvent)), None)

        if text_event and analysis_event and summary_event:
            # Combine the results
            combined = (
                f"Original text: {text_event.text}\n\n"
                f"Analysis: {analysis_event.analysis}\n\n"
                f"Summary: {summary_event.summary}"
            )

            return [CombinedResultEvent(
                source=type(self),
                correlation_id=text_event.correlation_id,
                text=text_event.text,
                analysis=analysis_event.analysis,
                summary=summary_event.summary,
                combined=combined
            )]
        return []

class ResultOutputAgent(BaseAsyncAgent):
    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, CombinedResultEvent):
            print("\n=== FINAL RESULT ===")
            print(event.combined)
            print("===================\n")

            return [TerminateEvent(source=type(self), correlation_id=event.correlation_id)]
        return []

# Main function
async def main():
    # Initialize the LLM broker
    llm = LLMBroker(model="your-model-name")

    # Create a router and register agents
    router = Router()

    # Create agents
    analyzer = TextAnalyzerAgent(llm)
    summarizer = TextSummarizerAgent(llm)
    combiner = ResultCombinerAgent(llm)
    output_agent = ResultOutputAgent()

    # Register agents with the router
    router.add_route(TextEvent, analyzer)
    router.add_route(TextEvent, summarizer)
    router.add_route(TextEvent, combiner)
    router.add_route(AnalysisEvent, combiner)
    router.add_route(SummaryEvent, combiner)
    router.add_route(CombinedResultEvent, output_agent)

    # Create and start the dispatcher
    dispatcher = await AsyncDispatcher(router).start()

    # Create a text event
    text = "Your text to analyze and summarize"
    event = TextEvent(source="ExampleSource", text=text)

    # Dispatch the event
    dispatcher.dispatch(event)

    # Wait for all events to be processed
    await dispatcher.wait_for_empty_queue()

    # Stop the dispatcher
    await dispatcher.stop()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage: Customizing AsyncDispatcher

You can customize the AsyncDispatcher to fit your specific needs:

from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.router import Router
from mojentic.tracer import Tracer

# Create a router
router = Router()

# Create a tracer for observability
tracer = Tracer()

# Create a customized AsyncDispatcher
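dispatcher = AsyncDispatcher(
    router=router,
    shared_working_memory=my_shared_memory,  # Optional; my_shared_memory is a placeholder you create
    batch_size=10,  # Process up to 10 events per loop iteration (default: 5)
    tracer=tracer  # Use a custom tracer (default: null_tracer)
)

# Start the dispatcher (call this from within a coroutine)
await dispatcher.start()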

API Reference

AsyncDispatcher

mojentic.async_dispatcher.AsyncDispatcher

AsyncDispatcher class is an asynchronous version of the Dispatcher class. It uses asyncio and deque for event processing.

Source code in src/mojentic/async_dispatcher.py
class AsyncDispatcher:
    """
    AsyncDispatcher class is an asynchronous version of the Dispatcher class.
    It uses asyncio and deque for event processing.
    """
    def __init__(self, router, shared_working_memory=None, batch_size=5, tracer=None):
        """
        Initialize the AsyncDispatcher.

        Parameters
        ----------
        router : Router
            The router to use for routing events to agents
        shared_working_memory : SharedWorkingMemory, optional
            The shared working memory to use
        batch_size : int, optional
            The number of events to process in each batch
        tracer : Tracer, optional
            The tracer to use for tracing events
        """
        self.router = router
        self.batch_size = batch_size
        self.event_queue = deque()
        self._stop_event = asyncio.Event()
        self._task = None

        # Use null_tracer if no tracer is provided
        from mojentic.tracer import null_tracer
        self.tracer = tracer or null_tracer

    async def start(self):
        """
        Start the event dispatch task.
        """
        logger.debug("Starting event dispatch task")
        self._task = asyncio.create_task(self._dispatch_events())
        return self

    async def stop(self):
        """
        Stop the event dispatch task.
        """
        self._stop_event.set()
        if self._task:
            await self._task

    async def wait_for_empty_queue(self, timeout=None):
        """
        Wait for the event queue to be empty.

        Parameters
        ----------
        timeout : float, optional
            The timeout in seconds

        Returns
        -------
        bool
            True if the queue is empty, False if the timeout was reached
        """
        start_time = asyncio.get_event_loop().time()
        while len(self.event_queue) > 0:
            if timeout is not None and asyncio.get_event_loop().time() - start_time > timeout:
                return False
            await asyncio.sleep(0.1)
        return True

    def dispatch(self, event):
        """
        Dispatch an event to the event queue.

        Parameters
        ----------
        event : Event
            The event to dispatch
        """
        logger.log(logging.DEBUG, f"Dispatching event: {event}")
        if event.correlation_id is None:
            event.correlation_id = str(uuid4())
        self.event_queue.append(event)

    async def _dispatch_events(self):
        """
        Dispatch events from the event queue to agents.
        """
        while not self._stop_event.is_set():
            for _ in range(self.batch_size):
                logger.debug("Checking for events")
                if len(self.event_queue) > 0:
                    logger.debug(f"{len(self.event_queue)} events in queue")
                    event = self.event_queue.popleft()
                    logger.debug(f"Processing event: {event}")
                    agents = self.router.get_agents(event)
                    logger.debug(f"Found {len(agents)} agents for event type {type(event)}")
                    events = []
                    for agent in agents:
                        logger.debug(f"Sending event to agent {agent}")

                        # Record agent interaction in tracer system
                        self.tracer.record_agent_interaction(
                            from_agent=str(event.source),
                            to_agent=str(type(agent)),
                            event_type=str(type(event).__name__),
                            event_id=event.correlation_id,
                            source=type(self)
                        )

                        # Process the event through the agent
                        # If the agent is an async agent, await its receive_event method
                        if hasattr(agent, 'receive_event_async'):
                            received_events = await agent.receive_event_async(event)
                        else:
                            received_events = agent.receive_event(event)

                        logger.debug(f"Agent {agent} returned {len(received_events)} events")
                        events.extend(received_events)
                    for fe in events:
                        if type(fe) is TerminateEvent:
                            self._stop_event.set()
                        self.dispatch(fe)
            await asyncio.sleep(0.1)  # Use asyncio.sleep instead of time.sleep

__init__(router, shared_working_memory=None, batch_size=5, tracer=None)

Initialize the AsyncDispatcher.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| router | Router | The router to use for routing events to agents | required |
| shared_working_memory | SharedWorkingMemory | The shared working memory to use | None |
| batch_size | int | The number of events to process in each batch | 5 |
| tracer | Tracer | The tracer to use for tracing events | None |

dispatch(event)

Dispatch an event to the event queue.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The event to dispatch | required |

start() async

Start the event dispatch task.

stop() async

Stop the event dispatch task.

wait_for_empty_queue(timeout=None) async

Wait for the event queue to be empty.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | float | The timeout in seconds | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the queue is empty, False if the timeout was reached |
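
A caller should check that boolean rather than assume the queue drained. The sketch below reproduces the same polling idea against a bare deque so it can run without Mojentic; `wait_until_empty` and its `poll` parameter are hypothetical names, not part of the library.

```python
import asyncio
from collections import deque


async def wait_until_empty(queue: deque, timeout=None, poll=0.01) -> bool:
    """Polling wait in the style of wait_for_empty_queue, written for a bare deque."""
    loop = asyncio.get_running_loop()
    started = loop.time()
    while queue:
        if timeout is not None and loop.time() - started > timeout:
            return False  # Gave up: the queue still holds events.
        await asyncio.sleep(poll)
    return True


async def demo():
    stuck = deque(["never-consumed"])   # nothing drains this queue
    drained = deque()
    return (
        await wait_until_empty(stuck, timeout=0.05),
        await wait_until_empty(drained, timeout=0.05),
    )


timed_out_result, empty_result = asyncio.run(demo())
print(timed_out_result, empty_result)
```

Note that, judging from the class source listing above, True only means the queue held no events at that moment; an event that has already been taken off the queue and is still being handled by an agent is not counted.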

BaseAsyncAgent

mojentic.agents.base_async_agent.BaseAsyncAgent

BaseAsyncAgent class is the base class for all asynchronous agents. It provides an async receive method for event processing.

Source code in src/mojentic/agents/base_async_agent.py
class BaseAsyncAgent:
    """
    BaseAsyncAgent class is the base class for all asynchronous agents.
    It provides an async receive method for event processing.
    """

    async def receive_event_async(self, event: Event) -> List[Event]:
        """
        receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of
        events as output.

        In this way, you can perform work based on the event, and generate whatever subsequent events may need to be
        processed next.

        This keeps the agent decoupled from the specifics of the event routing and processing.

        Events are subclasses of the Event class.

        :param event: The event to process
        :return: A list of events to be processed next
        """
        return []

receive_event_async(event) async

receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of events as output.

In this way, you can perform work based on the event, and generate whatever subsequent events may need to be processed next.

This keeps the agent decoupled from the specifics of the event routing and processing.

Events are subclasses of the Event class.

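Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The event to process | required |

Returns:

| Type | Description |
| --- | --- |
| List[Event] | A list of events to be processed next |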

AsyncAggregatorAgent

mojentic.agents.async_aggregator_agent.AsyncAggregatorAgent

Bases: BaseAsyncAgent

AsyncAggregatorAgent is an asynchronous version of the BaseAggregatingAgent. It aggregates events based on their correlation_id and processes them when all required events are available.

Source code in src/mojentic/agents/async_aggregator_agent.py
class AsyncAggregatorAgent(BaseAsyncAgent):
    """
    AsyncAggregatorAgent is an asynchronous version of the BaseAggregatingAgent.
    It aggregates events based on their correlation_id and processes them when all required events are available.
    """
    def __init__(self, event_types_needed=None):
        """
        Initialize the AsyncAggregatorAgent.

        Parameters
        ----------
        event_types_needed : list, optional
            List of event types that need to be captured before processing
        """
        super().__init__()
        self.results = {}
        self.event_types_needed = event_types_needed or []
        self.futures = {}  # Maps correlation_id to Future objects

    async def _get_and_reset_results(self, event):
        """
        Get and reset the results for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to get results for

        Returns
        -------
        list
            The results for the event
        """
        results = self.results[event.correlation_id]
        self.results[event.correlation_id] = None
        return results

    async def _capture_results_if_needed(self, event):
        """
        Capture results for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to capture results for
        """
        results = self.results.get(event.correlation_id, [])
        results.append(event)
        self.results[event.correlation_id] = results

        # Check if we have all needed events and set the future if we do
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])

        if finished and event.correlation_id in self.futures:
            future = self.futures[event.correlation_id]
            if not future.done():
                future.set_result(self.results[event.correlation_id])

    async def _has_all_needed(self, event):
        """
        Check if all needed event types have been captured for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to check

        Returns
        -------
        bool
            True if all needed event types have been captured, False otherwise
        """
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])
        logger.debug(f"Captured: {event_types_captured}, Needed: {self.event_types_needed}, Finished: {finished}")
        return finished

    async def wait_for_events(self, correlation_id, timeout=None):
        """
        Wait for all needed events for a specific correlation_id.

        Parameters
        ----------
        correlation_id : str
            The correlation_id to wait for
        timeout : float, optional
            The timeout in seconds

        Returns
        -------
        list
            The events for the correlation_id
        """
        if correlation_id not in self.futures:
            self.futures[correlation_id] = asyncio.Future()

        # If we already have all needed events, return them
        if correlation_id in self.results:
            event_types_captured = [type(e) for e in self.results.get(correlation_id, [])]
            if all([event_type in event_types_captured for event_type in self.event_types_needed]):
                return self.results[correlation_id]

        # Otherwise, wait for the future to be set
        try:
            return await asyncio.wait_for(self.futures[correlation_id], timeout)
        except asyncio.TimeoutError:
            logger.warning(f"Timeout waiting for events for correlation_id {correlation_id}")
            return self.results.get(correlation_id, [])

    async def receive_event_async(self, event: Event) -> list:
        """
        Receive an event and process it if all needed events are available.

        Parameters
        ----------
        event : Event
            The event to process

        Returns
        -------
        list
            The events to be processed next
        """
        # First capture the event
        await self._capture_results_if_needed(event)

        # Then check if we have all needed events
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])

        # If we have all needed events, process them
        if finished:
            return await self.process_events(await self._get_and_reset_results(event))

        return []

    async def process_events(self, events):
        """
        Process a list of events.
        This method should be overridden by subclasses.

        Parameters
        ----------
        events : list
            The events to process

        Returns
        -------
        list
            The events to be processed next
        """
        return []

__init__(event_types_needed=None)

Initialize the AsyncAggregatorAgent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_types_needed | list | List of event types that need to be captured before processing | None |

process_events(events) async

Process a list of events. This method should be overridden by subclasses.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| events | list | The events to process | required |

Returns:

| Type | Description |
| --- | --- |
| list | The events to be processed next |

receive_event_async(event) async

Receive an event and process it if all needed events are available.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event | Event | The event to process | required |

Returns:

| Type | Description |
| --- | --- |
| list | The events to be processed next |

wait_for_events(correlation_id, timeout=None) async

Wait for all needed events for a specific correlation_id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| correlation_id | str | The correlation_id to wait for | required |
| timeout | float | The timeout in seconds | None |

Returns:

| Type | Description |
| --- | --- |
| list | The events for the correlation_id |

Best Practices

When working with Mojentic's asynchronous capabilities, keep these best practices in mind:

  1. Use Correlation IDs: Copy the incoming event's correlation_id onto every event you emit so related events can be tracked and aggregated. The dispatcher assigns a new one only when an event has none.

  2. Handle Exceptions: Implement proper exception handling in your asynchronous agents to prevent crashes.

  3. Monitor Queue Size: For production systems, monitor the event queue size to detect potential bottlenecks.

  4. Batch Size Tuning: Adjust the batch size based on your workload characteristics for optimal performance.

  5. Graceful Shutdown: Always use the stop() method to gracefully shut down the dispatcher.

  6. Tracing: Use the tracing system to monitor and debug your asynchronous agent system.

  7. Timeout Management: Pass a timeout to wait_for_empty_queue() and check its boolean result to prevent indefinite waiting.

  8. Event Design: Design your events to carry all necessary information without requiring additional lookups.
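
For the exception-handling practice in particular, one possible shape is to catch failures inside the agent and turn them into an event. The dispatcher source listed in the API Reference shows no try/except around the agent call, so an exception raised by an agent would presumably propagate into the dispatch task. The sketch below is illustrative only: `GuardedAgent`, `WorkRequested`, and `WorkFailed` are hypothetical names, and a real agent would subclass BaseAsyncAgent.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class WorkRequested:     # hypothetical event
    correlation_id: str
    payload: str


@dataclass(frozen=True)
class WorkFailed:        # hypothetical event carrying the failure forward
    correlation_id: str
    reason: str


class GuardedAgent:
    """Illustrative agent shape; a real one would subclass BaseAsyncAgent."""

    async def _do_work(self, event: WorkRequested) -> List[object]:
        if not event.payload:
            raise ValueError("empty payload")
        return []

    async def receive_event_async(self, event) -> List[object]:
        try:
            return await self._do_work(event)
        except Exception as exc:  # broad on purpose: keep the dispatch loop alive
            # Report the failure as an event, keeping the correlation_id for tracing.
            return [WorkFailed(event.correlation_id, f"{type(exc).__name__}: {exc}")]


failed = asyncio.run(GuardedAgent().receive_event_async(WorkRequested("c-9", "")))
ok = asyncio.run(GuardedAgent().receive_event_async(WorkRequested("c-10", "data")))
print(failed, ok)
```

Routing the failure event to a dedicated agent keeps error reporting inside the same event flow as everything else.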

Conclusion

Mojentic's asynchronous capabilities provide a powerful foundation for building complex, high-performance agent systems. By leveraging the AsyncDispatcher and asynchronous agent classes, you can create systems that efficiently process events concurrently, making the most of available resources and providing responsive, scalable solutions.