Spaces:
Sleeping
Sleeping
import os | |
import asyncio | |
from dotenv import load_dotenv | |
from mcp.server.fastmcp import FastMCP | |
from utils.agent_factory import create_agent, get_supported_agent_types, get_default_models | |
from utils.pokemon_utils import ( | |
start_ladder_battle, start_battle_against_agent, start_battle_against_player, | |
submit_move_for_battle, get_battle_state, list_active_battles, | |
download_battle_replay, cleanup_completed_battles, format_battle_state, | |
check_recent_battles, debug_move_attributes, active_battles, player_instances | |
) | |
load_dotenv() | |
# --- MCP Server Setup --- | |
mcp = FastMCP( | |
name="PokemonBattleAgent", | |
host="0.0.0.0", | |
port=7860, | |
) | |
# --- Global state for current user session --- | |
current_session = { | |
'username': None, | |
'active_battle_id': None | |
} | |
# --- MCP Tools --- | |
def start_ladder_match(username: str = "MCPTrainer") -> dict: | |
""" | |
Start a ladder battle on Pokemon Showdown. | |
Args: | |
username (str): Username for the MCP-controlled player | |
Returns: | |
dict: Battle information including battle ID and initial state | |
""" | |
global current_session | |
try: | |
# Start ladder search (fire-and-forget) | |
ladder_result = start_ladder_battle(username) | |
current_session['username'] = username | |
# Don't set active_battle_id yet since battle hasn't started | |
return { | |
"status": "ladder_search_queued", | |
"message": f"Ladder search queued for {username}. Waiting for opponent match.", | |
"instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once a match is found.", | |
"note": "Check server console for 'Ladder search started' confirmation." | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to start ladder battle: {str(e)}" | |
} | |
# @mcp.tool() | |
# async def battle_agent(agent_type: str, username: str = "MCPTrainer", api_key: str = None, model: str = None) -> dict: | |
# """ | |
# Start a battle against an AI agent. | |
# Args: | |
# agent_type (str): Type of agent ('openai', 'gemini', 'mistral', 'maxdamage', 'random') | |
# username (str): Username for the MCP-controlled player | |
# api_key (str, optional): API key for AI agents (required for openai, gemini, mistral) | |
# model (str, optional): Specific model to use (will use default if not specified) | |
# Returns: | |
# dict: Battle information including battle ID and initial state | |
# """ | |
# global current_session | |
# try: | |
# # Validate agent type | |
# if agent_type.lower() not in get_supported_agent_types(): | |
# return { | |
# "status": "error", | |
# "message": f"Unsupported agent type. Supported types: {get_supported_agent_types()}" | |
# } | |
# | |
# # Create opponent agent | |
# opponent = create_agent(agent_type, api_key=api_key, model=model) | |
# | |
# # Start battle | |
# battle_id = await start_battle_against_agent(username, opponent) | |
# | |
# current_session['username'] = username | |
# current_session['active_battle_id'] = battle_id | |
# | |
# # Get initial battle state | |
# battle_info = get_battle_state(battle_id) | |
# | |
# return { | |
# "status": "success", | |
# "battle_id": battle_id, | |
# "message": f"Battle started against {agent_type} agent", | |
# "opponent": opponent.username, | |
# "battle_url": battle_info.get('battle_url', f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}"), | |
# "battle_state": battle_info['battle_state'], | |
# "waiting_for_move": battle_info['waiting_for_move'] | |
# } | |
# except Exception as e: | |
# return { | |
# "status": "error", | |
# "message": f"Failed to start battle against agent: {str(e)}" | |
# } | |
def setup_battle() -> dict: | |
""" | |
Get setup information for playing Pokemon battles. | |
Provides instructions on how to join and play battles on the platform. | |
Returns: | |
dict: Setup instructions and platform information | |
""" | |
return { | |
"status": "success", | |
"message": "Pokemon Battle Setup Information", | |
"instructions": { | |
"how_to_play": [ | |
"1. Visit the Pokemon Showdown platform at: https://huggingface.co/spaces/Jofthomas/Pokemon_showdown", | |
"2. Choose a random username for yourself", | |
"3. Use the MCP tools to start battles, make moves, and interact with the game", | |
"4. You can start ladder matches, challenge specific players, or join existing battles" | |
], | |
"platform_url": "https://huggingface.co/spaces/Jofthomas/Pokemon_showdown", | |
"username_tip": "Choose any random username you like - it will be your identity in the Pokemon battle arena", | |
"available_battle_types": [ | |
"Ladder battles (automatic matchmaking)", | |
"Player vs Player challenges", | |
"Spectate ongoing battles" | |
] | |
}, | |
"next_steps": [ | |
"Visit the platform and choose your username", | |
"Use start_ladder_match() to find a random opponent", | |
"Use battle_player() to challenge a specific player", | |
"Use get_current_battle_state() to see your battle status" | |
], | |
"platform_info": { | |
"name": "Pokemon Showdown on Hugging Face", | |
"url": "https://huggingface.co/spaces/Jofthomas/Pokemon_showdown", | |
"type": "Web-based Pokemon battle simulator", | |
"access": "Free and open to all users" | |
} | |
} | |
def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict: | |
""" | |
Start a battle against a specific player. | |
Args: | |
opponent_username (str): Username of the opponent player | |
username (str): Username for the MCP-controlled player | |
Returns: | |
dict: Battle information including battle ID and initial state | |
""" | |
global current_session | |
try: | |
# Send battle challenge (fire-and-forget) | |
challenge_result = start_battle_against_player(username, opponent_username) | |
current_session['username'] = username | |
# Don't set active_battle_id yet since battle hasn't started | |
return { | |
"status": "challenge_queued", | |
"message": f"Challenge to {opponent_username} queued. Battle will start when accepted.", | |
"opponent": opponent_username, | |
"instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts.", | |
"note": "Check server console for 'Challenge sent' confirmation." | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to challenge player: {str(e)}" | |
} | |
async def choose_move(move_name: str, battle_id: str = None) -> dict: | |
""" | |
Choose and execute a move in the current battle. | |
Args: | |
move_name (str): Name or ID of the move to use | |
battle_id (str, optional): Specific battle ID (uses current active battle if not provided) | |
Returns: | |
dict: Result of the move and updated battle state | |
""" | |
global current_session | |
try: | |
# Use provided battle_id or current active battle | |
target_battle_id = battle_id or current_session.get('active_battle_id') | |
if not target_battle_id: | |
return { | |
"status": "error", | |
"message": "No active battle. Start a battle first." | |
} | |
# Submit move | |
result = await submit_move_for_battle(target_battle_id, move_name=move_name) | |
# Get updated battle state | |
battle_info = get_battle_state(target_battle_id) | |
return { | |
"status": "success", | |
"message": result, | |
"battle_state": battle_info['battle_state'], | |
"waiting_for_move": battle_info['waiting_for_move'] | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to execute move: {str(e)}" | |
} | |
async def switch_pokemon(pokemon_name: str, battle_id: str = None) -> dict: | |
""" | |
Switch to a different Pokemon in the current battle. | |
Args: | |
pokemon_name (str): Name of the Pokemon to switch to | |
battle_id (str, optional): Specific battle ID (uses current active battle if not provided) | |
Returns: | |
dict: Result of the switch and updated battle state | |
""" | |
global current_session | |
try: | |
# Use provided battle_id or current active battle | |
target_battle_id = battle_id or current_session.get('active_battle_id') | |
if not target_battle_id: | |
return { | |
"status": "error", | |
"message": "No active battle. Start a battle first." | |
} | |
# Submit switch | |
result = await submit_move_for_battle(target_battle_id, pokemon_name=pokemon_name) | |
# Get updated battle state | |
battle_info = get_battle_state(target_battle_id) | |
return { | |
"status": "success", | |
"message": result, | |
"battle_state": battle_info['battle_state'], | |
"waiting_for_move": battle_info['waiting_for_move'] | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to switch Pokemon: {str(e)}" | |
} | |
def get_current_battle_state(battle_id: str = None) -> dict: | |
""" | |
Get the current state of a battle. | |
Args: | |
battle_id (str, optional): Specific battle ID (uses current active battle if not provided) | |
Returns: | |
dict: Current battle state with all relevant information | |
""" | |
global current_session | |
try: | |
# Use provided battle_id or current active battle | |
target_battle_id = battle_id or current_session.get('active_battle_id') | |
if not target_battle_id: | |
return { | |
"status": "error", | |
"message": "No active battle. Start a battle first." | |
} | |
# Get battle state | |
battle_info = get_battle_state(target_battle_id) | |
return { | |
"status": "success", | |
"battle_id": target_battle_id, | |
**battle_info | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to get battle state: {str(e)}" | |
} | |
def list_battles() -> dict: | |
""" | |
List all active battles. | |
Returns: | |
dict: List of all active battles and their status | |
""" | |
try: | |
battles = list_active_battles() | |
return { | |
"status": "success", | |
"active_battles": battles, | |
"count": len(battles) | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to list battles: {str(e)}" | |
} | |
async def download_replay(battle_id: str = None) -> dict: | |
""" | |
Download the replay for a completed battle. | |
Args: | |
battle_id (str, optional): Battle ID (uses current active battle if not provided) | |
Returns: | |
dict: Information about the downloaded replay | |
""" | |
global current_session | |
try: | |
# Use provided battle_id or current active battle | |
target_battle_id = battle_id or current_session.get('active_battle_id') | |
if not target_battle_id: | |
return { | |
"status": "error", | |
"message": "No battle ID provided. Specify a battle_id or start a battle first." | |
} | |
# Download replay | |
replay_path = await download_battle_replay(target_battle_id) | |
return { | |
"status": "success", | |
"message": f"Replay downloaded successfully", | |
"replay_path": replay_path, | |
"battle_id": target_battle_id | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to download replay: {str(e)}" | |
} | |
def get_supported_agents() -> dict: | |
""" | |
Get information about supported agent types and their default models. | |
Returns: | |
dict: Information about available agents | |
""" | |
return { | |
"status": "success", | |
"supported_agents": get_supported_agent_types(), | |
"default_models": get_default_models(), | |
"description": { | |
"openai": "OpenAI GPT models (requires API key) - Deterministic decisions", | |
"gemini": "Google Gemini models (requires API key) - Deterministic decisions", | |
"mistral": "Mistral AI models (requires API key) - Deterministic decisions", | |
"maxdamage": "Simple agent that chooses moves with highest base power - Deterministic decisions" | |
} | |
} | |
def get_player_status(username: str = None) -> dict: | |
""" | |
Get the current status of a player including any active battles. | |
Args: | |
username (str, optional): Username to check (uses current session if not provided) | |
Returns: | |
dict: Player status and battle information | |
""" | |
global current_session | |
try: | |
# Use provided username or current session username | |
target_username = username or current_session.get('username') | |
if not target_username: | |
return { | |
"status": "error", | |
"message": "No username provided and no active session." | |
} | |
if target_username not in player_instances: | |
return { | |
"status": "no_player", | |
"username": target_username, | |
"message": f"No player instance found for {target_username}" | |
} | |
player = player_instances[target_username] | |
player_battles = [] | |
for battle_id, battle in player.battles.items(): | |
battle_info = { | |
'battle_id': battle_id, | |
'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}", | |
'turn': getattr(battle, 'turn', 0), | |
'finished': getattr(battle, 'finished', False), | |
'won': getattr(battle, 'won', None) | |
} | |
player_battles.append(battle_info) | |
return { | |
"status": "success", | |
"username": target_username, | |
"player_username": player.username, | |
"total_battles": len(player.battles), | |
"battles": player_battles, | |
"n_won_battles": getattr(player, 'n_won_battles', 0), | |
"n_finished_battles": getattr(player, 'n_finished_battles', 0) | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to get player status: {str(e)}" | |
} | |
def find_recent_battles(username: str = None) -> dict: | |
""" | |
Check for recent battles that may have started after a timeout. | |
Useful when battle requests time out but battles actually started. | |
Args: | |
username (str, optional): Username to check (uses current session if not provided) | |
Returns: | |
dict: List of recent battles with viewing links | |
""" | |
global current_session | |
try: | |
# Use provided username or current session username | |
target_username = username or current_session.get('username') | |
if not target_username: | |
return { | |
"status": "error", | |
"message": "No username provided and no active session." | |
} | |
# Check for recent battles using the imported function | |
recent_battles = check_recent_battles(target_username) | |
if recent_battles: | |
# Update current session with the most recent battle | |
current_session['active_battle_id'] = recent_battles[0]['battle_id'] | |
return { | |
"status": "success", | |
"username": target_username, | |
"recent_battles": recent_battles, | |
"count": len(recent_battles), | |
"message": f"Found {len(recent_battles)} recent battles" if recent_battles else "No recent battles found" | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to check recent battles: {str(e)}" | |
} | |
def debug_battle_objects(username: str = None, battle_id: str = None) -> dict: | |
""" | |
Debug tool to inspect battle objects and their attributes. | |
Useful for troubleshooting formatting errors. | |
Args: | |
username (str, optional): Username to debug (uses current session if not provided) | |
battle_id (str, optional): Specific battle ID to debug | |
Returns: | |
dict: Debug information about battle objects | |
""" | |
global current_session | |
try: | |
# Use provided username or current session username | |
target_username = username or current_session.get('username') | |
target_battle_id = battle_id or current_session.get('active_battle_id') | |
if not target_username: | |
return { | |
"status": "error", | |
"message": "No username provided and no active session." | |
} | |
if target_username not in player_instances: | |
return { | |
"status": "error", | |
"message": f"No player instance found for {target_username}" | |
} | |
player = player_instances[target_username] | |
debug_result = { | |
"status": "success", | |
"username": target_username, | |
"player_username": player.username, | |
"total_battles": len(player.battles), | |
"battles_debug": [] | |
} | |
battles_to_debug = [] | |
if target_battle_id and target_battle_id in player.battles: | |
battles_to_debug = [(target_battle_id, player.battles[target_battle_id])] | |
else: | |
# Debug all battles (limit to 3 most recent) | |
battles_to_debug = list(player.battles.items())[-3:] | |
for bid, battle in battles_to_debug: | |
try: | |
battle_debug = { | |
"battle_id": bid, | |
"battle_type": type(battle).__name__, | |
"battle_attributes": [attr for attr in dir(battle) if not attr.startswith('_')], | |
"move_debug": debug_move_attributes(battle) | |
} | |
debug_result["battles_debug"].append(battle_debug) | |
except Exception as e: | |
debug_result["battles_debug"].append({ | |
"battle_id": bid, | |
"error": f"Error debugging battle: {e}" | |
}) | |
return debug_result | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to debug battle objects: {str(e)}" | |
} | |
def cleanup_battles() -> dict: | |
""" | |
Clean up completed battles to free memory. | |
Returns: | |
dict: Status of cleanup operation | |
""" | |
try: | |
cleanup_completed_battles() | |
return { | |
"status": "success", | |
"message": "Completed battles cleaned up successfully" | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to cleanup battles: {str(e)}" | |
} | |
async def wait_30_seconds() -> dict: | |
""" | |
Wait for 30 seconds. Useful for giving processes time to complete or timing operations. | |
Returns: | |
dict: Status of the wait operation | |
""" | |
try: | |
await asyncio.sleep(30) | |
return { | |
"status": "success", | |
"message": "Waited 30 seconds successfully" | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Failed to wait: {str(e)}" | |
} | |
# --- Server Execution --- | |
if __name__ == "__main__": | |
print(f"Pokemon Battle MCP Server starting on port 7860...") | |
print("Available battle types:") | |
print("- Ladder battles") | |
print("- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage) - NO RANDOM FALLBACKS") | |
print("- Human player battles") | |
print("Running Pokemon Battle MCP server with SSE transport") | |
mcp.run(transport="sse") |