#!/usr/bin/env python3
"""
Modal Implementation for MCO Hackathon
This module defines the Modal app, including the MCPClient for communicating
with the MCO MCP server via WebSockets, and the Claude 3 Opus agent function.
This version is fully asynchronous and uses a clean network-based client.
"""
import os
import json
import asyncio
import websockets
import modal
from typing import Dict, Any, Optional
# --- Modal App Setup ---
# Module-level Modal application object; the `@stub.function(...)` decorator
# used later in this file registers functions on it.
stub = modal.App(
    name="mco-autogpt-agent",
    # Secrets are built with modal.Secret.from_dotenv().
    secrets=[modal.Secret.from_dotenv()],
    # Debian-slim image on Python 3.10 with `websockets` pip-installed.
    # NOTE(review): the package is not version-pinned, so the image gets
    # whatever release is current at build time.
    image=modal.Image.debian_slim(python_version="3.10").pip_install(
        "websockets",
    )
)
# --- MCPClient: Generic client for interacting with MCP Servers via WebSocket ---
class MCPClient:
    """A generic client for interacting with any MCP-compliant server via WebSockets.

    Usage is strictly sequential: ``connect()`` once, then any number of
    ``call_tool()`` calls (each sends one request and reads one response),
    then ``close()``.
    """

    def __init__(self, server_url: str):
        """Store the target URL; no network I/O happens until ``connect()``."""
        self.server_url = server_url
        self.websocket = None
        self.client_id = None  # Provided by the server upon connection
        self.request_counter = 0
        self.available_tools = {}  # Populated from server's welcome message

    def _is_connected(self) -> bool:
        """Return True when a WebSocket exists and reports itself as open.

        The legacy ``websockets`` protocol class exposes a boolean ``open``
        property, while connections from the newer asyncio implementation
        only expose ``state`` (an enum whose open member is named ``OPEN``).
        Both are supported so that an unpinned ``websockets`` install does
        not turn every tool call into an ``AttributeError``.
        """
        socket = self.websocket
        if socket is None:
            return False
        legacy_open = getattr(socket, "open", None)
        if isinstance(legacy_open, bool):
            return legacy_open
        state = getattr(socket, "state", None)
        return getattr(state, "name", None) == "OPEN"

    async def connect(self):
        """Establishes a WebSocket connection to the MCP server.

        Reads the first message after connecting as a JSON "welcome" payload
        and takes ``clientId`` and ``tools`` from it.

        Raises:
            Exception: any failure while connecting or reading/parsing the
                welcome message is logged and re-raised unchanged.
        """
        try:
            self.websocket = await websockets.connect(self.server_url)
            welcome_message = await self.websocket.recv()
            welcome_data = json.loads(welcome_message)
            self.client_id = welcome_data.get("clientId")
            self.available_tools = welcome_data.get("tools", {})
            print(f"Connected to MCP server with client ID: {self.client_id}")
            print(f"Available tools: {json.dumps(self.available_tools, indent=2)}")
        except Exception as e:
            print(f"Failed to connect to WebSocket server at {self.server_url}: {e}")
            raise

    async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Sends a tool call to the MCP server and awaits the response.

        Args:
            tool_name: Name of the remote tool to invoke.
            params: JSON-serialisable parameters for the tool.

        Returns:
            The response's ``result`` field when present, otherwise the whole
            decoded response. On timeout (60 s) or any other failure during
            send/receive, a dict of the form ``{"error": "..."}`` is returned
            instead of raising.

        Raises:
            ConnectionError: if no open WebSocket connection is available.
        """
        if not self._is_connected():
            raise ConnectionError("WebSocket connection is not established or has been closed.")

        self.request_counter += 1
        request_id = f"{self.client_id}-{self.request_counter}"
        message = {
            "type": "tool_call",
            "tool": tool_name,
            "params": params,
            "requestId": request_id,
        }
        try:
            await self.websocket.send(json.dumps(message))
            response_message = await asyncio.wait_for(self.websocket.recv(), timeout=60.0)
            response_data = json.loads(response_message)
            # A mismatching ID is only reported, not rejected: the response is
            # still handed back to the caller.
            if response_data.get("requestId") != request_id:
                print(f"Warning: Response ID mismatch. Expected {request_id}, got {response_data.get('requestId')}")
            # The 'result' field is standard for successful tool calls in MCP
            return response_data.get("result", response_data)  # Return full response if no 'result'
        except asyncio.TimeoutError:
            print(f"Timeout waiting for response for tool call '{tool_name}'.")
            return {"error": f"Timeout waiting for response for tool call '{tool_name}'."}
        except Exception as e:
            print(f"An error occurred during tool call '{tool_name}': {e}")
            return {"error": f"An error occurred during tool call '{tool_name}': {str(e)}"}

    async def close(self):
        """Closes the WebSocket connection (no-op if none was opened)."""
        if self.websocket:
            await self.websocket.close()
            print("WebSocket connection closed.")
# --- Agent's Local Tools ---
def create_file_locally(filename: str, content: str) -> dict:
    """Write ``content`` to ``filename`` on the agent's local (container) filesystem.

    Returns a dict with ``status`` ("success" or "error") and a human-readable
    ``message``; failures are reported through the dict rather than raised.
    """
    # Note: In Modal, files written here are typically ephemeral to the
    # container's current run. Persistent or shared storage would need
    # something else (e.g. Modal's NetworkFileSystem or an S3 upload tool).
    # Relative names resolve against the function's current working directory.
    try:
        with open(filename, "w") as out_file:
            out_file.write(content)
    except Exception as exc:
        failure_text = f"Failed to create file '{filename}': {str(exc)}"
        return {"status": "error", "message": failure_text}

    success_text = f"File '{filename}' created successfully in agent's environment."
    return {"status": "success", "message": success_text}
async def call_external_mcp_tool(server_url: str, target_tool_name: str, target_tool_params: Dict[str, Any]) -> Dict[str, Any]:
    """Open a short-lived connection to another MCP server and invoke one tool on it.

    Returns whatever ``MCPClient.call_tool`` returns on success. On failure a
    dict with ``error``, ``tool_name``, ``external_server_url`` and a
    ``status_code`` of "CONNECTION_ERROR" or "EXECUTION_ERROR" is returned
    instead of raising. The connection is closed before returning.
    """
    print(f"[Agent Tool] Attempting to call tool '{target_tool_name}' on external MCP server: {server_url}")

    # MCPClient speaks WebSocket, so swap a leading http(s) scheme for ws(s).
    if server_url.startswith("http://"):
        ws_server_url = "ws://" + server_url[len("http://"):]
    elif server_url.startswith("https://"):
        ws_server_url = "wss://" + server_url[len("https://"):]
    else:
        ws_server_url = server_url

    def _report_failure(description: str, code: str) -> Dict[str, Any]:
        # Log the problem and build the error payload handed back to the caller.
        print(f"[Agent Tool] {description}")
        return {"error": description, "tool_name": target_tool_name, "external_server_url": server_url, "status_code": code}

    remote = MCPClient(server_url=ws_server_url)
    try:
        await remote.connect()
        print(f"[Agent Tool] Connected to external MCP server: {server_url}")
        outcome = await remote.call_tool(target_tool_name, target_tool_params)
        print(f"[Agent Tool] Successfully called '{target_tool_name}' on {server_url}. Result: {json.dumps(outcome)}")
        return outcome
    except ConnectionError as exc:
        return _report_failure(
            f"Connection error with external MCP server {server_url}: {str(exc)}",
            "CONNECTION_ERROR",
        )
    except Exception as exc:
        return _report_failure(
            f"Error calling tool '{target_tool_name}' on external MCP server {server_url}: {str(exc)}",
            "EXECUTION_ERROR",
        )
    finally:
        # Only close when a socket was actually opened by connect().
        if remote and remote.websocket:
            await remote.close()
            print(f"[Agent Tool] Closed connection to external MCP server: {server_url}")
async def read_file_locally(filename: str) -> Dict[str, Any]:
    """Reads a file from the local filesystem and returns its content.

    Only paths that resolve to a location inside the current working
    directory are allowed.

    Returns:
        On success ``{"filename", "content", "status_code": "SUCCESS"}``.
        Otherwise a dict with ``error``, ``filename`` and a ``status_code`` of
        "SECURITY_ERROR", "FILE_NOT_FOUND" or "READ_ERROR". Nothing is raised.
    """
    print(f"[Agent Tool] Attempting to read file: {filename}")
    try:
        # Security check: basic path traversal prevention.
        base_dir = os.path.abspath(".")  # Define a base directory if needed, for now current dir
        requested_path = os.path.abspath(filename)
        # Compare whole path components, not raw string prefixes: a plain
        # startswith() would accept "/app-evil/x" when the base is "/app".
        try:
            inside_base = os.path.commonpath([base_dir, requested_path]) == base_dir
        except ValueError:
            # commonpath() refuses paths it cannot compare (e.g. different drives).
            inside_base = False
        if not inside_base:
            err_msg = f"Security error: Path traversal attempt detected for filename '{filename}'."
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "SECURITY_ERROR"}

        if not os.path.exists(filename):
            err_msg = f"File not found: {filename}"
            print(f"[Agent Tool] {err_msg}")
            return {"error": err_msg, "filename": filename, "status_code": "FILE_NOT_FOUND"}

        with open(filename, "r") as f:
            content = f.read()
        print(f"[Agent Tool] Successfully read file: {filename}")
        return {"filename": filename, "content": content, "status_code": "SUCCESS"}
    except Exception as e:
        err_msg = f"Error reading file '{filename}': {str(e)}"
        print(f"[Agent Tool] {err_msg}")
        return {"error": err_msg, "filename": filename, "status_code": "READ_ERROR"}
# Registry of the tools the agent can execute itself, keyed by the tool name
# that arun_mco_agent reads from a "tool_call" directive.
# NOTE: create_file_locally is a plain (synchronous) function, whereas
# call_external_mcp_tool and read_file_locally are coroutine functions.
AGENT_LOCAL_TOOLS = {
    "create_file_locally": create_file_locally,
    "call_external_mcp_tool": call_external_mcp_tool,
    "read_file_locally": read_file_locally,
    # Add other local tools here
}
# --- Agent (Orchestrated by an MCO Server) ---
def _to_websocket_url(url: str) -> str:
    """Swap a leading http(s):// scheme for ws(s)://; leave any other URL untouched."""
    for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
        if url.startswith(http_scheme):
            return ws_scheme + url[len(http_scheme):]
    return url


