# Source: Pokemon_MCP / utils / pokemon_utils.py
# (web page header residue: uploaded by "Jofthomas", commit message "change", commit 444a11a)
import asyncio
import os
import random
import uuid
from typing import Optional, Dict, Any, List, Union
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.player import Player
from poke_env.environment.battle import Battle
from poke_env.environment.move import Move
from poke_env.environment.pokemon import Pokemon
from agents import LLMAgentBase
# Custom server configuration
# WebSocket endpoint passed as the first ServerConfiguration argument.
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
# Second ServerConfiguration argument (presumably the authentication/action
# endpoint). NOTE(review): this is the official play.pokemonshowdown.com host
# while the websocket above lives on jofthomas.com — confirm this is intended.
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
# Global battle state management
# Battle id -> tracking dict. Entries written in this file carry the keys
# 'type', 'player_username', 'opponent', 'battle_state', 'waiting_for_move',
# 'completed' and 'battle_url'; the agent's choose_move additionally writes
# 'available_moves' and 'available_switches'.
active_battles: Dict[str, Dict[str, Any]] = {}
# Caller-supplied username (without the random suffix the agent appends) ->
# the MCPPokemonAgent created for it, so repeated requests reuse one agent.
player_instances: Dict[str, "MCPPokemonAgent"] = {}
class MCPPokemonAgent(LLMAgentBase):
    """
    Special Pokemon agent controlled by MCP that allows external move selection.

    ``choose_move`` does not decide anything itself: it publishes the current
    turn into the module-level ``active_battles`` registry (when the battle is
    tracked there) and then waits on ``external_move_queue`` until
    ``submit_move`` enqueues an order.
    """

    def __init__(self, username: str, *args, **kwargs):
        """
        Build the agent with a randomised username and the custom server config.

        Args:
            username (str): Base username; ``_<4 random digits>`` is appended.
            *args: Extra positional arguments forwarded to the base class.
            **kwargs: Extra keyword arguments forwarded to the base class.
        """
        # Add random suffix to make username unique
        unique_username = f"{username}_{random.randint(1000, 9999)}"
        account_config = AccountConfiguration(unique_username, None)
        # *args is written first for readability only: Python binds unpacked
        # positionals before keywords regardless of where they appear.
        super().__init__(
            *args,
            account_configuration=account_config,
            server_configuration=custom_config,
            max_concurrent_battles=1,
            save_replays="battle_replays",
            avatar="ash",
            log_level=25,  # Reduce logging verbosity
            **kwargs,
        )
        # Orders produced by submit_move and consumed by choose_move.
        self.external_move_queue = asyncio.Queue()
        # Initialised here but not used by any code visible in this class.
        self.battle_state_callback = None
        # Tag of the battle most recently passed to choose_move.
        self.current_battle_id = None

    async def choose_move(self, battle: Battle) -> str:
        """
        Wait for external move selection via MCP.

        Args:
            battle (Battle): The battle that needs a decision.

        Returns:
            The order object that ``submit_move`` placed on the queue.

        Raises:
            Exception: Any error raised while waiting is logged and re-raised;
                no random fallback move is chosen.
        """
        self.current_battle_id = battle.battle_tag
        # Update active battle state
        if battle.battle_tag in active_battles:
            entry = active_battles[battle.battle_tag]
            entry['battle_state'] = self._format_battle_state(battle)
            entry['waiting_for_move'] = True
            entry['available_moves'] = [move.id for move in battle.available_moves]
            entry['available_switches'] = [pkmn.species for pkmn in battle.available_switches]
        try:
            # Wait for external move selection (no timeout - wait indefinitely)
            return await self.external_move_queue.get()
        except Exception as e:
            print(f"Error in move selection for battle {battle.battle_tag}: {e}")
            # Don't choose random move - raise the exception to handle gracefully
            raise
        finally:
            # Runs on success, on error AND on cancellation (CancelledError is
            # not an Exception subclass), so the flag can never stay stuck at
            # True. The registry is looked up again because the entry may have
            # been added or replaced while this coroutine was suspended.
            if battle.battle_tag in active_battles:
                active_battles[battle.battle_tag]['waiting_for_move'] = False

    async def submit_move(
        self,
        move_name: Optional[str] = None,
        pokemon_name: Optional[str] = None,
        battle: Optional[Battle] = None,
    ) -> str:
        """
        Submit a move or switch externally.

        Args:
            move_name (str, optional): Move to use. Takes precedence over
                ``pokemon_name`` when both are given.
            pokemon_name (str, optional): Pokemon to switch to.
            battle (Battle, optional): Target battle; when omitted, the battle
                whose tag equals ``current_battle_id`` is used.

        Returns:
            str: Confirmation message naming the submitted move or switch.

        Raises:
            ValueError: If no battle can be resolved, if neither name is given,
                or if the requested move/switch is not currently available.
        """
        if not battle and self.current_battle_id:
            # Find the battle by ID
            battle = next(
                (
                    candidate
                    for candidate in self.battles.values()
                    if candidate.battle_tag == self.current_battle_id
                ),
                None,
            )
        if not battle:
            raise ValueError("No active battle found")

        if move_name:
            # Find and execute move
            chosen_move = self._find_move_by_name(battle, move_name)
            if not (chosen_move and chosen_move in battle.available_moves):
                raise ValueError(f"Move '{move_name}' not available")
            await self.external_move_queue.put(self.create_order(chosen_move))
            return f"Submitted move: {chosen_move.id}"

        if pokemon_name:
            # Find and execute switch
            chosen_switch = self._find_pokemon_by_name(battle, pokemon_name)
            if not (chosen_switch and chosen_switch in battle.available_switches):
                raise ValueError(f"Pokemon '{pokemon_name}' not available for switch")
            await self.external_move_queue.put(self.create_order(chosen_switch))
            return f"Submitted switch: {chosen_switch.species}"

        raise ValueError("Must specify either move_name or pokemon_name")
