Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3
"""
Modal Implementation for MCO Hackathon
This module defines the Modal app, including the MCPClient for communicating
with the MCO MCP server via WebSockets, and the Claude 3 Opus agent function.
This version is fully asynchronous and uses a clean network-based client.
"""
import asyncio
import inspect
import json
import os
from typing import Any, Dict, Optional

import modal
import websockets
# --- Modal App Setup ---
# The app is registered under the name "mco-autogpt-agent". Its secrets are
# built with modal.Secret.from_dotenv(), and its container image is Debian-slim
# on Python 3.10 with only the `websockets` package pip-installed.
stub = modal.App(
    name="mco-autogpt-agent",
    image=modal.Image.debian_slim(python_version="3.10").pip_install("websockets"),
    secrets=[modal.Secret.from_dotenv()],
)
# --- MCPClient: Generic client for interacting with MCP Servers via WebSocket ---
class MCPClient:
    """A generic client for interacting with any MCP-compliant server via WebSockets.

    Typical use: ``await connect()``, any number of ``await call_tool(...)``,
    then ``await close()``. Calls are strictly request/response: each
    ``call_tool`` sends one message and waits for its reply.
    """

    # Seconds to wait for the reply to a single tool call.
    RESPONSE_TIMEOUT = 60.0

    def __init__(self, server_url: str):
        self.server_url = server_url
        self.websocket = None
        self.client_id = None  # Provided by the server upon connection
        self.request_counter = 0
        self.available_tools = {}  # Populated from server's welcome message

    def _is_open(self) -> bool:
        """Report whether a usable connection exists, across websockets API versions."""
        ws = self.websocket
        if ws is None:
            return False
        # Connection objects expose `.state` (an enum with an OPEN member);
        # comparing by name avoids importing the enum from a version-specific
        # module path.
        state = getattr(ws, "state", None)
        if state is not None:
            return getattr(state, "name", None) == "OPEN"
        # The legacy websockets API exposes a boolean `.open` instead.
        legacy_open = getattr(ws, "open", None)
        if legacy_open is not None:
            return bool(legacy_open)
        # Unknown connection type: let send()/recv() surface any failure.
        return True

    async def connect(self):
        """Establishes a WebSocket connection to the MCP server.

        Reads the first message as a JSON welcome and stores its "clientId"
        and "tools" entries. Any failure is logged and re-raised.
        """
        try:
            self.websocket = await websockets.connect(self.server_url)
            welcome_message = await self.websocket.recv()
            welcome_data = json.loads(welcome_message)
            self.client_id = welcome_data.get("clientId")
            self.available_tools = welcome_data.get("tools", {})
            print(f"Connected to MCP server with client ID: {self.client_id}")
            print(f"Available tools: {json.dumps(self.available_tools, indent=2)}")
        except Exception as e:
            print(f"Failed to connect to WebSocket server at {self.server_url}: {e}")
            raise

    async def _await_response(self, request_id: str) -> Any:
        """Receive messages until one answers ``request_id`` and return it decoded.

        A message carrying a *different* requestId is skipped (for example a
        late reply to an earlier call that timed out). A message without any
        requestId is accepted with a warning, so servers that do not echo the
        id keep working.
        """
        while True:
            response_data = json.loads(await self.websocket.recv())
            received_id = response_data.get("requestId")
            if received_id == request_id:
                return response_data
            if received_id is None:
                print(f"Warning: Response ID mismatch. Expected {request_id}, got {received_id}")
                return response_data
            print(f"Warning: Skipping response for request {received_id} while waiting for {request_id}")

    async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Sends a tool call to the MCP server and awaits the response.

        Args:
            tool_name: Name of the remote tool to invoke.
            params: JSON-serializable parameters for the tool.

        Returns:
            The response's "result" field, or the whole response when there is
            no "result". On timeout or any other failure after the connection
            check, an ``{"error": ...}`` dict is returned instead of raising.

        Raises:
            ConnectionError: If there is no open connection.
        """
        if not self._is_open():
            raise ConnectionError("WebSocket connection is not established or has been closed.")
        self.request_counter += 1
        request_id = f"{self.client_id}-{self.request_counter}"
        message = {
            "type": "tool_call",
            "tool": tool_name,
            "params": params,
            "requestId": request_id,
        }
        try:
            await self.websocket.send(json.dumps(message))
            # One overall deadline for the matching reply, including any
            # stale messages skipped along the way.
            response_data = await asyncio.wait_for(
                self._await_response(request_id), timeout=self.RESPONSE_TIMEOUT
            )
            # The 'result' field is standard for successful tool calls in MCP
            return response_data.get("result", response_data)  # Return full response if no 'result'
        except asyncio.TimeoutError:
            print(f"Timeout waiting for response for tool call '{tool_name}'.")
            return {"error": f"Timeout waiting for response for tool call '{tool_name}'."}
        except Exception as e:
            print(f"An error occurred during tool call '{tool_name}': {e}")
            return {"error": f"An error occurred during tool call '{tool_name}': {str(e)}"}

    async def close(self):
        """Closes the WebSocket connection, if any, and forgets the handle."""
        # Clear the handle first so later call_tool() calls fail fast with
        # ConnectionError even if close() itself raises.
        ws, self.websocket = self.websocket, None
        if ws is not None:
            await ws.close()
            print("WebSocket connection closed.")
# --- Agent's Local Tools ---
def create_file_locally(filename: str, content: str) -> dict:
    """Creates (or overwrites) a text file in the agent's local (container) filesystem.

    Args:
        filename: Path of the file to write; a relative path resolves against
            the current working directory.
        content: Text to write; it is encoded as UTF-8.

    Returns:
        ``{"status": "success", "message": ...}`` when the file was written,
        otherwise ``{"status": "error", "message": ...}``. Failures are
        reported in the returned dict rather than raised.
    """
    # Note: In Modal, files written here are typically ephemeral to the container's current run.
    # For persistent storage across invocations or for sharing, Modal's NetworkFileSystem
    # or other storage solutions (e.g., an S3 upload tool) would be needed.
    # NOTE(review): unlike read_file_locally, this tool does not restrict
    # `filename` to the working directory - confirm whether that is intended.
    try:
        # Explicit encoding so the bytes on disk do not depend on the locale.
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:  # tool boundary: report the failure to the orchestrator
        return {"status": "error", "message": f"Failed to create file '{filename}': {str(e)}"}
    return {"status": "success", "message": f"File '{filename}' created successfully in agent's environment."}
async def call_external_mcp_tool(server_url: str, target_tool_name: str, target_tool_params: Dict[str, Any]) -> Dict[str, Any]:
    """Connects to an external MCP server, calls one tool on it, and returns the result.

    Args:
        server_url: Address of the external server. An ``http://`` or
            ``https://`` prefix is rewritten to ``ws://`` / ``wss://``; any
            other URL is passed through unchanged.
        target_tool_name: Name of the tool to invoke on that server.
        target_tool_params: Parameters forwarded to the tool.

    Returns:
        Whatever ``MCPClient.call_tool`` returns. If connecting or calling
        raises, a dict with "error", "tool_name", "external_server_url" and
        "status_code" ("CONNECTION_ERROR" or "EXECUTION_ERROR") is returned
        instead of raising.
    """
    print(f"[Agent Tool] Attempting to call tool '{target_tool_name}' on external MCP server: {server_url}")
    # Ensure server_url is a WebSocket URL for the MCPClient (scheme prefix only).
    ws_server_url = server_url
    for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
        if server_url.startswith(http_scheme):
            ws_server_url = ws_scheme + server_url[len(http_scheme):]
            break
    external_mcp_client = MCPClient(server_url=ws_server_url)
    try:
        await external_mcp_client.connect()
        print(f"[Agent Tool] Connected to external MCP server: {server_url}")
        result = await external_mcp_client.call_tool(target_tool_name, target_tool_params)
        # call_tool reports timeouts and transport failures as {"error": ...}
        # rather than raising, so do not log those as a success.
        if isinstance(result, dict) and "error" in result:
            print(f"[Agent Tool] Call to '{target_tool_name}' on {server_url} returned an error. Result: {json.dumps(result)}")
        else:
            print(f"[Agent Tool] Successfully called '{target_tool_name}' on {server_url}. Result: {json.dumps(result)}")
        return result
    except ConnectionError as e:
        err_msg = f"Connection error with external MCP server {server_url}: {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "tool_name": target_tool_name, "external_server_url": server_url, "status_code": "CONNECTION_ERROR"}
    except Exception as e:
        err_msg = f"Error calling tool '{target_tool_name}' on external MCP server {server_url}: {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "tool_name": target_tool_name, "external_server_url": server_url, "status_code": "EXECUTION_ERROR"}
    finally:
        if external_mcp_client.websocket:
            # A failure while closing must not replace the value being returned.
            try:
                await external_mcp_client.close()
                print(f"[Agent Tool] Closed connection to external MCP server: {server_url}")
            except Exception as e:
                print(f"[Agent Tool] Failed to close connection to external MCP server {server_url}: {e}")
async def read_file_locally(filename: str) -> Dict[str, Any]:
    """Reads a UTF-8 text file located under the current working directory.

    Args:
        filename: Path to read; it must resolve (after following symlinks) to
            a location inside the current working directory.

    Returns:
        On success ``{"filename", "content", "status_code": "SUCCESS"}``.
        Otherwise ``{"error", "filename", "status_code"}`` where status_code
        is "SECURITY_ERROR" (path escapes the working directory),
        "FILE_NOT_FOUND", or "READ_ERROR". Failures are reported in the
        returned dict rather than raised.
    """
    print(f"[Agent Tool] Attempting to read file: {filename}")
    try:
        # Security check: confine reads to the working directory. realpath
        # resolves symlinks and "..", and commonpath compares whole path
        # components, so a sibling such as "<cwd>-other" is not mistaken for
        # a child the way a plain string-prefix test would.
        base_dir = os.path.realpath(".")
        requested_path = os.path.realpath(filename)
        try:
            inside_base = os.path.commonpath([base_dir, requested_path]) == base_dir
        except ValueError:
            # commonpath rejects paths it cannot compare (e.g. different drives).
            inside_base = False
        if not inside_base:
            err_msg = f"Security error: Path traversal attempt detected for filename '{filename}'."
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "SECURITY_ERROR"}
        if not os.path.exists(requested_path):
            err_msg = f"File not found: {filename}"
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "FILE_NOT_FOUND"}
        # Open the path that was validated, not the raw argument.
        with open(requested_path, "r", encoding="utf-8") as f:
            content = f.read()
        print(f"[Agent Tool] Successfully read file: {filename}")
        return {"filename": filename, "content": content, "status_code": "SUCCESS"}
    except Exception as e:  # tool boundary: report the failure to the orchestrator
        err_msg = f"Error reading file '{filename}': {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "filename": filename, "status_code": "READ_ERROR"}
# Registry of the tools this agent can execute itself, keyed by function name
# (the name a 'tool_call' directive is matched against).
AGENT_LOCAL_TOOLS = {
    tool.__name__: tool
    for tool in (
        create_file_locally,
        call_external_mcp_tool,
        read_file_locally,
        # Add other local tools here
    )
}
# --- Agent (Orchestrated by an MCO Server) ---
def _to_websocket_url(url: str) -> str:
    """Rewrite an http(s):// scheme prefix to ws(s)://; return other URLs unchanged."""
    for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
        if url.startswith(http_scheme):
            return ws_scheme + url[len(http_scheme):]
    return url


