Skip to content

Level 2 - Elemental Agents and Events

This layer is about building Agents and the protocol by which a system of Agents can accomplish work.

This layer is in flux! It will change frequently right now.

Simple Agents

Simple agents provide straightforward, synchronous approaches to problem-solving using LLMs.

  • IterativeProblemSolver: An agent that solves problems iteratively by applying a Local Language Model (LLM) to a problem until a solution is found or a maximum number of iterations is reached.

  • SimpleRecursiveAgent: An agent that solves problems recursively by applying an LLM to a problem until a solution is found or a maximum number of iterations is reached.

API Reference

mojentic.agents.IterativeProblemSolver

An agent that iteratively attempts to solve a problem using available tools.

This solver uses a chat-based approach to break down and solve complex problems. It will continue attempting to solve the problem until it either succeeds, fails explicitly, or reaches the maximum number of iterations.

Attributes:

Name Type Description
max_iterations int

Maximum number of attempts to solve the problem

chat ChatSession

The chat session used for problem-solving interaction

Source code in src/mojentic/agents/iterative_problem_solver.py
class IterativeProblemSolver:
    """An agent that iteratively attempts to solve a problem using available tools.

    This solver uses a chat-based approach to break down and solve complex problems.
    It will continue attempting to solve the problem until it either succeeds,
    fails explicitly, or reaches the maximum number of iterations.

    Attributes
    ----------
    max_iterations : int
        Maximum number of attempts to solve the problem
    chat : ChatSession
        The chat session used for problem-solving interaction
    """

    max_iterations: int
    chat: ChatSession

    def __init__(self, llm: LLMBroker, available_tools: Optional[List[LLMTool]] = None, max_iterations: int = 3,
                 system_prompt: Optional[str] = None):
        """Initialize the IterativeProblemSolver.

        Parameters
        ----------
        llm : LLMBroker
            The language model broker to use for generating responses
        available_tools : Optional[List[LLMTool]], optional
            List of tools that can be used to solve the problem, by default None
        max_iterations : int, optional
            Maximum number of attempts to solve the problem, by default 3
        """
        self.max_iterations = max_iterations
        self.available_tools = available_tools or []
        self.chat = ChatSession(
            llm=llm,
            system_prompt=system_prompt or "You are a problem-solving assistant that can solve complex problems step by step. "
                                           "You analyze problems, break them down into smaller parts, and solve them systematically. "
                                           "If you cannot solve a problem completely in one step, you make progress and identify what to do next.",
            tools=self.available_tools,
        )

    def solve(self, problem: str):
        """Execute the problem-solving process.

        This method runs the iterative problem-solving process, continuing until one of
        these conditions is met:
        - The task is completed successfully ("DONE")
        - The task fails explicitly ("FAIL")
        - The maximum number of iterations is reached

        Parameters
        ----------
        problem : str
            The problem or request to be solved

        Returns
        -------
        str
            A summary of the final result, excluding the process details
        """
        iterations_remaining = self.max_iterations

        while True:
            result = self._step(problem)

            if "FAIL".lower() in result.lower():
                logger.info("Task failed", user_request=problem, result=result)
                break
            elif "DONE".lower() in result.lower():
                logger.info("Task completed", user_request=problem, result=result)
                break

            iterations_remaining -= 1
            if iterations_remaining == 0:
                logger.info("Max iterations reached", max_iterations=self.max_iterations,
                            user_request=problem, result=result)
                break

        result = self.chat.send(
            "Summarize the final result, and only the final result, without commenting on the process by which you achieved it.")

        return result

    def _step(self, problem: str) -> str:
        """Execute a single problem-solving step.

        This method sends a prompt to the chat session asking it to work on the user's request
        using available tools. The response should indicate success ("DONE") or failure ("FAIL").

        Parameters
        ----------
        problem : str
            The problem or request to be solved

        Returns
        -------
        str
            The response from the chat session, indicating the step's outcome
        """
        prompt = f"""
Given the user request:
{problem}

Use the tools at your disposal to act on their request. You may wish to create a step-by-step plan for more complicated requests.

If you cannot provide an answer, say only "FAIL".
If you have the answer, say only "DONE".
"""
        return self.chat.send(prompt)

__init__(llm, available_tools=None, max_iterations=3, system_prompt=None)

Initialize the IterativeProblemSolver.

Parameters:

Name Type Description Default
llm LLMBroker

The language model broker to use for generating responses

required
available_tools Optional[List[LLMTool]]

List of tools that can be used to solve the problem, by default None

None
max_iterations int

Maximum number of attempts to solve the problem, by default 3