def normalize_name(name: str) -> str:
    """Return ``name`` lowercased with every non-alphanumeric character dropped."""
    kept_chars = [char for char in name if char.isalnum()]
    return "".join(kept_chars).lower()
def _format_pokemon_info(pkmn, role: str) -> Dict[str, Any]:
    """Summarise one on-field Pokemon; ``role`` ('active'/'opponent') only labels the error text."""
    try:
        if not pkmn:
            # No Pokemon on this side: same neutral placeholders as before.
            return {
                'species': None,
                'types': [],
                'hp_fraction': 0,
                'status': None,
                'boosts': {},
                'ability': None,
            }
        status = getattr(pkmn, 'status', None)
        ability = getattr(pkmn, 'ability', None)
        return {
            'species': getattr(pkmn, 'species', 'unknown'),
            # A missing secondary type may be reported as None (assumption
            # about the poke_env version in use); skip it rather than emit
            # the literal string "None".
            'types': [str(t) for t in getattr(pkmn, 'types', []) if t is not None],
            'hp_fraction': getattr(pkmn, 'current_hp_fraction', 0),
            'status': getattr(status, 'name', None) if status else None,
            'boosts': dict(getattr(pkmn, 'boosts', {})),
            'ability': str(ability) if ability else None,
        }
    except Exception as e:
        return {
            'species': 'error',
            'types': [],
            'hp_fraction': 0,
            'status': None,
            'boosts': {},
            'ability': None,
            'error': f"Error formatting {role} Pokemon: {e}",
        }


def _format_move_info(move) -> Dict[str, Any]:
    """Summarise one available move, degrading to minimal info if an attribute access fails."""
    try:
        return {
            'id': move.id,
            'name': getattr(move, 'name', move.id),  # Fallback to id if no name
            'type': str(move.type) if hasattr(move, 'type') else 'unknown',
            'base_power': getattr(move, 'base_power', 0),
            'accuracy': getattr(move, 'accuracy', 100),
            'pp': f"{getattr(move, 'current_pp', '?')}/{getattr(move, 'max_pp', '?')}",
            'category': getattr(move.category, 'name', 'unknown') if hasattr(move, 'category') else 'unknown',
            'description': getattr(move, 'description', '') or "",
        }
    except Exception as e:
        # If there's any error with a specific move, add minimal info
        return {
            'id': getattr(move, 'id', 'unknown'),
            'name': str(move),
            'type': 'unknown',
            'base_power': 0,
            'accuracy': 100,
            'pp': '?/?',
            'category': 'unknown',
            'description': f"Error formatting move: {e}",
        }