def _truncate_strings(mapping: Dict[str, Any], limit: int) -> Dict[str, Any]:
    """Copy ``mapping``, cutting string values longer than ``limit`` and appending '...'."""
    return {
        key: value[:limit] + "..." if isinstance(value, str) and len(value) > limit else value
        for key, value in mapping.items()
    }


async def _orchestration_events(mcp_client, user_prompt: str, max_turns: int = 20):
    """Connect, start an orchestration, and yield one event dict per step.

    ``max_turns`` bounds the directive loop to prevent infinite loops.
    Exceptions are not handled here; they propagate to the caller.
    """
    await mcp_client.connect()
    yield {"type": "status", "message": "<thinking>Starting orchestration with MCO server...</thinking>"}

    start_params = {"config": {"user_prompt": user_prompt}}
    start_result = await mcp_client.call_tool("start_orchestration", start_params)
    # call_tool may hand back a non-dict "result"; treat that as "no id".
    orchestration_id = start_result.get("orchestration_id") if isinstance(start_result, dict) else None
    if not orchestration_id:
        yield {"type": "error", "message": "<thinking>Error: Failed to start orchestration. No orchestration_id received.</thinking>"}
        return
    yield {"type": "status", "message": f"<thinking>Orchestration started with ID: {orchestration_id}. Goal: '{user_prompt}'. MCO server will use configured SNLP.</thinking>"}
    print(f"Orchestration started. ID: {orchestration_id}, Prompt: {user_prompt}. MCO server using configured SNLP.")

    last_tool_result = None
    for turn in range(max_turns):
        get_directive_params = {"orchestration_id": orchestration_id}
        if last_tool_result:
            get_directive_params["last_tool_result"] = last_tool_result
            print(f"Sending last_tool_result to get_next_directive: {json.dumps(last_tool_result, indent=2)}")
            last_tool_result = None  # each result is reported exactly once

        directive = await mcp_client.call_tool("get_next_directive", get_directive_params)
        if not isinstance(directive, dict):
            yield {"type": "error", "message": "<thinking>Error: Received an empty or invalid directive.</thinking>"}
            break
        directive_type = directive.get("type")
        # Summarize directive for UI to avoid excessive length
        directive_summary = _truncate_strings(directive, 100)
        yield {"type": "status", "message": f"<thinking>Turn {turn + 1}/{max_turns}: Received directive of type '{directive_type}'. Details: {json.dumps(directive_summary)}</thinking>"}
        print(f"Turn {turn + 1}: Received directive: {json.dumps(directive, indent=2)}")
        if not directive_type:
            yield {"type": "error", "message": "<thinking>Error: Received an empty or invalid directive.</thinking>"}
            break

        if directive_type == "tool_call":
            tool_name = directive.get("tool")
            tool_params = directive.get("params") or {}  # tolerate "params": null
            tool_id = directive.get("tool_id", f"local_tool_{turn + 1}")  # Ensure tool_id is present
            if tool_name in AGENT_LOCAL_TOOLS:
                tool_function = AGENT_LOCAL_TOOLS[tool_name]
                tool_params_summary = _truncate_strings(tool_params, 70)
                yield {"type": "status", "message": f"<thinking>Preparing to execute local tool: '{tool_name}' with parameters: {json.dumps(tool_params_summary)} as directed by MCO.</thinking>"}
                print(f"Executing local tool: {tool_name}, Params: {json.dumps(tool_params, indent=2)}")
                try:
                    # The registry mixes plain and async functions, so only
                    # await when the call actually produced an awaitable.
                    tool_result_payload = tool_function(**tool_params)
                    if inspect.isawaitable(tool_result_payload):
                        tool_result_payload = await tool_result_payload
                except Exception as e:
                    error_message = f"Error executing tool {tool_name}: {str(e)}"
                    yield {"type": "error", "message": f"<thinking>{error_message}</thinking>"}
                    print(error_message)
                    last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": error_message}
                else:
                    last_tool_result = {
                        "tool_id": tool_id,
                        "tool_name": tool_name,
                        "status": "success",
                        "result": tool_result_payload,
                    }
                    # Yielding the raw tool_result_payload. Gradio UI can decide how to format it.
                    yield {"type": "tool_result", "tool_name": tool_name, "result": tool_result_payload, "orchestration_id": orchestration_id, "tool_id": tool_id}
                    print(f"Tool {tool_name} executed. Result: {json.dumps(tool_result_payload, indent=2)}")
            else:
                unknown_tool_msg = f"Unknown tool requested by MCO: {tool_name}"
                yield {"type": "error", "message": f"<thinking>Error: {unknown_tool_msg}</thinking>"}
                print(unknown_tool_msg)
                last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": "Unknown tool"}
        elif directive_type == "assistant_message":
            content = directive.get("content", "")
            # Ensure MCO messages are consistently wrapped for UI display as agent thoughts/observations.
            yield {"type": "status", "message": f"<thinking>MCO Message: {content}</thinking>"}
            print(f"MCO Message: {content}")
        elif directive_type == "final_answer":
            final_answer = directive.get("answer", "Orchestration complete.")
            yield {"type": "final_result", "result": f"<thinking>Final Answer from MCO: {final_answer}</thinking>", "orchestration_id": orchestration_id}
            print(f"Orchestration finished. Final answer: {final_answer}")
            break
        elif directive_type == "orchestration_error":
            error_details = directive.get("details", "Unknown orchestration error.")
            yield {"type": "error", "message": f"<thinking>Orchestration Error from MCO: {error_details}</thinking>"}
            print(f"Orchestration Error from MCO: {error_details}")
            break
        else:
            unknown_directive_msg = f"Unknown directive type received: '{directive_type}'"
            yield {"type": "error", "message": f"<thinking>Error: {unknown_directive_msg}. Directive: {json.dumps(directive_summary)}</thinking>"}
            print(f"{unknown_directive_msg}. Directive: {json.dumps(directive, indent=2)}")
            break
    else:
        # Reached only when every turn was used without a terminating directive.
        yield {"type": "status", "message": "<thinking>Max turns reached. Ending orchestration.</thinking>"}
        print("Max turns reached.")