3
Source code in src/mojentic/agents/iterative_problem_solver.py
def __init__(self, llm: LLMBroker, available_tools: Optional[List[LLMTool]] = None, max_iterations: int = 3,
             system_prompt: Optional[str] = None):
    """Initialize the IterativeProblemSolver.

    Parameters
    ----------
    llm : LLMBroker
        The language model broker to use for generating responses
    available_tools : Optional[List[LLMTool]], optional
        List of tools that can be used to solve the problem, by default None
    max_iterations : int, optional
        Maximum number of attempts to solve the problem, by default 3
    """
    self.max_iterations = max_iterations
    self.available_tools = available_tools or []
    self.chat = ChatSession(
        llm=llm,
        system_prompt=system_prompt or "You are a problem-solving assistant that can solve complex problems step by step. "
                                       "You analyze problems, break them down into smaller parts, and solve them systematically. "
                                       "If you cannot solve a problem completely in one step, you make progress and identify what to do next.",
        tools=self.available_tools,
    )

solve(problem)

Execute the problem-solving process.

This method runs the iterative problem-solving process, continuing until one of these conditions is met: - The task is completed successfully ("DONE") - The task fails explicitly ("FAIL") - The maximum number of iterations is reached

Parameters:

Name Type Description Default
problem str

The problem or request to be solved

required

Returns:

Type Description
str

A summary of the final result, excluding the process details

Source code in src/mojentic/agents/iterative_problem_solver.py
def solve(self, problem: str):
    """Execute the problem-solving process.

    This method runs the iterative problem-solving process, continuing until one of
    these conditions is met:
    - The task is completed successfully ("DONE")
    - The task fails explicitly ("FAIL")
    - The maximum number of iterations is reached

    Parameters
    ----------
    problem : str
        The problem or request to be solved

    Returns
    -------
    str
        A summary of the final result, excluding the process details
    """
    iterations_remaining = self.max_iterations

    while True:
        result = self._step(problem)

        if "FAIL".lower() in result.lower():
            logger.info("Task failed", user_request=problem, result=result)
            break
        elif "DONE".lower() in result.lower():
            logger.info("Task completed", user_request=problem, result=result)
            break

        iterations_remaining -= 1
        if iterations_remaining == 0:
            logger.info("Max iterations reached", max_iterations=self.max_iterations,
                        user_request=problem, result=result)
            break

    result = self.chat.send(
        "Summarize the final result, and only the final result, without commenting on the process by which you achieved it.")

    return result

mojentic.agents.SimpleRecursiveAgent

An agent that recursively attempts to solve a problem using available tools.

This agent uses an event-driven approach to manage the problem-solving process. It will continue attempting to solve the problem until it either succeeds, fails explicitly, or reaches the maximum number of recursions.

Attributes:

Name Type Description
max_iterations int

The maximum number of iterations to perform

llm LLMBroker

The language model broker to use for generating responses

emitter EventEmitter

The pubsub event emitter used to manage events

available_tools List[LLMTool]

List of tools that can be used to solve the problem

chat ChatSession

The chat session used for problem-solving interaction