def _format_switch_info(pkmn) -> Dict[str, Any]:
    """Summarise one Pokemon that can be switched in."""
    try:
        status = getattr(pkmn, 'status', None)
        return {
            'species': getattr(pkmn, 'species', 'unknown'),
            'hp_fraction': getattr(pkmn, 'current_hp_fraction', 0),
            'status': getattr(status, 'name', None) if status else None,
            'types': [str(t) for t in getattr(pkmn, 'types', []) if t is not None],
        }
    except Exception as e:
        return {
            'species': 'error',
            'hp_fraction': 0,
            'status': None,
            'types': [],
            'error': f"Error formatting switch: {e}",
        }


def format_battle_state(battle: "Battle") -> Dict[str, Any]:
    """
    Format battle state into a structured dictionary.

    Every section is built defensively: a failure while formatting one
    Pokemon, move or switch yields a placeholder entry with an error message
    instead of aborting the whole snapshot.

    Args:
        battle (Battle): The current battle object

    Returns:
        dict: Formatted battle state with the keys 'battle_id', 'turn',
        'active_pokemon', 'opponent_pokemon', 'available_moves',
        'available_switches', 'weather', 'fields', 'side_conditions',
        'opponent_side_conditions', 'force_switch', 'can_z_move',
        'can_dynamax' and 'can_mega_evolve' (plus 'formatting_error' when the
        top-level fields could not be read).
    """
    active_info = _format_pokemon_info(battle.active_pokemon, 'active')
    opponent_info = _format_pokemon_info(battle.opponent_active_pokemon, 'opponent')
    available_moves = [_format_move_info(move) for move in (battle.available_moves or [])]
    available_switches = [_format_switch_info(pkmn) for pkmn in (battle.available_switches or [])]

    # Safely build the return dictionary
    try:
        return {
            'battle_id': getattr(battle, 'battle_tag', 'unknown'),
            'turn': getattr(battle, 'turn', 0),
            'active_pokemon': active_info,
            'opponent_pokemon': opponent_info,
            'available_moves': available_moves,
            'available_switches': available_switches,
            'weather': str(battle.weather) if hasattr(battle, 'weather') and battle.weather else None,
            'fields': [str(field) for field in getattr(battle, 'fields', [])],
            'side_conditions': [str(cond) for cond in getattr(battle, 'side_conditions', [])],
            'opponent_side_conditions': [str(cond) for cond in getattr(battle, 'opponent_side_conditions', [])],
            'force_switch': getattr(battle, 'force_switch', False),
            'can_z_move': getattr(battle, 'can_z_move', False),
            'can_dynamax': getattr(battle, 'can_dynamax', False),
            'can_mega_evolve': getattr(battle, 'can_mega_evolve', False),
        }
    except Exception as e:
        # Fallback minimal battle state
        return {
            'battle_id': str(battle) if battle else 'error',
            'turn': 0,
            'active_pokemon': active_info,
            'opponent_pokemon': opponent_info,
            'available_moves': available_moves,
            'available_switches': available_switches,
            'weather': None,
            'fields': [],
            'side_conditions': [],
            'opponent_side_conditions': [],
            'force_switch': False,
            'can_z_move': False,
            'can_dynamax': False,
            'can_mega_evolve': False,
            'formatting_error': f"Error formatting battle state: {e}",
        }
async def _start_ladder_task(player, username: str):
"""Background task to start ladder search - fire and forget"""
try:
await player.ladder(1) # Play 1 ladder battle
print(f"Ladder search started for {username}")
except Exception as e:
print(f"Failed to start ladder search for {username}: {e}")
def start_ladder_battle(username: str) -> Dict[str, str]:
    """
    Start a ladder battle for the MCP-controlled player (fire-and-forget).

    Must be called while an asyncio event loop is running, because the ladder
    search is scheduled with ``asyncio.create_task``.

    Args:
        username (str): Username for the MCP player

    Returns:
        dict: Status information about the ladder request

    Raises:
        Exception: If the background task could not be scheduled.
    """
    created_here = username not in player_instances
    if created_here:
        player = MCPPokemonAgent(username)
        player_instances[username] = player
    else:
        player = player_instances[username]

    # Start ladder battle (this will connect to showdown and find a match)
    ladder_coro = _start_ladder_task(player, username)
    try:
        # Start ladder search in background task (fire-and-forget)
        task = asyncio.create_task(ladder_coro)
    except Exception as e:
        # Avoid a "coroutine was never awaited" warning for the unscheduled coroutine.
        ladder_coro.close()
        # Clean up player instance on failure — but only one created by this
        # call; evicting a reused player would orphan its tracked battles.
        if created_here:
            player_instances.pop(username, None)
        raise Exception(f"Failed to queue ladder search: {str(e)}") from e

    # The event loop only holds weak references to tasks, so keep a strong one
    # until the task finishes or it may be garbage-collected mid-flight.
    pending = getattr(start_ladder_battle, "_background_tasks", None)
    if pending is None:
        pending = start_ladder_battle._background_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

    # Return immediately
    return {
        'status': 'ladder_search_queued',
        'player_username': username,
        'message': f'Ladder search queued for {username}. Use find_recent_battles() or get_player_status() to check when match is found.'
    }
