1 / 3

Agent Coordination Implementation

Implementation of agent coordination patterns with async messaging and task allocation.

python
import asyncio
import json
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Message:
    """Envelope for one message exchanged between agents via the coordinator."""

    # Routing information: id of the originator and id of the destination.
    sender: str
    receiver: str

    # Tag the receiver dispatches on; the agents in this file handle
    # "task_assignment", "capability_query" and "result".
    msg_type: str

    # Free-form payload whose expected keys depend on msg_type.
    content: Dict[str, Any]

    # Creation time, supplied by whoever builds the message.
    timestamp: datetime

    # Defaults to 1; nothing in this file reads it yet.
    priority: int = 1

class Agent:
    """A worker that exchanges messages through a coordinator and runs tasks.

    Incoming messages are buffered in ``message_queue`` and handled one at a
    time by :meth:`process_messages`. Outgoing messages are handed to
    ``coordinator``, which stays ``None`` until a coordinator registers the
    agent.
    """

    def __init__(self, agent_id: str, capabilities: List[str]):
        """Create an agent.

        Args:
            agent_id: Identifier used as the sender/receiver address.
            capabilities: Capability names this agent can serve.
        """
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.message_queue = asyncio.Queue()
        # Not used by any method of this class; kept for existing callers.
        self.task_queue = asyncio.Queue()
        self.coordinator = None
        # Payloads of every "result" message handled so far, oldest first.
        self.results: List[Dict[str, Any]] = []

    async def send_message(self, receiver: str, msg_type: str, content: Dict):
        """Send a message to ``receiver`` through the coordinator.

        The message is dropped without error when no coordinator is attached.

        Args:
            receiver: Id of the destination agent, or ``"coordinator"``.
            msg_type: Tag the receiver dispatches on.
            content: Message payload.
        """
        if not self.coordinator:
            # Nobody to route through; skip building a message that would
            # only be thrown away.
            return
        message = Message(
            sender=self.agent_id,
            receiver=receiver,
            msg_type=msg_type,
            content=content,
            timestamp=datetime.now(),
        )
        await self.coordinator.route_message(message)

    async def receive_message(self, message: "Message"):
        """Enqueue an incoming message for :meth:`process_messages`."""
        await self.message_queue.put(message)

    async def process_messages(self):
        """Handle queued messages forever, one at a time.

        Runs until the surrounding task is cancelled. A failure while handling
        one message is reported and does not stop the loop; messages with an
        unrecognised ``msg_type`` are ignored. Every dequeued message is
        acknowledged with ``task_done()`` so ``message_queue.join()`` works.
        """
        while True:
            message = await self.message_queue.get()
            try:
                if message.msg_type == "task_assignment":
                    await self.handle_task(message.content)
                elif message.msg_type == "capability_query":
                    await self.respond_capabilities(message.sender)
                elif message.msg_type == "result":
                    await self.handle_result(message.content)
            except Exception as exc:
                # Cancellation is not an Exception on Python 3.8+, so it still
                # propagates and stops the loop as intended.
                print(
                    f"{self.agent_id} failed to handle "
                    f"{message.msg_type!r} message: {exc!r}"
                )
            finally:
                self.message_queue.task_done()

    async def handle_task(self, task: Dict):
        """Run an assigned task and report the outcome to its requester.

        Args:
            task: Must contain ``'id'``, ``'description'`` and ``'requester'``.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        print(f"{self.agent_id} processing task: {task['description']}")
        # Simulate task processing
        await asyncio.sleep(2)
        result = {"status": "completed", "output": f"Task {task['id']} done"}
        await self.send_message(task['requester'], "result", result)

    async def respond_capabilities(self, requester: str):
        """Answer a capability query with this agent's id and capabilities.

        Args:
            requester: Id of the agent (or ``"coordinator"``) that asked.
        """
        await self.send_message(
            requester,
            "capability_response",
            # Copy the list so the receiver cannot mutate this agent's state.
            {"agent_id": self.agent_id, "capabilities": list(self.capabilities)},
        )

    async def handle_result(self, result: Dict):
        """Record a result payload delivered to this agent.

        Args:
            result: Content of the received ``"result"`` message.
        """
        self.results.append(result)
        print(f"{self.agent_id} received result: {result}")

class Coordinator:
    """Registry that routes messages between agents and assigns tasks.

    Agents are addressed by ``agent_id``; the coordinator itself is addressed
    by the receiver name ``"coordinator"``.
    """

    def __init__(self):
        # Registered agents keyed by agent_id.
        self.agents: Dict[str, "Agent"] = {}
        self.task_allocation_strategy = "capability_based"
        # Every message addressed to "coordinator", in arrival order.
        self.received_messages: List["Message"] = []

    def register_agent(self, agent: "Agent"):
        """Register ``agent`` and attach this coordinator to it.

        An agent registered later under the same id replaces the earlier one.
        """
        self.agents[agent.agent_id] = agent
        agent.coordinator = self

    async def route_message(self, message: "Message"):
        """Deliver ``message`` to its receiver.

        Registered agents take precedence over the ``"coordinator"`` address.
        A message for an unknown receiver is reported and dropped.
        """
        if message.receiver in self.agents:
            await self.agents[message.receiver].receive_message(message)
        elif message.receiver == "coordinator":
            await self.handle_coordinator_message(message)
        else:
            print(
                f"Undeliverable message from {message.sender}: "
                f"unknown receiver {message.receiver!r}"
            )

    async def handle_coordinator_message(self, message: "Message"):
        """Record a message addressed to the coordinator itself.

        Task results sent by agents arrive here because task assignments name
        ``"coordinator"`` as the requester.
        """
        self.received_messages.append(message)
        print(
            f"Coordinator received {message.msg_type!r} "
            f"from {message.sender}: {message.content}"
        )

    async def allocate_task(self, task: Dict):
        """Assign ``task`` to the first registered agent that can serve it.

        Args:
            task: Must contain ``'id'``, ``'description'`` and
                ``'required_capability'``.

        Returns:
            The id of the agent the task was sent to, or ``None`` when no
            registered agent offers the required capability.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # Find agents with required capabilities
        capable_agents = [
            agent for agent in self.agents.values()
            if task['required_capability'] in agent.capabilities
        ]

        if not capable_agents:
            print(f"No agent available for task: {task['description']}")
            return None

        # Simple allocation - choose first available
        selected_agent = capable_agents[0]

        # Build the assignment here so its sender is the coordinator rather
        # than the receiving agent itself.
        assignment = Message(
            sender="coordinator",
            receiver=selected_agent.agent_id,
            msg_type="task_assignment",
            content={
                "id": task['id'],
                "description": task['description'],
                "requester": "coordinator",
            },
            timestamp=datetime.now(),
        )
        await self.route_message(assignment)
        return selected_agent.agent_id