Source code in src/mojentic/agents/simple_recursive_agent.py
class SimpleRecursiveAgent:
    """An agent that recursively attempts to solve a problem using available tools.

    This agent uses an event-driven approach to manage the problem-solving process.
    It will continue attempting to solve the problem until it either succeeds,
    fails explicitly, or reaches the maximum number of recursions.

    Attributes
    ----------
    max_iterations : int
        The maximum number of iterations to perform
    llm : LLMBroker
        The language model broker to use for generating responses
    emitter : EventEmitter
        The pubsub event emitter used to manage events
    available_tools : List[LLMTool]
        List of tools that can be used to solve the problem
    chat : ChatSession
        The chat session used for problem-solving interaction
    """
    max_iterations: int
    llm: LLMBroker
    available_tools: List[LLMTool]
    emitter: EventEmitter
    chat: ChatSession

    def __init__(self, llm: LLMBroker, available_tools: Optional[List[LLMTool]] = None, max_iterations: int = 5, system_prompt: Optional[str] = None):
        """
        Initialize the SimpleRecursiveAgent.

        Parameters
        ----------
        llm : LLMBroker
            The language model broker to use for generating responses
        max_iterations : int
            The maximum number of iterations to perform
        available_tools : Optional[List[LLMTool]]
            List of tools that can be used to solve the problem
        """
        self.max_iterations = max_iterations
        self.llm = llm
        self.available_tools = available_tools or []
        self.emitter = EventEmitter()

        # Initialize the chat session
        self.chat = ChatSession(
            llm=llm,
            system_prompt=system_prompt or "You are a problem-solving assistant that can solve complex problems step by step. "
                         "You analyze problems, break them down into smaller parts, and solve them systematically. "
                         "If you cannot solve a problem completely in one step, you make progress and identify what to do next.",
            tools=self.available_tools
        )

        # Set up event handlers
        self.emitter.subscribe(GoalSubmittedEvent, self._handle_problem_submitted)
        self.emitter.subscribe(IterationCompletedEvent, self._handle_iteration_completed)

    async def solve(self, problem: str) -> str:
        """
        Solve a problem asynchronously.

        Parameters
        ----------
        problem : str
            The problem to solve

        Returns
        -------
        str
            The solution to the problem
        """
        # Create a future to signal when the solution is ready
        solution_future = asyncio.Future()

        # Create the initial problem state
        state = GoalState(goal=problem, max_iterations=self.max_iterations)

        # Define handlers for completion events
        async def handle_solution_event(event):
            if not solution_future.done():
                solution_future.set_result(event.state.solution)

        # Subscribe to completion events
        self.emitter.subscribe(GoalAchievedEvent, handle_solution_event)
        self.emitter.subscribe(GoalFailedEvent, handle_solution_event)
        self.emitter.subscribe(TimeoutEvent, handle_solution_event)

        # Start the solving process
        self.emitter.emit(GoalSubmittedEvent(state=state))

        # Wait for the solution or timeout
        try:
            return await asyncio.wait_for(solution_future, timeout=300)  # 5 minutes timeout
        except asyncio.TimeoutError:
            timeout_message = f"Timeout: Could not solve the problem within 300 seconds."
            if not solution_future.done():
                state.solution = timeout_message
                state.is_complete = True
                self.emitter.emit(TimeoutEvent(state=state))
            return timeout_message

    async def _handle_problem_submitted(self, event: GoalSubmittedEvent):
        """
        Handle a problem submitted event.

        Parameters
        ----------
        event : GoalSubmittedEvent
            The problem submitted event to handle
        """
        # Start the first iteration
        await self._process_iteration(event.state)

    async def _handle_iteration_completed(self, event: IterationCompletedEvent):
        """
        Handle an iteration completed event.

        Parameters
        ----------
        event : IterationCompletedEvent
            The iteration completed event to handle
        """
        state = event.state
        response = event.response

        # Check if the task failed or succeeded
        if "FAIL".lower() in response.lower():
            state.solution = f"Failed to solve after {state.iteration} iterations:\n{response}"
            state.is_complete = True
            self.emitter.emit(GoalFailedEvent(state=state))
            return
        elif "DONE".lower() in response.lower():
            state.solution = response
            state.is_complete = True
            self.emitter.emit(GoalAchievedEvent(state=state))
            return

        # Check if we've reached the maximum number of iterations
        if state.iteration >= state.max_iterations:
            state.solution = f"Best solution after {state.max_iterations} iterations:\n{response}"
            state.is_complete = True
            self.emitter.emit(GoalAchievedEvent(state=state))
            return

        # If the problem is not solved and we haven't reached max_iterations, continue with next iteration
        await self._process_iteration(state)

    async def _process_iteration(self, state: GoalState):
        """
        Process a single iteration of the problem-solving process.

        Parameters
        ----------
        state : GoalState
            The current state of the problem-solving process
        """
        # Increment the iteration counter
        state.iteration += 1

        # Create a prompt for the LLM
        prompt = f"""
Given the user request:
{state.goal}

Use the tools at your disposal to act on their request. You may wish to create a step-by-step plan for more complicated requests.

If you cannot provide an answer, say only "FAIL".
If you have the answer, say only "DONE".
"""

        # Generate a response using the LLM
        response = await self._generate(prompt)

        # Emit an event with the response
        self.emitter.emit(IterationCompletedEvent(state=state, response=response))

    async def _generate(self, prompt: str) -> str:
        """
        Generate a response using the ChatSession asynchronously.

        Parameters
        ----------
        prompt : str
            The prompt to send to the ChatSession

        Returns
        -------
        str
            The generated response
        """
        # Use asyncio.to_thread to run the synchronous send method in a separate thread
        # without blocking the event loop
        return await asyncio.to_thread(self.chat.send, prompt)

__init__(llm, available_tools=None, max_iterations=5, system_prompt=None)

Initialize the SimpleRecursiveAgent.

Parameters:

Name Type Description Default
llm LLMBroker

The language model broker to use for generating responses

required
max_iterations int

The maximum number of iterations to perform

5
available_tools Optional[List[LLMTool]]

List of tools that can be used to solve the problem