async def start_battle_against_agent(username: str, opponent_agent, battle_format: str = "gen9randombattle") -> str:
    """
    Start a battle against a specific agent.

    Args:
        username (str): Username for the MCP player
        opponent_agent: The opponent agent to battle against
        battle_format (str): Battle format. Accepted for interface
            compatibility; this function does not currently use it.

    Returns:
        str: Battle ID

    Raises:
        Exception: If the battle cannot be started, or no new battle appears
            within the wait window. The original error is chained as the cause.
    """
    if username in player_instances:
        player = player_instances[username]
    else:
        player = MCPPokemonAgent(username)
        player_instances[username] = player
    try:
        # Players are cached per username, so player.battles may already hold
        # earlier battles. Remember them so that only a battle created by this
        # request is reported (previously the last key was returned even if it
        # was a stale battle).
        known_battle_ids = set(player.battles)
        # Start battle against opponent
        # NOTE(review): if battle_against only returns once the battle is
        # over, the entry stored below is registered late — confirm.
        await player.battle_against(opponent_agent, n_battles=1)
        # Wait for battle to start with shorter intervals for responsiveness
        max_wait_time = 10  # Maximum 10 seconds
        check_interval = 0.5  # Check every 0.5 seconds
        wait_time = 0
        while wait_time < max_wait_time:
            await asyncio.sleep(check_interval)
            wait_time += check_interval
            # Check if a new battle has started
            new_battle_ids = [bid for bid in player.battles if bid not in known_battle_ids]
            if new_battle_ids:
                battle_id = new_battle_ids[-1]
                battle = player.battles[battle_id]
                # Store battle info
                active_battles[battle_id] = {
                    'type': 'agent',
                    'player_username': username,
                    'opponent': opponent_agent.username,
                    'battle_state': format_battle_state(battle),
                    'waiting_for_move': False,
                    'completed': False,
                    'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}"
                }
                return battle_id
        # If we get here, no battle started within timeout
        raise Exception(f"Battle against {opponent_agent.username} requested, but no battle started within {max_wait_time} seconds.")
    except Exception as e:
        raise Exception(f"Failed to start battle against agent: {str(e)}") from e
async def _send_challenge_task(player, opponent_username: str):
"""Background task to send challenge - fire and forget"""
try:
await player.send_challenges(opponent_username, n_challenges=1)
print(f"Challenge sent to {opponent_username}")
except Exception as e:
print(f"Failed to send challenge to {opponent_username}: {e}")
def start_battle_against_player(username: str, opponent_username: str) -> Dict[str, str]:
    """
    Start a battle against a specific player (fire-and-forget).

    Must be called while an asyncio event loop is running, because the
    challenge is scheduled with ``asyncio.create_task``.

    Args:
        username (str): Username for the MCP player
        opponent_username (str): Username of the opponent

    Returns:
        dict: Status information about the challenge request

    Raises:
        Exception: If the background task could not be scheduled.
    """
    created_here = username not in player_instances
    if created_here:
        player = MCPPokemonAgent(username)
        player_instances[username] = player
    else:
        player = player_instances[username]

    challenge_coro = _send_challenge_task(player, opponent_username)
    try:
        # Start challenge in background task (fire-and-forget)
        task = asyncio.create_task(challenge_coro)
    except Exception as e:
        # Avoid a "coroutine was never awaited" warning for the unscheduled coroutine.
        challenge_coro.close()
        # Clean up player instance on failure — but only one created by this
        # call; evicting a reused player would orphan its tracked battles.
        if created_here:
            player_instances.pop(username, None)
        raise Exception(f"Failed to queue challenge to {opponent_username}: {str(e)}") from e

    # The event loop only holds weak references to tasks, so keep a strong one
    # until the task finishes or it may be garbage-collected mid-flight.
    pending = getattr(start_battle_against_player, "_background_tasks", None)
    if pending is None:
        pending = start_battle_against_player._background_tasks = set()
    pending.add(task)
    task.add_done_callback(pending.discard)

    # Return immediately
    return {
        'status': 'challenge_queued',
        'player_username': username,
        'opponent': opponent_username,
        'message': f'Challenge to {opponent_username} queued. Use find_recent_battles() or get_player_status() to check if battle started.'
    }