async def arun_mco_agent(user_prompt: str):
    """Async generator that runs an agent orchestrated by an MCO server.

    This function connects to the MCO server (which is an MCP server) at
    ``MCO_SERVER_URL`` (default ``http://localhost:3000``, with the scheme
    rewritten to ws/wss), initiates an orchestration by calling the
    'start_orchestration' tool, and then loops on 'get_next_directive'. If a
    directive is a 'tool_call' for one of the agent's own local tools, it
    executes it and reports the outcome with the next request.

    Args:
        user_prompt: The goal passed to the server as ``config.user_prompt``.

    Yields:
        Dicts whose "type" is "status", "error", "tool_result" or
        "final_result", for the UI to stream. Errors raised while running are
        converted into "error" events rather than propagated.
    """
    print(f"Starting MCO agent with prompt: '{user_prompt}'")
    yield {"type": "status", "message": "<thinking>Initializing MCP Client and preparing for orchestration...</thinking>"}

    server_url = _to_websocket_url(os.environ.get("MCO_SERVER_URL", "http://localhost:3000"))
    mcp_client = MCPClient(server_url=server_url)
    events = _orchestration_events(mcp_client, user_prompt)
    connection_opened = False
    try:
        async for event in events:
            yield event
    except ConnectionError as e:
        yield {"type": "error", "message": f"<thinking>Connection error: {str(e)}</thinking>"}
    except Exception as e:
        yield {"type": "error", "message": f"<thinking>An unexpected error occurred in the agent: {str(e)}</thinking>"}
    finally:
        # Cleanup only: yielding here would raise RuntimeError if the consumer
        # closes this generator early.
        await events.aclose()
        connection_opened = mcp_client.websocket is not None
        if connection_opened:
            await mcp_client.close()
    if connection_opened:
        yield {"type": "status", "message": "<thinking>MCP client shut down.</thinking>"}