None
Source code in src/mojentic/agents/simple_recursive_agent.py
def __init__(self, llm: LLMBroker, available_tools: Optional[List[LLMTool]] = None, max_iterations: int = 5, system_prompt: Optional[str] = None):
    """
    Initialize the SimpleRecursiveAgent.

    Parameters
    ----------
    llm : LLMBroker
        The language model broker to use for generating responses
    max_iterations : int
        The maximum number of iterations to perform
    available_tools : Optional[List[LLMTool]]
        List of tools that can be used to solve the problem
    """
    self.max_iterations = max_iterations
    self.llm = llm
    self.available_tools = available_tools or []
    self.emitter = EventEmitter()

    # Initialize the chat session
    self.chat = ChatSession(
        llm=llm,
        system_prompt=system_prompt or "You are a problem-solving assistant that can solve complex problems step by step. "
                     "You analyze problems, break them down into smaller parts, and solve them systematically. "
                     "If you cannot solve a problem completely in one step, you make progress and identify what to do next.",
        tools=self.available_tools
    )

    # Set up event handlers
    self.emitter.subscribe(GoalSubmittedEvent, self._handle_problem_submitted)
    self.emitter.subscribe(IterationCompletedEvent, self._handle_iteration_completed)

solve(problem) async

Solve a problem asynchronously.

Parameters:

Name Type Description Default
problem str

The problem to solve

required

Returns:

Type Description
str

The solution to the problem

Source code in src/mojentic/agents/simple_recursive_agent.py
async def solve(self, problem: str) -> str:
    """
    Solve a problem asynchronously.

    Parameters
    ----------
    problem : str
        The problem to solve

    Returns
    -------
    str
        The solution to the problem
    """
    # Create a future to signal when the solution is ready
    solution_future = asyncio.Future()

    # Create the initial problem state
    state = GoalState(goal=problem, max_iterations=self.max_iterations)

    # Define handlers for completion events
    async def handle_solution_event(event):
        if not solution_future.done():
            solution_future.set_result(event.state.solution)

    # Subscribe to completion events
    self.emitter.subscribe(GoalAchievedEvent, handle_solution_event)
    self.emitter.subscribe(GoalFailedEvent, handle_solution_event)
    self.emitter.subscribe(TimeoutEvent, handle_solution_event)

    # Start the solving process
    self.emitter.emit(GoalSubmittedEvent(state=state))

    # Wait for the solution or timeout
    try:
        return await asyncio.wait_for(solution_future, timeout=300)  # 5 minutes timeout
    except asyncio.TimeoutError:
        timeout_message = f"Timeout: Could not solve the problem within 300 seconds."
        if not solution_future.done():
            state.solution = timeout_message
            state.is_complete = True
            self.emitter.emit(TimeoutEvent(state=state))
        return timeout_message

Event Driven Agents

Event-driven agents provide asynchronous, event-based architectures for building complex agent systems that can process multiple events concurrently.

  • BaseAsyncAgent: The foundation for all asynchronous agents, providing the core event processing interface.

  • BaseAsyncLLMAgent: An asynchronous agent that integrates LLM capabilities for generating responses asynchronously.

  • AsyncAggregatorAgent: A specialized agent that collects and aggregates multiple events by correlation ID before processing them together.

  • AsyncDispatcher: The core dispatcher that manages asynchronous execution of tasks and coordinates event routing between agents.

API Reference

mojentic.agents.base_async_agent.BaseAsyncAgent

BaseAsyncAgent class is the base class for all asynchronous agents. It provides an async receive method for event processing.

Source code in src/mojentic/agents/base_async_agent.py
class BaseAsyncAgent:
    """
    BaseAsyncAgent class is the base class for all asynchronous agents.
    It provides an async receive method for event processing.
    """

    async def receive_event_async(self, event: Event) -> List[Event]:
        """
        receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of
        events as output.

        In this way, you can perform work based on the event, and generate whatever subsequent events may need to be
        processed next.

        This keeps the agent decoupled from the specifics of the event routing and processing.

        Events are subclasses of the Event class.

        :param event: The event to process
        :return: A list of events to be processed next
        """
        return []

receive_event_async(event) async

receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of events as output.

In this way, you can perform work based on the event, and generate whatever subsequent events may need to be processed next.

This keeps the agent decoupled from the specifics of the event routing and processing.

Events are subclasses of the Event class.

:param event: The event to process :return: A list of events to be processed next

Source code in src/mojentic/agents/base_async_agent.py
async def receive_event_async(self, event: Event) -> List[Event]:
    """
    receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of
    events as output.

    In this way, you can perform work based on the event, and generate whatever subsequent events may need to be
    processed next.

    This keeps the agent decoupled from the specifics of the event routing and processing.

    Events are subclasses of the Event class.

    :param event: The event to process
    :return: A list of events to be processed next
    """
    return []

mojentic.agents.async_llm_agent.BaseAsyncLLMAgent

Bases: BaseAsyncAgent

BaseAsyncLLMAgent is an asynchronous version of the BaseLLMAgent. It uses an LLM to generate responses asynchronously.