async def submit_move_for_battle(battle_id: str, move_name: str = None, pokemon_name: str = None) -> str:
    """
    Submit a move or switch for a specific battle.

    Resolves the tracked battle to its owning player and battle object,
    forwards the request to the player's ``submit_move`` and then refreshes
    the stored battle state snapshot.

    Args:
        battle_id (str): The battle ID
        move_name (str, optional): Name of move to use
        pokemon_name (str, optional): Name of Pokemon to switch to

    Returns:
        str: Result message

    Raises:
        ValueError: If the battle, its player or its battle object is unknown.
    """
    if battle_id not in active_battles:
        raise ValueError(f"Battle {battle_id} not found")

    owner = active_battles[battle_id]['player_username']
    if owner not in player_instances:
        raise ValueError(f"Player {owner} not found")
    agent = player_instances[owner]

    # Find the battle object
    target = next(
        (candidate for candidate in agent.battles.values() if candidate.battle_tag == battle_id),
        None,
    )
    if not target:
        raise ValueError(f"Battle object for {battle_id} not found")

    # Submit the move
    outcome = await agent.submit_move(move_name=move_name, pokemon_name=pokemon_name, battle=target)

    # Update battle state
    active_battles[battle_id]['battle_state'] = format_battle_state(target)
    return outcome
def get_battle_state(battle_id: str) -> Dict[str, Any]:
    """
    Get the current state of a battle.

    Args:
        battle_id (str): The battle ID

    Returns:
        dict: The tracking entry stored for this battle (the same object that
        lives in the registry, not a copy)

    Raises:
        ValueError: If the battle is not tracked.
    """
    if battle_id in active_battles:
        return active_battles[battle_id]
    raise ValueError(f"Battle {battle_id} not found")
def list_active_battles() -> List[Dict[str, Any]]:
    """
    List all active battles.

    Returns:
        list: One summary dict per tracked battle with its id, type, opponent,
        waiting/completed flags and URL (a default URL is derived from the
        battle id when the entry has none).
    """
    summaries = []
    for tag, entry in active_battles.items():
        default_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{tag}"
        summary = {
            'battle_id': tag,
            'type': entry['type'],
            'opponent': entry['opponent'],
            'waiting_for_move': entry['waiting_for_move'],
            'completed': entry['completed'],
            'battle_url': entry.get('battle_url', default_url),
        }
        summaries.append(summary)
    return summaries
def check_recent_battles(username: str) -> List[Dict[str, Any]]:
    """
    Check for recent battles that may have started after a timeout.

    Every battle known to the player but missing from the registry is added
    to it with type 'recovered' and reported back to the caller.

    Args:
        username (str): Username to check battles for

    Returns:
        list: List of recent battle info (empty when the player is unknown or
        nothing new was found)
    """
    if username not in player_instances:
        return []

    discovered = []
    for tag, battle in player_instances[username].battles.items():
        if tag in active_battles:
            continue
        # This is a new battle that wasn't tracked yet
        url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{tag}"
        opponent_name = getattr(battle, 'opponent_username', 'unknown')
        battle_format = getattr(battle, 'format', 'unknown')
        current_turn = getattr(battle, 'turn', 0)
        snapshot = format_battle_state(battle)
        # Add to active battles tracking
        active_battles[tag] = {
            'type': 'recovered',
            'player_username': username,
            'opponent': opponent_name,
            'battle_state': snapshot,
            'waiting_for_move': False,
            'completed': False,
            'battle_url': url
        }
        discovered.append({
            'battle_id': tag,
            'battle_url': url,
            'opponent': opponent_name,
            'format': battle_format,
            'turn': current_turn,
            'battle_state': snapshot
        })
    return discovered