def _shorten_string_values(mapping: Dict[str, Any], limit: int) -> Dict[str, Any]:
    """Copy ``mapping`` with string values longer than ``limit`` truncated for display."""
    return {
        key: (value[:limit] + '...' if isinstance(value, str) and len(value) > limit else value)
        for key, value in mapping.items()
    }


async def arun_mco_agent(user_prompt: str):
    """Async generator that runs an agent orchestrated by an MCO server.

    Connects to the MCO server (URL taken from the ``MCO_SERVER_URL``
    environment variable, default ``http://localhost:3000``, with the scheme
    converted to WebSocket), calls the ``start_orchestration`` tool and then
    asks for directives with ``get_next_directive`` for at most 20 turns.
    A ``tool_call`` directive naming an entry of ``AGENT_LOCAL_TOOLS`` is
    executed locally and its outcome is sent back with the next request.

    Args:
        user_prompt: The goal passed to the MCO server as ``config.user_prompt``.

    Yields:
        Dicts with a ``type`` key: ``"status"``, ``"tool_result"``,
        ``"error"`` or ``"final_result"``. Errors are reported as events
        rather than raised. The last event of a run that was not closed early
        is the status message "MCP client shut down.".
    """
    import inspect  # local import: only used to detect awaitable tool results

    print(f"Starting MCO agent with prompt: '{user_prompt}'")
    yield {"type": "status", "message": "Initializing MCP Client and preparing for orchestration..."}

    # Convert only the scheme; a blanket str.replace("http", "ws") would also
    # rewrite "http" occurring in the host name or path.
    server_url = _to_websocket_url(os.environ.get("MCO_SERVER_URL", "http://localhost:3000"))
    mcp_client = MCPClient(server_url=server_url)
    try:
        await mcp_client.connect()
        yield {"type": "status", "message": "Starting orchestration with MCO server..."}
        start_params = {"config": {"user_prompt": user_prompt}}
        start_result = await mcp_client.call_tool("start_orchestration", start_params)
        orchestration_id = start_result.get("orchestration_id") if isinstance(start_result, dict) else None

        if not orchestration_id:
            yield {"type": "error", "message": "Error: Failed to start orchestration. No orchestration_id received."}
        else:
            yield {"type": "status", "message": f"Orchestration started with ID: {orchestration_id}. Goal: '{user_prompt}'. MCO server will use configured SNLP."}
            print(f"Orchestration started. ID: {orchestration_id}, Prompt: {user_prompt}. MCO server using configured SNLP.")

            last_tool_result = None
            max_turns = 20  # Max turns to prevent infinite loops
            for turn in range(max_turns):
                get_directive_params = {"orchestration_id": orchestration_id}
                if last_tool_result:
                    get_directive_params["last_tool_result"] = last_tool_result
                    print(f"Sending last_tool_result to get_next_directive: {json.dumps(last_tool_result, indent=2)}")
                    last_tool_result = None

                directive = await mcp_client.call_tool("get_next_directive", get_directive_params)
                if not isinstance(directive, dict):
                    # Anything but a JSON object cannot carry a directive type.
                    yield {"type": "error", "message": "Error: Received an empty or invalid directive."}
                    break
                directive_type = directive.get("type")

                # Summarize directive for UI to avoid excessive length
                directive_summary = _shorten_string_values(directive, 100)
                yield {"type": "status", "message": f"Turn {turn + 1}/{max_turns}: Received directive of type '{directive_type}'. Details: {json.dumps(directive_summary)}"}
                print(f"Turn {turn + 1}: Received directive: {json.dumps(directive, indent=2)}")

                if not directive_type:
                    yield {"type": "error", "message": "Error: Received an empty or invalid directive."}
                    break

                if directive_type == "tool_call":
                    tool_name = directive.get("tool")
                    tool_params = directive.get("params") or {}  # tolerate "params": null
                    tool_id = directive.get("tool_id", f"local_tool_{turn + 1}")  # Ensure tool_id is present

                    if tool_name in AGENT_LOCAL_TOOLS:
                        tool_function = AGENT_LOCAL_TOOLS[tool_name]
                        tool_params_summary = _shorten_string_values(tool_params, 70)
                        yield {"type": "status", "message": f"Preparing to execute local tool: '{tool_name}' with parameters: {json.dumps(tool_params_summary)} as directed by MCO."}
                        print(f"Executing local tool: {tool_name}, Params: {json.dumps(tool_params, indent=2)}")
                        try:
                            # The registry mixes plain functions and coroutine
                            # functions, so only await when there is something
                            # to await.
                            tool_result_payload = tool_function(**tool_params)
                            if inspect.isawaitable(tool_result_payload):
                                tool_result_payload = await tool_result_payload
                            last_tool_result = {
                                "tool_id": tool_id,
                                "tool_name": tool_name,
                                "status": "success",
                                "result": tool_result_payload,
                            }
                            # Yielding the raw tool_result_payload. Gradio UI can decide how to format it.
                            yield {"type": "tool_result", "tool_name": tool_name, "result": tool_result_payload, "orchestration_id": orchestration_id, "tool_id": tool_id}
                            print(f"Tool {tool_name} executed. Result: {json.dumps(tool_result_payload, indent=2)}")
                        except Exception as e:
                            error_message = f"Error executing tool {tool_name}: {str(e)}"
                            # error_message already carries the prefix; do not repeat it.
                            yield {"type": "error", "message": error_message}
                            print(error_message)
                            last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": error_message}
                    else:
                        unknown_tool_msg = f"Unknown tool requested by MCO: {tool_name}"
                        yield {"type": "error", "message": f"Error: {unknown_tool_msg}"}
                        print(unknown_tool_msg)
                        last_tool_result = {"tool_id": tool_id, "tool_name": tool_name, "status": "error", "error": "Unknown tool"}

                elif directive_type == "assistant_message":
                    content = directive.get("content", "")
                    # Ensure MCO messages are consistently wrapped for UI display as agent thoughts/observations.
                    processed_content = f"MCO Message: {content}"
                    yield {"type": "status", "message": processed_content}
                    print(f"MCO Message: {content}")
                    last_tool_result = None

                elif directive_type == "final_answer":
                    final_answer = directive.get("answer", "Orchestration complete.")
                    yield {"type": "final_result", "result": f"Final Answer from MCO: {final_answer}", "orchestration_id": orchestration_id}
                    print(f"Orchestration finished. Final answer: {final_answer}")
                    break

                elif directive_type == "orchestration_error":
                    error_details = directive.get("details", "Unknown orchestration error.")
                    yield {"type": "error", "message": f"Orchestration Error from MCO: {error_details}"}
                    print(f"Orchestration Error from MCO: {error_details}")
                    break

                else:
                    unknown_directive_msg = f"Unknown directive type received: '{directive_type}'"
                    yield {"type": "error", "message": f"Error: {unknown_directive_msg}. Directive: {json.dumps(directive_summary)}"}
                    print(f"{unknown_directive_msg}. Directive: {json.dumps(directive, indent=2)}")
                    break
            else:
                # Reached only when every turn was used without a terminal directive.
                yield {"type": "status", "message": "Max turns reached. Ending orchestration."}
                print("Max turns reached.")
    except ConnectionError as e:
        yield {"type": "error", "message": f"Connection error: {str(e)}"}
    except Exception as e:
        yield {"type": "error", "message": f"An unexpected error occurred in the agent: {str(e)}"}
    finally:
        if mcp_client and mcp_client.websocket:
            await mcp_client.close()

    # Emitted after (not inside) the finally block: yielding from a finally
    # clause makes an early aclose() of this generator fail with RuntimeError.
    yield {"type": "status", "message": "MCP client shut down."}