Source code in src/mojentic/agents/async_llm_agent.py
class BaseAsyncLLMAgent(BaseAsyncAgent):
    """
    BaseAsyncLLMAgent is an asynchronous version of the BaseLLMAgent.
    It uses an LLM to generate responses asynchronously.
    """
    llm: LLMBroker
    behaviour: Annotated[str, "The personality and behavioural traits of the agent."]

    def __init__(self, llm: LLMBroker, behaviour: str = "You are a helpful assistant.",
                 tools: Optional[List[LLMTool]] = None, response_model: Optional[Type[BaseModel]] = None):
        """
        Initialize the BaseAsyncLLMAgent.

        Parameters
        ----------
        llm : LLMBroker
            The LLM broker to use for generating responses
        behaviour : str, optional
            The personality and behavioural traits of the agent
        tools : List[LLMTool], optional
            The tools available to the agent
        response_model : Type[BaseModel], optional
            The model to use for responses
        """
        super().__init__()
        self.llm = llm
        self.behaviour = behaviour
        self.response_model = response_model
        self.tools = tools or []

    def _create_initial_messages(self):
        """
        Create the initial messages for the LLM.

        Returns
        -------
        list
            The initial messages for the LLM
        """
        return [
            LLMMessage(role=MessageRole.System, content=self.behaviour),
        ]

    def add_tool(self, tool):
        """
        Add a tool to the agent.

        Parameters
        ----------
        tool : LLMTool
            The tool to add
        """
        self.tools.append(tool)

    async def generate_response(self, content):
        """
        Generate a response using the LLM asynchronously.

        Parameters
        ----------
        content : str
            The content to generate a response for

        Returns
        -------
        str or BaseModel
            The generated response
        """
        messages = self._create_initial_messages()
        messages.append(LLMMessage(content=content))

        if self.response_model is not None:
            # Use asyncio.to_thread to run the synchronous generate_object method in a separate thread
            import asyncio
            response = await asyncio.to_thread(self.llm.generate_object, messages, object_model=self.response_model)
        else:
            # Use asyncio.to_thread to run the synchronous generate method in a separate thread
            import asyncio
            response = await asyncio.to_thread(self.llm.generate, messages, tools=self.tools)

        return response

    async def receive_event_async(self, event: Event) -> List[Event]:
        """
        Receive an event and process it asynchronously.
        This method should be overridden by subclasses.

        Parameters
        ----------
        event : Event
            The event to process

        Returns
        -------
        List[Event]
            The events to be processed next
        """
        return []

__init__(llm, behaviour='You are a helpful assistant.', tools=None, response_model=None)

Initialize the BaseAsyncLLMAgent.

Parameters:

Name Type Description Default
llm LLMBroker

The LLM broker to use for generating responses

required
behaviour str

The personality and behavioural traits of the agent

'You are a helpful assistant.'
tools List[LLMTool]

The tools available to the agent

None
response_model Type[BaseModel]

The model to use for responses

None
Source code in src/mojentic/agents/async_llm_agent.py
def __init__(self, llm: LLMBroker, behaviour: str = "You are a helpful assistant.",
             tools: Optional[List[LLMTool]] = None, response_model: Optional[Type[BaseModel]] = None):
    """
    Initialize the BaseAsyncLLMAgent.

    Parameters
    ----------
    llm : LLMBroker
        The LLM broker to use for generating responses
    behaviour : str, optional
        The personality and behavioural traits of the agent
    tools : List[LLMTool], optional
        The tools available to the agent
    response_model : Type[BaseModel], optional
        The model to use for responses
    """
    super().__init__()
    self.llm = llm
    self.behaviour = behaviour
    self.response_model = response_model
    self.tools = tools or []

add_tool(tool)

Add a tool to the agent.

Parameters:

Name Type Description Default
tool LLMTool

The tool to add

required
Source code in src/mojentic/agents/async_llm_agent.py
def add_tool(self, tool):
    """
    Add a tool to the agent.

    Parameters
    ----------
    tool : LLMTool
        The tool to add
    """
    self.tools.append(tool)

generate_response(content) async

Generate a response using the LLM asynchronously.

Parameters:

Name Type Description Default
content str

The content to generate a response for

required

Returns:

Type Description
str or BaseModel

The generated response

Source code in src/mojentic/agents/async_llm_agent.py
async def generate_response(self, content):
    """
    Generate a response using the LLM asynchronously.

    Parameters
    ----------
    content : str
        The content to generate a response for

    Returns
    -------
    str or BaseModel
        The generated response
    """
    messages = self._create_initial_messages()
    messages.append(LLMMessage(content=content))

    if self.response_model is not None:
        # Use asyncio.to_thread to run the synchronous generate_object method in a separate thread
        import asyncio
        response = await asyncio.to_thread(self.llm.generate_object, messages, object_model=self.response_model)
    else:
        # Use asyncio.to_thread to run the synchronous generate method in a separate thread
        import asyncio
        response = await asyncio.to_thread(self.llm.generate, messages, tools=self.tools)

    return response