def run_mco_agent(user_prompt: str):
    """Synchronous generator wrapper around ``arun_mco_agent``.

    This allows a synchronous caller (such as a Gradio handler) to iterate
    over the agent's events and stream them.
    NOTE(review): the original docstring describes this as exposed as a Modal
    Function, but no Modal decorator is visible on it here - confirm.

    Args:
        user_prompt: Forwarded unchanged to ``arun_mco_agent``.

    Yields:
        Each event dict produced by ``arun_mco_agent``, in order.
    """
    # Drive the whole generator on ONE event loop. asyncio.run() per item
    # would create and close a fresh loop each time; closing the first loop
    # finalizes the async generator and strands the WebSocket it opened.
    loop = asyncio.new_event_loop()
    agent_events = arun_mco_agent(user_prompt)
    try:
        while True:
            try:
                event = loop.run_until_complete(agent_events.__anext__())
            except StopAsyncIteration:
                break
            yield event
    finally:
        # Runs on normal exhaustion and when the consumer stops early, so the
        # agent's own cleanup executes on the loop that owns its connection.
        try:
            loop.run_until_complete(agent_events.aclose())
        except RuntimeError as e:
            # Raised if the async generator yields while it is being closed.
            print(f"Agent generator did not shut down cleanly: {e}")
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()