async def download_battle_replay(battle_id: str) -> str:
    """
    Download the replay for a completed battle.

    Looks for an existing replay file in the replay directory first; if none
    exists and the battle object exposes replay data, that data is written to
    a new text file.

    Args:
        battle_id (str): The battle ID

    Returns:
        str: Path to the replay file

    Raises:
        ValueError: If the battle, its player or its battle object is unknown,
            if no replay is available, or if the replay file cannot be written.
    """
    import glob  # local import: only needed by this function

    if battle_id not in active_battles:
        raise ValueError(f"Battle {battle_id} not found")
    battle_info = active_battles[battle_id]
    username = battle_info['player_username']
    if username not in player_instances:
        raise ValueError(f"Player {username} not found")
    player = player_instances[username]

    # Find the battle object
    battle = next(
        (candidate for candidate in player.battles.values() if candidate.battle_tag == battle_id),
        None,
    )
    if not battle:
        raise ValueError(f"Battle object for {battle_id} not found")

    # Create replay directory with proper permissions
    replay_dir = os.path.join(os.getcwd(), "battle_replays")
    try:
        os.makedirs(replay_dir, mode=0o755, exist_ok=True)
    except PermissionError:
        # Fallback to /tmp if we can't write to current directory
        replay_dir = "/tmp/battle_replays"
        os.makedirs(replay_dir, mode=0o755, exist_ok=True)

    # Look for replay file. battle.battle_tag equals battle_id here, so
    # dict.fromkeys removes the duplicate candidates while keeping the order.
    candidate_names = dict.fromkeys([
        f"{battle_id}.html",
        f"{battle_id}.txt",
        f"{battle.battle_tag}.html",
        f"{battle.battle_tag}.txt",
    ])
    for file_name in candidate_names:
        file_path = f"{replay_dir}/{file_name}"
        if os.path.exists(file_path):
            return file_path

    # poke_env appears to save replays as "<username> - <battle_tag>.html"
    # (TODO confirm for the installed version); match that naming as well.
    for extension in ("html", "txt"):
        pattern = os.path.join(
            glob.escape(replay_dir), f"* - {glob.escape(battle_id)}.{extension}"
        )
        matches = sorted(glob.glob(pattern))
        if matches:
            return matches[0]

    # If no replay file found, return the battle's replay data if available
    if hasattr(battle, 'replay') and battle.replay:
        replay_path = os.path.join(replay_dir, f"{battle_id}_replay.txt")
        try:
            # Explicit encoding: replay text can contain non-ASCII names and
            # the platform default encoding may not be able to represent them.
            with open(replay_path, 'w', encoding='utf-8') as f:
                f.write(str(battle.replay))
            return replay_path
        except PermissionError as e:
            raise ValueError(f"Permission denied when writing replay file to {replay_path}. Check directory permissions.") from e
    raise ValueError(f"No replay found for battle {battle_id}")
def cleanup_completed_battles():
    """
    Clean up completed battles to free memory.

    Entries whose 'completed' flag is truthy are removed from
    ``active_battles`` in place. The registry object itself is kept (not
    rebound), so other modules holding a reference obtained via
    ``from ... import active_battles`` keep seeing the live registry.
    """
    # Collect first: deleting while iterating a dict raises RuntimeError.
    finished_ids = [
        battle_id for battle_id, info in active_battles.items()
        if info.get('completed', False)
    ]
    for battle_id in finished_ids:
        del active_battles[battle_id]
def debug_move_attributes(battle) -> Dict[str, Any]:
    """
    Debug function to inspect Move object attributes.

    Reports how many moves are available and, for at most the first three,
    their type name, public attribute names, string form and id/name values.
    A move that fails inspection is reported with its index and the error.
    """
    moves = getattr(battle, 'available_moves', None)
    if not moves:
        return {"available_moves_count": 0, "move_attributes": []}

    inspected = []
    for position, candidate in enumerate(moves[:3]):  # Check first 3 moves only
        try:
            inspected.append({
                "move_index": position,
                "move_type": type(candidate).__name__,
                "available_attributes": [
                    attr_name for attr_name in dir(candidate)
                    if not attr_name.startswith('_')
                ],
                "str_representation": str(candidate),
                "has_id": hasattr(candidate, 'id'),
                "has_name": hasattr(candidate, 'name'),
                "id_value": getattr(candidate, 'id', 'NO_ID'),
                "name_value": getattr(candidate, 'name', 'NO_NAME_ATTR'),
            })
        except Exception as err:
            inspected.append({
                "move_index": position,
                "error": str(err)
            })
    return {"available_moves_count": len(moves), "move_attributes": inspected}