receive_event_async(event) async

Receive an event and process it asynchronously. This method should be overridden by subclasses.

Parameters:

Name Type Description Default
event Event

The event to process

required

Returns:

Type Description
List[Event]

The events to be processed next

Source code in src/mojentic/agents/async_llm_agent.py
async def receive_event_async(self, event: Event) -> List[Event]:
    """
    Receive an event and process it asynchronously.
    This method should be overridden by subclasses.

    Parameters
    ----------
    event : Event
        The event to process

    Returns
    -------
    List[Event]
        The events to be processed next
    """
    return []

mojentic.agents.async_aggregator_agent.AsyncAggregatorAgent

Bases: BaseAsyncAgent

AsyncAggregatorAgent is an asynchronous version of the BaseAggregatingAgent. It aggregates events based on their correlation_id and processes them when all required events are available.

Source code in src/mojentic/agents/async_aggregator_agent.py
class AsyncAggregatorAgent(BaseAsyncAgent):
    """
    AsyncAggregatorAgent is an asynchronous version of the BaseAggregatingAgent.
    It aggregates events based on their correlation_id and processes them when all required events are available.
    """
    def __init__(self, event_types_needed=None):
        """
        Initialize the AsyncAggregatorAgent.

        Parameters
        ----------
        event_types_needed : list, optional
            List of event types that need to be captured before processing
        """
        super().__init__()
        self.results = {}
        self.event_types_needed = event_types_needed or []
        self.futures = {}  # Maps correlation_id to Future objects

    async def _get_and_reset_results(self, event):
        """
        Get and reset the results for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to get results for

        Returns
        -------
        list
            The results for the event
        """
        results = self.results[event.correlation_id]
        self.results[event.correlation_id] = None
        return results

    async def _capture_results_if_needed(self, event):
        """
        Capture results for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to capture results for
        """
        results = self.results.get(event.correlation_id, [])
        results.append(event)
        self.results[event.correlation_id] = results

        # Check if we have all needed events and set the future if we do
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])

        if finished and event.correlation_id in self.futures:
            future = self.futures[event.correlation_id]
            if not future.done():
                future.set_result(self.results[event.correlation_id])

    async def _has_all_needed(self, event):
        """
        Check if all needed event types have been captured for a specific correlation_id.

        Parameters
        ----------
        event : Event
            The event to check

        Returns
        -------
        bool
            True if all needed event types have been captured, False otherwise
        """
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])
        logger.debug(f"Captured: {event_types_captured}, Needed: {self.event_types_needed}, Finished: {finished}")
        return finished

    async def wait_for_events(self, correlation_id, timeout=None):
        """
        Wait for all needed events for a specific correlation_id.

        Parameters
        ----------
        correlation_id : str
            The correlation_id to wait for
        timeout : float, optional
            The timeout in seconds

        Returns
        -------
        list
            The events for the correlation_id
        """
        if correlation_id not in self.futures:
            self.futures[correlation_id] = asyncio.Future()

        # If we already have all needed events, return them
        if correlation_id in self.results:
            event_types_captured = [type(e) for e in self.results.get(correlation_id, [])]
            if all([event_type in event_types_captured for event_type in self.event_types_needed]):
                return self.results[correlation_id]

        # Otherwise, wait for the future to be set
        try:
            return await asyncio.wait_for(self.futures[correlation_id], timeout)
        except asyncio.TimeoutError:
            logger.warning(f"Timeout waiting for events for correlation_id {correlation_id}")
            return self.results.get(correlation_id, [])

    async def receive_event_async(self, event: Event) -> list:
        """
        Receive an event and process it if all needed events are available.

        Parameters
        ----------
        event : Event
            The event to process

        Returns
        -------
        list
            The events to be processed next
        """
        # First capture the event
        await self._capture_results_if_needed(event)

        # Then check if we have all needed events
        event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
        finished = all([event_type in event_types_captured for event_type in self.event_types_needed])

        # If we have all needed events, process them
        if finished:
            return await self.process_events(await self._get_and_reset_results(event))

        return []

    async def process_events(self, events):
        """
        Process a list of events.
        This method should be overridden by subclasses.

        Parameters
        ----------
        events : list
            The events to process

        Returns
        -------
        list
            The events to be processed next
        """
        return []

__init__(event_types_needed=None)

Initialize the AsyncAggregatorAgent.

Parameters:

Name Type Description Default
event_types_needed list

List of event types that need to be captured before processing

