Asynchronous Capabilities in Mojentic
Mojentic provides robust asynchronous capabilities through its AsyncDispatcher and asynchronous agent classes. These features enable you to build complex, high-performance agent systems that can process multiple events concurrently.
Small Units of Computation
Agents are the smallest unit of computation in Mojentic. They are responsible for processing events and emitting new events. Agents can be combined to form complex systems.
Agents have a single public method, receive_event, which receives an event and returns a list of events.
Each agent has its own state. You configure it through its constructor, which is also where you give it simple mechanisms such as shared memory or access to knowledge or data for use in its computations.
That one method is where the agent does its work, alone.
This requires you to break the work you want to do into these small units of computation. Incoming events are like commands: think tell-don't-ask in the extreme.
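As a rough illustration of this shape, here is a minimal plain-Python sketch. The Event and ShoutingAgent classes are hypothetical stand-ins written for this example; they are not Mojentic's own classes, and only the receive_event method name comes from the text above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    """Hypothetical stand-in for an event; not Mojentic's Event class."""
    payload: str


class ShoutingAgent:
    """Illustrative agent: configured via its constructor, works in one method."""

    def __init__(self, suffix: str = "!"):
        # Per-agent configuration and state live on the instance.
        self.suffix = suffix
        self.handled = 0

    def receive_event(self, event: Event) -> List[Event]:
        # Treat the incoming event as a command: do the work, emit new events.
        self.handled += 1
        return [Event(payload=event.payload.upper() + self.suffix)]


agent = ShoutingAgent(suffix="!!")
emitted = agent.receive_event(Event(payload="hello"))
print(emitted[0].payload)
```

The caller never asks the agent about its state; it hands over an event and routes whatever events come back.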
Event-Driven Architecture
The event-driven design is also very much in flux. Early thread-based agents led to more complex synchronization than I wanted, so I'm using different agent formations to explore eventing in a more practical way.
The key purpose of events will be to support traceability and debugging of agentic flows.
Mojentic provides both synchronous and asynchronous event processing capabilities. The AsyncDispatcher enables concurrent event processing, allowing multiple agents to work simultaneously for improved performance and responsiveness. This is particularly valuable for complex workflows involving multiple LLM calls or other high-latency operations.
Events are the only way agents communicate with each other. Events are simple data classes that are passed around the system. Events are immutable.
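To make the immutability point concrete, the sketch below uses a frozen standard-library dataclass as an analogy. The TextReceived class and its fields are assumptions made for this example and are not how Mojentic defines its events.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class TextReceived:
    """Hypothetical immutable event carrying a correlation ID and some text."""
    correlation_id: str
    text: str


original = TextReceived(correlation_id="abc-123", text="draft")

try:
    original.text = "edited"  # Assigning to a frozen dataclass field raises.
    mutated = True
except FrozenInstanceError:
    mutated = False

# Instead of mutating, build a new event that keeps the correlation ID.
follow_up = replace(original, text="edited")
print(mutated, original.text, follow_up.text)
```

Because an event cannot change after it is created, every agent that receives it sees the same data, which is what makes a trace of the flow trustworthy.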
Here's a high-level overview of how events flow through the system:
flowchart LR
    A1[Agent 1] -->|Event 1| A2[Agent 2]
    A2 -->|Event 2| A3[Agent 3]
    A2 -->|Event 3| A4[Agent 4]
    A3 -->|Event 4| A1
    subgraph Shared Memory
        M[(Working Memory)]
    end
    A1 -.-> M
    A2 -.-> M
    A3 -.-> M
    A4 -.-> M
The diagram above shows how agents communicate through events (solid lines) and can access shared working memory (dotted lines) when needed.
Introduction to Asynchronous Processing
Asynchronous processing is particularly valuable in agent-based systems where:
- Multiple agents need to process events concurrently
- Some operations (like LLM calls) have significant latency
- You need to maintain responsiveness while handling complex workflows
- You want to maximize throughput in multi-agent systems
Mojentic's asynchronous capabilities are built on Python's asyncio library and provide a clean, event-driven architecture for building sophisticated agent systems.
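The latency argument can be seen with asyncio alone. In this sketch, slow_call is a hypothetical placeholder for a high-latency operation such as an LLM request; it only sleeps, and no Mojentic API is involved.

```python
import asyncio
import time
from typing import List, Tuple


async def slow_call(name: str, delay: float) -> str:
    # Stand-in for a high-latency operation; it only sleeps.
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently() -> Tuple[List[str], float]:
    started = time.perf_counter()
    # Both calls wait at the same time instead of one after the other.
    results = await asyncio.gather(
        slow_call("analyze", 0.2),
        slow_call("summarize", 0.2),
    )
    elapsed = time.perf_counter() - started
    return results, elapsed


results, elapsed = asyncio.run(run_concurrently())
print(results, f"{elapsed:.2f}s")
```

Run sequentially, the two calls would take about 0.4 seconds; run together, the total is close to the slower of the two.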
Key Components
AsyncDispatcher
The AsyncDispatcher is the core component for asynchronous event processing in Mojentic. It manages an event queue and dispatches events to appropriate agents based on routing rules.
from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.router import Router
# Create a router and register agents
router = Router()
# ... register agents with router ...
# Create and start the dispatcher
dispatcher = await AsyncDispatcher(router).start()
# Dispatch events
dispatcher.dispatch(my_event)
# Wait for all events to be processed
await dispatcher.wait_for_empty_queue()
# Stop the dispatcher when done
await dispatcher.stop()
Key features of the AsyncDispatcher:
- Event Queue Management: Efficiently manages a queue of events to be processed
- Batch Processing: Processes events in configurable batch sizes for optimal performance
- Asynchronous Dispatch: Uses asyncio for non-blocking event processing
- Graceful Shutdown: Provides methods to stop processing and wait for the queue to empty
- Tracing Support: Integrates with Mojentic's tracing system for observability
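The queue and batch features listed above can be pictured with a toy dispatcher. This is a sketch built only on asyncio and collections.deque; ToyAsyncDispatcher, its drain method, and the Question and Answer events are all hypothetical names for this example and do not reflect how Mojentic's AsyncDispatcher is actually implemented.

```python
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Awaitable, Callable, Deque, Dict, List


@dataclass(frozen=True)
class Question:
    text: str


@dataclass(frozen=True)
class Answer:
    text: str


Handler = Callable[[object], Awaitable[List[object]]]


class ToyAsyncDispatcher:
    """Toy queue-and-batch dispatcher; not Mojentic's implementation."""

    def __init__(self, routes: Dict[type, List[Handler]], batch_size: int = 5):
        self.routes = routes
        self.batch_size = batch_size
        self.queue: Deque[object] = deque()

    def dispatch(self, event: object) -> None:
        self.queue.append(event)

    async def drain(self) -> int:
        """Process batches until the queue is empty; return the batch count."""
        batches = 0
        while self.queue:
            batches += 1
            size = min(self.batch_size, len(self.queue))
            batch = [self.queue.popleft() for _ in range(size)]
            # Handlers for every event in the batch run concurrently.
            calls = [
                handler(event)
                for event in batch
                for handler in self.routes.get(type(event), [])
            ]
            for emitted in await asyncio.gather(*calls):
                self.queue.extend(emitted)  # Emitted events are queued in turn.
        return batches


logged: List[str] = []


async def answer_question(event: Question) -> List[object]:
    await asyncio.sleep(0.01)  # Simulated latency.
    return [Answer(text=f"answer to {event.text}")]


async def log_answer(event: Answer) -> List[object]:
    logged.append(event.text)
    return []


dispatcher = ToyAsyncDispatcher(
    {Question: [answer_question], Answer: [log_answer]}, batch_size=2
)
for n in (1, 2, 3):
    dispatcher.dispatch(Question(text=f"q{n}"))
batches = asyncio.run(dispatcher.drain())
print(batches, logged)
```

A larger batch size lets more handlers overlap their waiting time per batch, at the cost of more work in flight at once.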
Asynchronous Agents
Mojentic provides base classes for creating asynchronous agents:
BaseAsyncAgent
The BaseAsyncAgent class is the foundation for all asynchronous agents in Mojentic. It defines the receive_event_async method that asynchronous agents must implement.
from mojentic.agents.base_async_agent import BaseAsyncAgent
from mojentic.event import Event
from typing import List

class MyAsyncAgent(BaseAsyncAgent):
    async def receive_event_async(self, event: Event) -> List[Event]:
        # Process the event asynchronously
        # ...
        return [new_event]  # Return any new events to be dispatched
BaseAsyncLLMAgent
The BaseAsyncLLMAgent extends BaseAsyncAgent with LLM capabilities, allowing you to create agents that use LLMs asynchronously.
from mojentic.agents.async_llm_agent import BaseAsyncLLMAgent
from mojentic.event import Event
from typing import List

class MyAsyncLLMAgent(BaseAsyncLLMAgent):
    def __init__(self, llm):
        super().__init__(
            llm=llm,
            behaviour="You are a helpful assistant.",
            response_model=MyResponseModel
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        # Use the LLM asynchronously
        response = await self.generate_response("Your prompt here")
        return [new_event]  # Return any new events to be dispatched
AsyncAggregatorAgent
The AsyncAggregatorAgent is a specialized asynchronous agent that collects multiple events before processing them together. This is useful for tasks that require combining information from multiple sources.
from mojentic.agents.async_aggregator_agent import AsyncAggregatorAgent
from mojentic.event import Event
from typing import List

class MyAggregatorAgent(AsyncAggregatorAgent):
    def __init__(self):
        super().__init__(event_types_needed=[EventType1, EventType2, EventType3])

    async def process_events(self, events):
        # Process the collected events
        # ...
        return [new_event]  # Return any new events to be dispatched
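The collect-then-process idea can be sketched without the library. The ToyAggregator below groups events by correlation ID and only calls process_events once every needed type has arrived. It is an illustration under assumed names (ToyAggregator, AnalysisDone, SummaryDone), not the real AsyncAggregatorAgent.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Type


@dataclass(frozen=True)
class Event:
    correlation_id: str


@dataclass(frozen=True)
class AnalysisDone(Event):
    analysis: str


@dataclass(frozen=True)
class SummaryDone(Event):
    summary: str


class ToyAggregator:
    """Toy aggregator keyed by correlation ID; not Mojentic's implementation."""

    def __init__(self, event_types_needed: List[Type[Event]]):
        self.event_types_needed = event_types_needed
        self.pending: Dict[str, List[Event]] = defaultdict(list)

    async def receive_event_async(self, event: Event) -> List[Event]:
        collected = self.pending[event.correlation_id]
        collected.append(event)
        have = {type(e) for e in collected}
        if not all(needed in have for needed in self.event_types_needed):
            return []  # Still waiting for at least one event type.
        del self.pending[event.correlation_id]
        return await self.process_events(collected)

    async def process_events(self, events: List[Event]) -> List[Event]:
        # A real subclass would combine the events; this one returns them as-is.
        return events


async def demo() -> List[List[Event]]:
    aggregator = ToyAggregator([AnalysisDone, SummaryDone])
    return [
        await aggregator.receive_event_async(AnalysisDone("a", "positive tone")),
        await aggregator.receive_event_async(SummaryDone("b", "other request")),
        await aggregator.receive_event_async(SummaryDone("a", "short summary")),
    ]


outputs = asyncio.run(demo())
print([len(batch) for batch in outputs])
```

Only the third call completes a set, because the second event belongs to a different correlation ID.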
Example: Asynchronous Text Analysis System
The following example demonstrates how to build an asynchronous text analysis system using Mojentic's asynchronous capabilities. This system analyzes and summarizes text concurrently, then combines the results.
import asyncio
from typing import List
from pydantic import BaseModel, Field
from mojentic.agents.async_aggregator_agent import AsyncAggregatorAgent
from mojentic.agents.async_llm_agent import BaseAsyncLLMAgent
from mojentic.agents.base_async_agent import BaseAsyncAgent
from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.event import Event, TerminateEvent
from mojentic.llm import LLMBroker
from mojentic.router import Router

# Define events
class TextEvent(Event):
    text: str


class AnalysisEvent(Event):
    analysis: str


class SummaryEvent(Event):
    summary: str


class CombinedResultEvent(Event):
    text: str
    analysis: str
    summary: str
    combined: str


# Define response models
class AnalysisResponse(BaseModel):
    analysis: str


class SummaryResponse(BaseModel):
    summary: str


class CombinedResponse(BaseModel):
    combined: str

# Define agents
class TextAnalyzerAgent(BaseAsyncLLMAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(
            llm=llm,
            behaviour="You are a text analysis assistant.",
            response_model=AnalysisResponse
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, TextEvent):
            prompt = f"Analyze this text: {event.text}"
            response = await self.generate_response(prompt)
            return [AnalysisEvent(
                source=type(self),
                correlation_id=event.correlation_id,
                analysis=response.analysis
            )]
        return []


class TextSummarizerAgent(BaseAsyncLLMAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(
            llm=llm,
            behaviour="You are a text summarization assistant.",
            response_model=SummaryResponse
        )

    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, TextEvent):
            prompt = f"Summarize this text: {event.text}"
            response = await self.generate_response(prompt)
            return [SummaryEvent(
                source=type(self),
                correlation_id=event.correlation_id,
                summary=response.summary
            )]
        return []


class ResultCombinerAgent(AsyncAggregatorAgent):
    def __init__(self, llm: LLMBroker):
        super().__init__(event_types_needed=[TextEvent, AnalysisEvent, SummaryEvent])
        self.llm = llm
        self.response_model = CombinedResponse

    async def process_events(self, events):
        text_event = next((e for e in events if isinstance(e, TextEvent)), None)
        analysis_event = next((e for e in events if isinstance(e, AnalysisEvent)), None)
        summary_event = next((e for e in events if isinstance(e, SummaryEvent)), None)
        if text_event and analysis_event and summary_event:
            # Combine the results
            combined = f"Original text: {text_event.text}\n\nAnalysis: {analysis_event.analysis}\n\nSummary: {summary_event.summary}"
            return [CombinedResultEvent(
                source=type(self),
                correlation_id=text_event.correlation_id,
                text=text_event.text,
                analysis=analysis_event.analysis,
                summary=summary_event.summary,
                combined=combined
            )]
        return []


class ResultOutputAgent(BaseAsyncAgent):
    async def receive_event_async(self, event: Event) -> List[Event]:
        if isinstance(event, CombinedResultEvent):
            print("\n=== FINAL RESULT ===")
            print(event.combined)
            print("===================\n")
            return [TerminateEvent(source=type(self), correlation_id=event.correlation_id)]
        return []


# Main function
async def main():
    # Initialize the LLM broker
    llm = LLMBroker(model="your-model-name")

    # Create a router and register agents
    router = Router()

    # Create agents
    analyzer = TextAnalyzerAgent(llm)
    summarizer = TextSummarizerAgent(llm)
    combiner = ResultCombinerAgent(llm)
    output_agent = ResultOutputAgent()

    # Register agents with the router
    router.add_route(TextEvent, analyzer)
    router.add_route(TextEvent, summarizer)
    router.add_route(TextEvent, combiner)
    router.add_route(AnalysisEvent, combiner)
    router.add_route(SummaryEvent, combiner)
    router.add_route(CombinedResultEvent, output_agent)

    # Create and start the dispatcher
    dispatcher = await AsyncDispatcher(router).start()

    # Create a text event
    text = "Your text to analyze and summarize"
    event = TextEvent(source="ExampleSource", text=text)

    # Dispatch the event
    dispatcher.dispatch(event)

    # Wait for all events to be processed
    await dispatcher.wait_for_empty_queue()

    # Stop the dispatcher
    await dispatcher.stop()


if __name__ == "__main__":
    asyncio.run(main())
Advanced Usage: Customizing AsyncDispatcher
You can customize the AsyncDispatcher to fit your specific needs:
from mojentic.async_dispatcher import AsyncDispatcher
from mojentic.router import Router
from mojentic.tracer import Tracer
# Create a router
router = Router()
# Create a tracer for observability
tracer = Tracer()
# Create a customized AsyncDispatcher
dispatcher = AsyncDispatcher(
    router=router,
    shared_working_memory=my_shared_memory,  # Optional shared memory
    batch_size=10,  # Process 10 events per batch
    tracer=tracer  # Use custom tracer
)
# Start the dispatcher
await dispatcher.start()
API Reference
AsyncDispatcher
mojentic.async_dispatcher.AsyncDispatcher
AsyncDispatcher class is an asynchronous version of the Dispatcher class. It uses asyncio and deque for event processing.
Source code in src/mojentic/async_dispatcher.py
__init__(router, shared_working_memory=None, batch_size=5, tracer=None)
Initialize the AsyncDispatcher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
router | Router | The router to use for routing events to agents | required |
shared_working_memory | SharedWorkingMemory | The shared working memory to use | None |
batch_size | int | The number of events to process in each batch | 5 |
tracer | Tracer | The tracer to use for tracing events | None |
dispatch(event)
Dispatch an event to the event queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event | Event | The event to dispatch | required |
start() (async)
stop() (async)
wait_for_empty_queue(timeout=None) (async)
Wait for the event queue to be empty.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | float | The timeout in seconds | None |
Returns:
Type | Description |
---|---|
bool | True if the queue is empty, False if the timeout was reached |
BaseAsyncAgent
mojentic.agents.base_async_agent.BaseAsyncAgent
BaseAsyncAgent class is the base class for all asynchronous agents. It provides an async receive method for event processing.
Source code in src/mojentic/agents/base_async_agent.py
receive_event_async(event) (async)
receive_event_async is the method that all async agents must implement. It takes an event as input and returns a list of events as output.
In this way, you can perform work based on the event, and generate whatever subsequent events may need to be processed next.
This keeps the agent decoupled from the specifics of the event routing and processing.
Events are subclasses of the Event class.
:param event: The event to process
:return: A list of events to be processed next
AsyncAggregatorAgent
mojentic.agents.async_aggregator_agent.AsyncAggregatorAgent
Bases: BaseAsyncAgent
AsyncAggregatorAgent is an asynchronous version of the BaseAggregatingAgent. It aggregates events based on their correlation_id and processes them when all required events are available.
Source code in src/mojentic/agents/async_aggregator_agent.py
__init__(event_types_needed=None)
Initialize the AsyncAggregatorAgent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_types_needed | list | List of event types that need to be captured before processing | None |
process_events(events) (async)
receive_event_async(event) (async)
Receive an event and process it if all needed events are available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event | Event | The event to process | required |
Returns:
Type | Description |
---|---|
list | The events to be processed next |
wait_for_events(correlation_id, timeout=None) (async)
Wait for all needed events for a specific correlation_id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
correlation_id | str | The correlation_id to wait for | required |
timeout | float | The timeout in seconds | None |
Returns:
Type | Description |
---|---|
list | The events for the correlation_id |
Best Practices
When working with Mojentic's asynchronous capabilities, keep these best practices in mind:
- Use Correlation IDs: Always ensure events have correlation IDs to track related events through the system.
- Handle Exceptions: Implement proper exception handling in your asynchronous agents to prevent crashes.
- Monitor Queue Size: For production systems, monitor the event queue size to detect potential bottlenecks.
- Batch Size Tuning: Adjust the batch size based on your workload characteristics for optimal performance.
- Graceful Shutdown: Always use the stop() method to gracefully shut down the dispatcher.
- Tracing: Use the tracing system to monitor and debug your asynchronous agent system.
- Timeout Management: Implement timeouts for wait_for_empty_queue() to prevent indefinite waiting.
- Event Design: Design your events to carry all necessary information without requiring additional lookups.
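Two of these practices, exception handling and timeouts, can be combined in one small wrapper. The sketch below uses only asyncio and logging; safe_receive and the three example handlers are hypothetical helpers for this illustration and are not part of Mojentic.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger(__name__)

Handler = Callable[[str], Awaitable[List[str]]]


async def safe_receive(handler: Handler, event: str, timeout: float) -> List[str]:
    """Run a handler with a timeout; return no events if it fails or stalls."""
    try:
        return await asyncio.wait_for(handler(event), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("handler timed out after %.2fs for event %r", timeout, event)
    except Exception:
        logger.exception("handler failed for event %r", event)
    return []


async def healthy(event: str) -> List[str]:
    return [event.upper()]


async def stalled(event: str) -> List[str]:
    await asyncio.sleep(1.0)  # Simulates an operation that takes too long.
    return [event]


async def broken(event: str) -> List[str]:
    raise ValueError("simulated failure")


async def demo() -> List[List[str]]:
    return [
        await safe_receive(healthy, "ok", timeout=0.5),
        await safe_receive(stalled, "slow", timeout=0.05),
        await safe_receive(broken, "bad", timeout=0.5),
    ]


outcomes = asyncio.run(demo())
print(outcomes)
```

Returning an empty list on failure keeps one misbehaving handler from stopping the rest of the flow, while the log entries preserve what went wrong.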
Conclusion
Mojentic's asynchronous capabilities provide a powerful foundation for building complex, high-performance agent systems. By leveraging the AsyncDispatcher and asynchronous agent classes, you can create systems that efficiently process events concurrently, making the most of available resources and providing responsive, scalable solutions.