Agent Coordination Implementation
Implementation of agent coordination patterns with async messaging and task allocation.
python
import asyncio
import json
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Message:
sender: str
receiver: str
msg_type: str
content: Dict[str, Any]
timestamp: datetime
priority: int = 1
class Agent:
def __init__(self, agent_id: str, capabilities: List[str]):
self.agent_id = agent_id
self.capabilities = capabilities
self.message_queue = asyncio.Queue()
self.task_queue = asyncio.Queue()
self.coordinator = None
async def send_message(self, receiver: str, msg_type: str, content: Dict):
"""Send message to another agent"""
message = Message(
sender=self.agent_id,
receiver=receiver,
msg_type=msg_type,
content=content,
timestamp=datetime.now()
)
if self.coordinator:
await self.coordinator.route_message(message)
async def receive_message(self, message: Message):
"""Process incoming message"""
await self.message_queue.put(message)
async def process_messages(self):
"""Main message processing loop"""
while True:
message = await self.message_queue.get()
if message.msg_type == "task_assignment":
await self.handle_task(message.content)
elif message.msg_type == "capability_query":
await self.respond_capabilities(message.sender)
elif message.msg_type == "result":
await self.handle_result(message.content)
async def handle_task(self, task: Dict):
"""Process assigned task"""
print(f"{self.agent_id} processing task: {task['description']}")
# Simulate task processing
await asyncio.sleep(2)
result = {"status": "completed", "output": f"Task {task['id']} done"}
await self.send_message(
task['requester'],
"result",
result
)
class Coordinator:
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_allocation_strategy = "capability_based"
def register_agent(self, agent: Agent):
"""Register agent with coordinator"""
self.agents[agent.agent_id] = agent
agent.coordinator = self
async def route_message(self, message: Message):
"""Route messages between agents"""
if message.receiver in self.agents:
await self.agents[message.receiver].receive_message(message)
elif message.receiver == "coordinator":
await self.handle_coordinator_message(message)
async def allocate_task(self, task: Dict):
"""Allocate task to best suited agent"""
# Find agents with required capabilities
capable_agents = [
agent for agent in self.agents.values()
if task['required_capability'] in agent.capabilities
]
if not capable_agents:
print(f"No agent available for task: {task['description']}")
return
# Simple allocation - choose first available
selected_agent = capable_agents[0]
await selected_agent.send_message(
selected_agent.agent_id,
"task_assignment",
{
"id": task['id'],
"description": task['description'],
"requester": "coordinator"
}
)
# Usage Example
async def main():
# Create coordinator
coordinator = Coordinator()
# Create specialized agents
data_agent = Agent("data_processor", ["data_analysis", "etl"])
ml_agent = Agent("ml_engine", ["prediction", "classification"])
report_agent = Agent("report_generator", ["visualization", "reporting"])
# Register agents
for agent in [data_agent, ml_agent, report_agent]:
coordinator.register_agent(agent)
asyncio.create_task(agent.process_messages())
# Simulate task allocation
tasks = [
{"id": "1", "description": "Analyze sales data", "required_capability": "data_analysis"},
{"id": "2", "description": "Predict customer churn", "required_capability": "prediction"},
{"id": "3", "description": "Generate monthly report", "required_capability": "reporting"}
]
for task in tasks:
await coordinator.allocate_task(task)
# Let agents process
await asyncio.sleep(5)
# Run the system
# asyncio.run(main())Explanation:
This implementation demonstrates a foundational multi-agent system with: **Key Components:** • Message class for structured communication • Agent base class with message handling • Coordinator for routing and task allocation • Asynchronous processing for concurrent operations **Business Applications:** • Customer service routing to specialized agents • Data pipeline with ETL, analysis, and reporting agents • Order processing with inventory, payment, and shipping agents **Scalability Features:** • Async design handles thousands of concurrent messages • Easy to add new agent types • Flexible task allocation strategies