None
Source code in src/mojentic/agents/async_aggregator_agent.py
def __init__(self, event_types_needed=None):
    """
    Initialize the AsyncAggregatorAgent.

    Parameters
    ----------
    event_types_needed : list, optional
        List of event types that need to be captured before processing
    """
    super().__init__()
    self.results = {}
    self.event_types_needed = event_types_needed or []
    self.futures = {}  # Maps correlation_id to Future objects

process_events(events) async

Process a list of events. This method should be overridden by subclasses.

Parameters:

Name Type Description Default
events list

The events to process

required

Returns:

Type Description
list

The events to be processed next

Source code in src/mojentic/agents/async_aggregator_agent.py
async def process_events(self, events):
    """
    Process a list of events.
    This method should be overridden by subclasses.

    Parameters
    ----------
    events : list
        The events to process

    Returns
    -------
    list
        The events to be processed next
    """
    return []

receive_event_async(event) async

Receive an event and process it if all needed events are available.

Parameters:

Name Type Description Default
event Event

The event to process

required

Returns:

Type Description
list

The events to be processed next

Source code in src/mojentic/agents/async_aggregator_agent.py
async def receive_event_async(self, event: Event) -> list:
    """
    Receive an event and process it if all needed events are available.

    Parameters
    ----------
    event : Event
        The event to process

    Returns
    -------
    list
        The events to be processed next
    """
    # First capture the event
    await self._capture_results_if_needed(event)

    # Then check if we have all needed events
    event_types_captured = [type(e) for e in self.results.get(event.correlation_id, [])]
    finished = all([event_type in event_types_captured for event_type in self.event_types_needed])

    # If we have all needed events, process them
    if finished:
        return await self.process_events(await self._get_and_reset_results(event))

    return []

wait_for_events(correlation_id, timeout=None) async

Wait for all needed events for a specific correlation_id.

Parameters:

Name Type Description Default
correlation_id str

The correlation_id to wait for

required
timeout float

The timeout in seconds

None

Returns:

Type Description
list

The events for the correlation_id

Source code in src/mojentic/agents/async_aggregator_agent.py
async def wait_for_events(self, correlation_id, timeout=None):
    """
    Wait for all needed events for a specific correlation_id.

    Parameters
    ----------
    correlation_id : str
        The correlation_id to wait for
    timeout : float, optional
        The timeout in seconds

    Returns
    -------
    list
        The events for the correlation_id
    """
    if correlation_id not in self.futures:
        self.futures[correlation_id] = asyncio.Future()

    # If we already have all needed events, return them
    if correlation_id in self.results:
        event_types_captured = [type(e) for e in self.results.get(correlation_id, [])]
        if all([event_type in event_types_captured for event_type in self.event_types_needed]):
            return self.results[correlation_id]

    # Otherwise, wait for the future to be set
    try:
        return await asyncio.wait_for(self.futures[correlation_id], timeout)
    except asyncio.TimeoutError:
        logger.warning(f"Timeout waiting for events for correlation_id {correlation_id}")
        return self.results.get(correlation_id, [])

mojentic.async_dispatcher.AsyncDispatcher

AsyncDispatcher class is an asynchronous version of the Dispatcher class. It uses asyncio and deque for event processing.