# Usage Example
async def main():
    """Run the demo: three specialised agents, one coordinator, three tasks.

    Starts one message-processing worker per agent, allocates the sample
    tasks, waits a fixed five seconds for the agents to work, then cancels
    the workers and reports any worker that stopped because of an error.
    """
    # Create coordinator
    coordinator = Coordinator()

    # Create specialized agents
    agents = [
        Agent("data_processor", ["data_analysis", "etl"]),
        Agent("ml_engine", ["prediction", "classification"]),
        Agent("report_generator", ["visualization", "reporting"]),
    ]

    # Register agents and start their workers. Keep references to the tasks:
    # the event loop holds only weak references, so an unreferenced task can
    # be garbage-collected before it finishes.
    workers = []
    for agent in agents:
        coordinator.register_agent(agent)
        workers.append(asyncio.create_task(agent.process_messages()))

    # Simulate task allocation
    tasks = [
        {"id": "1", "description": "Analyze sales data", "required_capability": "data_analysis"},
        {"id": "2", "description": "Predict customer churn", "required_capability": "prediction"},
        {"id": "3", "description": "Generate monthly report", "required_capability": "reporting"},
    ]

    try:
        for task in tasks:
            await coordinator.allocate_task(task)

        # Let agents process
        await asyncio.sleep(5)
    finally:
        # Stop the endless worker loops and wait for them to unwind.
        for worker in workers:
            worker.cancel()
        outcomes = await asyncio.gather(*workers, return_exceptions=True)
        for agent, outcome in zip(agents, outcomes):
            # A cancelled worker is the normal outcome; anything else means
            # the worker crashed while the demo was running.
            if isinstance(outcome, BaseException) and not isinstance(
                outcome, asyncio.CancelledError
            ):
                print(f"{agent.agent_id} worker failed: {outcome!r}")

# Run the system
# asyncio.run(main())

Explanation:

This implementation demonstrates a foundational multi-agent system with the following characteristics.

**Key Components:**
• Message class for structured communication
• Agent base class with message handling
• Coordinator for routing and task allocation
• Asynchronous processing for concurrent operations

**Business Applications:**
• Customer service routing to specialized agents
• Data pipeline with ETL, analysis, and reporting agents
• Order processing with inventory, payment, and shipping agents

**Scalability Features:**
• Async design handles thousands of concurrent messages
• Easy to add new agent types
• Flexible task allocation strategies

Section Progress