@stub.function(
    secrets=[modal.Secret.from_dotenv()],
    timeout=600,
    scaledown_window=120,
)
def run_mco_agent(user_prompt: str):
    """Synchronous wrapper for the async generator, exposed as a Modal Function.

    This allows Gradio to call the agent and stream results.

    The whole run is driven on a single event loop. Calling ``asyncio.run()``
    once per item would create and close a new loop for every event, which
    cancels that loop's background tasks and leaves the WebSocket opened in an
    earlier step unusable in the next one.

    Args:
        user_prompt: Forwarded unchanged to ``arun_mco_agent``.

    Yields:
        Every event dict produced by ``arun_mco_agent``, in order.
    """
    import asyncio

    loop = asyncio.new_event_loop()
    agent_events = arun_mco_agent(user_prompt)
    exhausted = object()  # sentinel marking the end of the async generator

    async def _next_event():
        # Advancing the generator from inside the running loop lets the loop
        # track it, so shutdown_asyncgens() below can close it if this
        # wrapper is abandoned mid-stream.
        try:
            return await agent_events.__anext__()
        except StopAsyncIteration:
            return exhausted

    try:
        while True:
            event = loop.run_until_complete(_next_event())
            if event is exhausted:
                break
            yield event
    finally:
        try:
            # Close the agent generator first (its cleanup still needs the
            # connection's background tasks), then cancel whatever is left.
            loop.run_until_complete(loop.shutdown_asyncgens())
            leftovers = asyncio.all_tasks(loop)
            for task in leftovers:
                task.cancel()
            if leftovers:
                loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            loop.close()