Source code in src/mojentic/async_dispatcher.py
class AsyncDispatcher:
    """
    AsyncDispatcher class is an asynchronous version of the Dispatcher class.
    It uses asyncio and deque for event processing.
    """
    def __init__(self, router, shared_working_memory=None, batch_size=5, tracer=None):
        """
        Initialize the AsyncDispatcher.

        Parameters
        ----------
        router : Router
            The router to use for routing events to agents
        shared_working_memory : SharedWorkingMemory, optional
            The shared working memory to use
        batch_size : int, optional
            The number of events to process in each batch
        tracer : Tracer, optional
            The tracer to use for tracing events
        """
        self.router = router
        self.batch_size = batch_size
        self.event_queue = deque()
        self._stop_event = asyncio.Event()
        self._task = None

        # Use null_tracer if no tracer is provided
        from mojentic.tracer import null_tracer
        self.tracer = tracer or null_tracer

    async def start(self):
        """
        Start the event dispatch task.
        """
        logger.debug("Starting event dispatch task")
        self._task = asyncio.create_task(self._dispatch_events())
        return self

    async def stop(self):
        """
        Stop the event dispatch task.
        """
        self._stop_event.set()
        if self._task:
            await self._task

    async def wait_for_empty_queue(self, timeout=None):
        """
        Wait for the event queue to be empty.

        Parameters
        ----------
        timeout : float, optional
            The timeout in seconds

        Returns
        -------
        bool
            True if the queue is empty, False if the timeout was reached
        """
        start_time = asyncio.get_event_loop().time()
        while len(self.event_queue) > 0:
            if timeout is not None and asyncio.get_event_loop().time() - start_time > timeout:
                return False
            await asyncio.sleep(0.1)
        return True

    def dispatch(self, event):
        """
        Dispatch an event to the event queue.

        Parameters
        ----------
        event : Event
            The event to dispatch
        """
        logger.log(logging.DEBUG, f"Dispatching event: {event}")
        if event.correlation_id is None:
            event.correlation_id = str(uuid4())
        self.event_queue.append(event)

    async def _dispatch_events(self):
        """
        Dispatch events from the event queue to agents.
        """
        while not self._stop_event.is_set():
            for _ in range(self.batch_size):
                logger.debug("Checking for events")
                if len(self.event_queue) > 0:
                    logger.debug(f"{len(self.event_queue)} events in queue")
                    event = self.event_queue.popleft()
                    logger.debug(f"Processing event: {event}")
                    agents = self.router.get_agents(event)
                    logger.debug(f"Found {len(agents)} agents for event type {type(event)}")
                    events = []
                    for agent in agents:
                        logger.debug(f"Sending event to agent {agent}")

                        # Record agent interaction in tracer system
                        self.tracer.record_agent_interaction(
                            from_agent=str(event.source),
                            to_agent=str(type(agent)),
                            event_type=str(type(event).__name__),
                            event_id=event.correlation_id,
                            source=type(self)
                        )

                        # Process the event through the agent
                        # If the agent is an async agent, await its receive_event method
                        if hasattr(agent, 'receive_event_async'):
                            received_events = await agent.receive_event_async(event)
                        else:
                            received_events = agent.receive_event(event)

                        logger.debug(f"Agent {agent} returned {len(received_events)} events")
                        events.extend(received_events)
                    for fe in events:
                        if type(fe) is TerminateEvent:
                            self._stop_event.set()
                        self.dispatch(fe)
            await asyncio.sleep(0.1)  # Use asyncio.sleep instead of time.sleep

__init__(router, shared_working_memory=None, batch_size=5, tracer=None)

Initialize the AsyncDispatcher.

Parameters:

Name Type Description Default
router Router

The router to use for routing events to agents

required
shared_working_memory SharedWorkingMemory

The shared working memory to use

None
batch_size int

The number of events to process in each batch

5
tracer Tracer

The tracer to use for tracing events

None
Source code in src/mojentic/async_dispatcher.py
def __init__(self, router, shared_working_memory=None, batch_size=5, tracer=None):
    """
    Initialize the AsyncDispatcher.

    Parameters
    ----------
    router : Router
        The router to use for routing events to agents
    shared_working_memory : SharedWorkingMemory, optional
        The shared working memory to use
    batch_size : int, optional
        The number of events to process in each batch
    tracer : Tracer, optional
        The tracer to use for tracing events
    """
    self.router = router
    self.batch_size = batch_size
    self.event_queue = deque()
    self._stop_event = asyncio.Event()
    self._task = None

    # Use null_tracer if no tracer is provided
    from mojentic.tracer import null_tracer
    self.tracer = tracer or null_tracer

dispatch(event)

Dispatch an event to the event queue.

Parameters:

Name Type Description Default
event Event

The event to dispatch

required
Source code in src/mojentic/async_dispatcher.py
def dispatch(self, event):
    """
    Dispatch an event to the event queue.

    Parameters
    ----------
    event : Event
        The event to dispatch
    """
    logger.log(logging.DEBUG, f"Dispatching event: {event}")
    if event.correlation_id is None:
        event.correlation_id = str(uuid4())
    self.event_queue.append(event)

start() async

Start the event dispatch task.

Source code in src/mojentic/async_dispatcher.py
async def start(self):
    """
    Start the event dispatch task.
    """
    logger.debug("Starting event dispatch task")
    self._task = asyncio.create_task(self._dispatch_events())
    return self

stop() async

Stop the event dispatch task.

Source code in src/mojentic/async_dispatcher.py
async def stop(self):
    """
    Stop the event dispatch task.
    """
    self._stop_event.set()
    if self._task:
        await self._task

wait_for_empty_queue(timeout=None) async

Wait for the event queue to be empty.

Parameters:

Name Type Description Default
timeout float

The timeout in seconds

None

Returns:

Type Description
bool

True if the queue is empty, False if the timeout was reached

Source code in src/mojentic/async_dispatcher.py
async def wait_for_empty_queue(self, timeout=None):
    """
    Wait for the event queue to be empty.

    Parameters
    ----------
    timeout : float, optional
        The timeout in seconds

    Returns
    -------
    bool
        True if the queue is empty, False if the timeout was reached
    """
    start_time = asyncio.get_event_loop().time()
    while len(self.event_queue) > 0:
        if timeout is not None and asyncio.get_event_loop().time() - start_time > timeout:
            return False
        await asyncio.sleep(0.1)
    return True