Spaces:
Runtime error
Runtime error
| from typing import Dict, List | |
class Agent:
    """A named worker that records every task handed to it.

    ``name`` identifies the agent in messages; ``task_log`` holds the
    received tasks in arrival order.
    """

    def __init__(self, name: str):
        """Create an agent called *name* with an empty task log."""
        self.name = name
        # Tasks in the order they were received.
        self.task_log: List[str] = []

    def receive_task(self, task: str) -> str:
        """Append *task* to the log and return a confirmation message."""
        self.task_log.append(task)
        return f"Agent {self.name} received task: {task}"

    def get_tasks(self) -> List[str]:
        """Return the received tasks, oldest first.

        A copy is returned so that callers who mutate the result cannot
        silently alter this agent's own log.
        """
        return list(self.task_log)
class MultiAgentCoordinator:
    """Registry that creates agents by name and routes tasks to them."""

    def __init__(self):
        """Start with no registered agents."""
        # Maps each agent's name to its Agent instance.
        self.agents: Dict[str, Agent] = {}

    def register_agent(self, name: str) -> str:
        """Create an agent under *name* unless that name is already taken.

        Returns a status message saying whether the agent was registered
        or already existed; an existing entry is left untouched.
        """
        if name in self.agents:
            return f"Agent '{name}' already exists."
        self.agents[name] = Agent(name)
        return f"Agent '{name}' registered."

    def assign_task(self, agent_name: str, task: str) -> str:
        """Hand *task* to the agent registered as *agent_name*.

        Returns the agent's confirmation message, or a "not found"
        message when no agent is registered under that name.
        """
        if agent_name not in self.agents:
            return f"Agent '{agent_name}' not found."
        target = self.agents[agent_name]
        return target.receive_task(task)

    def get_all_tasks(self) -> Dict[str, List[str]]:
        """Return a mapping of every agent's name to its task list."""
        tasks_by_agent: Dict[str, List[str]] = {}
        for agent_name, agent in self.agents.items():
            tasks_by_agent[agent_name] = agent.get_tasks()
        return tasks_by_agent