import asyncio import os import random import uuid from typing import Optional, Dict, Any, List, Union from poke_env import AccountConfiguration, ServerConfiguration from poke_env.player import Player from poke_env.environment.battle import Battle from poke_env.environment.move import Move from poke_env.environment.pokemon import Pokemon from agents import LLMAgentBase # Custom server configuration CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket" CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?' custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL) # Global battle state management active_battles = {} player_instances = {} class MCPPokemonAgent(LLMAgentBase): """ Special Pokemon agent controlled by MCP that allows external move selection. """ def __init__(self, username: str, *args, **kwargs): # Add random suffix to make username unique unique_username = f"{username}_{random.randint(1000, 9999)}" account_config = AccountConfiguration(unique_username, None) super().__init__( account_configuration=account_config, server_configuration=custom_config, max_concurrent_battles=1, save_replays="battle_replays", avatar="ash", log_level=25, # Reduce logging verbosity *args, **kwargs ) self.external_move_queue = asyncio.Queue() self.battle_state_callback = None self.current_battle_id = None async def choose_move(self, battle: Battle) -> str: """Wait for external move selection via MCP""" self.current_battle_id = battle.battle_tag # Update active battle state if battle.battle_tag in active_battles: active_battles[battle.battle_tag]['battle_state'] = self._format_battle_state(battle) active_battles[battle.battle_tag]['waiting_for_move'] = True active_battles[battle.battle_tag]['available_moves'] = [move.id for move in battle.available_moves] active_battles[battle.battle_tag]['available_switches'] = [pkmn.species for pkmn in battle.available_switches] try: # Wait for external move selection (no timeout - wait indefinitely) move_order = await self.external_move_queue.get() # Update battle state if battle.battle_tag in active_battles: active_battles[battle.battle_tag]['waiting_for_move'] = False return move_order except Exception as e: print(f"Error in move selection for battle {battle.battle_tag}: {e}") if battle.battle_tag in active_battles: active_battles[battle.battle_tag]['waiting_for_move'] = False # Don't choose random move - raise the exception to handle gracefully raise async def submit_move(self, move_name: str = None, pokemon_name: str = None, battle: Battle = None): """Submit a move or switch externally""" if not battle and self.current_battle_id: # Find the battle by ID for battle_obj in self.battles.values(): if battle_obj.battle_tag == self.current_battle_id: battle = battle_obj break if not battle: raise ValueError("No active battle found") if move_name: # Find and execute move chosen_move = self._find_move_by_name(battle, move_name) if chosen_move and chosen_move in battle.available_moves: move_order = self.create_order(chosen_move) await self.external_move_queue.put(move_order) return f"Submitted move: {chosen_move.id}" else: raise ValueError(f"Move '{move_name}' not available") elif pokemon_name: # Find and execute switch chosen_switch = self._find_pokemon_by_name(battle, pokemon_name) if chosen_switch and chosen_switch in battle.available_switches: switch_order = self.create_order(chosen_switch) await self.external_move_queue.put(switch_order) return f"Submitted switch: {chosen_switch.species}" else: raise ValueError(f"Pokemon '{pokemon_name}' not available for switch") else: raise ValueError("Must specify either move_name or pokemon_name") def normalize_name(name: str) -> str: """Lowercase and remove non-alphanumeric characters.""" return "".join(filter(str.isalnum, name)).lower() def format_battle_state(battle: Battle) -> Dict[str, Any]: """ Format battle state into a structured dictionary. Args: battle (Battle): The current battle object Returns: dict: Formatted battle state """ active_pkmn = battle.active_pokemon opponent_pkmn = battle.opponent_active_pokemon # Format active Pokemon info try: active_info = { 'species': getattr(active_pkmn, 'species', 'unknown') if active_pkmn else None, 'types': [str(t) for t in getattr(active_pkmn, 'types', [])] if active_pkmn else [], 'hp_fraction': getattr(active_pkmn, 'current_hp_fraction', 0) if active_pkmn else 0, 'status': getattr(active_pkmn.status, 'name', None) if active_pkmn and hasattr(active_pkmn, 'status') and active_pkmn.status else None, 'boosts': dict(getattr(active_pkmn, 'boosts', {})) if active_pkmn else {}, 'ability': str(active_pkmn.ability) if active_pkmn and hasattr(active_pkmn, 'ability') and active_pkmn.ability else None } except Exception as e: active_info = { 'species': 'error', 'types': [], 'hp_fraction': 0, 'status': None, 'boosts': {}, 'ability': None, 'error': f"Error formatting active Pokemon: {e}" } # Format opponent Pokemon info try: opponent_info = { 'species': getattr(opponent_pkmn, 'species', 'unknown') if opponent_pkmn else None, 'types': [str(t) for t in getattr(opponent_pkmn, 'types', [])] if opponent_pkmn else [], 'hp_fraction': getattr(opponent_pkmn, 'current_hp_fraction', 0) if opponent_pkmn else 0, 'status': getattr(opponent_pkmn.status, 'name', None) if opponent_pkmn and hasattr(opponent_pkmn, 'status') and opponent_pkmn.status else None, 'boosts': dict(getattr(opponent_pkmn, 'boosts', {})) if opponent_pkmn else {}, 'ability': str(opponent_pkmn.ability) if opponent_pkmn and hasattr(opponent_pkmn, 'ability') and opponent_pkmn.ability else None } except Exception as e: opponent_info = { 'species': 'error', 'types': [], 'hp_fraction': 0, 'status': None, 'boosts': {}, 'ability': None, 'error': f"Error formatting opponent Pokemon: {e}" } # Format available moves available_moves = [] if battle.available_moves: for move in battle.available_moves: try: available_moves.append({ 'id': move.id, 'name': getattr(move, 'name', move.id), # Fallback to id if no name 'type': str(move.type) if hasattr(move, 'type') else 'unknown', 'base_power': getattr(move, 'base_power', 0), 'accuracy': getattr(move, 'accuracy', 100), 'pp': f"{getattr(move, 'current_pp', '?')}/{getattr(move, 'max_pp', '?')}", 'category': getattr(move.category, 'name', 'unknown') if hasattr(move, 'category') else 'unknown', 'description': getattr(move, 'description', '') or "" }) except Exception as e: # If there's any error with a specific move, add minimal info available_moves.append({ 'id': getattr(move, 'id', 'unknown'), 'name': str(move), 'type': 'unknown', 'base_power': 0, 'accuracy': 100, 'pp': '?/?', 'category': 'unknown', 'description': f"Error formatting move: {e}" }) # Format available switches available_switches = [] if battle.available_switches: for pkmn in battle.available_switches: try: available_switches.append({ 'species': getattr(pkmn, 'species', 'unknown'), 'hp_fraction': getattr(pkmn, 'current_hp_fraction', 0), 'status': getattr(pkmn.status, 'name', None) if hasattr(pkmn, 'status') and pkmn.status else None, 'types': [str(t) for t in getattr(pkmn, 'types', [])] }) except Exception as e: available_switches.append({ 'species': 'error', 'hp_fraction': 0, 'status': None, 'types': [], 'error': f"Error formatting switch: {e}" }) # Safely build the return dictionary try: result = { 'battle_id': getattr(battle, 'battle_tag', 'unknown'), 'turn': getattr(battle, 'turn', 0), 'active_pokemon': active_info, 'opponent_pokemon': opponent_info, 'available_moves': available_moves, 'available_switches': available_switches, 'weather': str(battle.weather) if hasattr(battle, 'weather') and battle.weather else None, 'fields': [str(field) for field in getattr(battle, 'fields', [])], 'side_conditions': [str(cond) for cond in getattr(battle, 'side_conditions', [])], 'opponent_side_conditions': [str(cond) for cond in getattr(battle, 'opponent_side_conditions', [])], 'force_switch': getattr(battle, 'force_switch', False), 'can_z_move': getattr(battle, 'can_z_move', False), 'can_dynamax': getattr(battle, 'can_dynamax', False), 'can_mega_evolve': getattr(battle, 'can_mega_evolve', False) } return result except Exception as e: # Fallback minimal battle state return { 'battle_id': str(battle) if battle else 'error', 'turn': 0, 'active_pokemon': active_info, 'opponent_pokemon': opponent_info, 'available_moves': available_moves, 'available_switches': available_switches, 'weather': None, 'fields': [], 'side_conditions': [], 'opponent_side_conditions': [], 'force_switch': False, 'can_z_move': False, 'can_dynamax': False, 'can_mega_evolve': False, 'formatting_error': f"Error formatting battle state: {e}" } async def _start_ladder_task(player, username: str): """Background task to start ladder search - fire and forget""" try: await player.ladder(1) # Play 1 ladder battle print(f"Ladder search started for {username}") except Exception as e: print(f"Failed to start ladder search for {username}: {e}") def start_ladder_battle(username: str) -> Dict[str, str]: """ Start a ladder battle for the MCP-controlled player (fire-and-forget). Args: username (str): Username for the MCP player Returns: dict: Status information about the ladder request """ if username in player_instances: player = player_instances[username] else: player = MCPPokemonAgent(username) player_instances[username] = player # Start ladder battle (this will connect to showdown and find a match) try: # Start ladder search in background task (fire-and-forget) asyncio.create_task(_start_ladder_task(player, username)) # Return immediately return { 'status': 'ladder_search_queued', 'player_username': username, 'message': f'Ladder search queued for {username}. Use find_recent_battles() or get_player_status() to check when match is found.' } except Exception as e: # Clean up player instance on failure if username in player_instances: del player_instances[username] raise Exception(f"Failed to queue ladder search: {str(e)}") async def start_battle_against_agent(username: str, opponent_agent, battle_format: str = "gen9randombattle") -> str: """ Start a battle against a specific agent. Args: username (str): Username for the MCP player opponent_agent: The opponent agent to battle against battle_format (str): Battle format Returns: str: Battle ID """ if username in player_instances: player = player_instances[username] else: player = MCPPokemonAgent(username) player_instances[username] = player try: # Start battle against opponent await player.battle_against(opponent_agent, n_battles=1) # Wait for battle to start with shorter intervals for responsiveness max_wait_time = 10 # Maximum 10 seconds check_interval = 0.5 # Check every 0.5 seconds wait_time = 0 while wait_time < max_wait_time: await asyncio.sleep(check_interval) wait_time += check_interval # Check if a new battle has started if player.battles: battle_id = list(player.battles.keys())[-1] battle = player.battles[battle_id] # Store battle info active_battles[battle_id] = { 'type': 'agent', 'player_username': username, 'opponent': opponent_agent.username, 'battle_state': format_battle_state(battle), 'waiting_for_move': False, 'completed': False, 'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}" } return battle_id # If we get here, no battle started within timeout raise Exception(f"Battle against {opponent_agent.username} requested, but no battle started within {max_wait_time} seconds.") except Exception as e: raise Exception(f"Failed to start battle against agent: {str(e)}") async def _send_challenge_task(player, opponent_username: str): """Background task to send challenge - fire and forget""" try: await player.send_challenges(opponent_username, n_challenges=1) print(f"Challenge sent to {opponent_username}") except Exception as e: print(f"Failed to send challenge to {opponent_username}: {e}") def start_battle_against_player(username: str, opponent_username: str) -> Dict[str, str]: """ Start a battle against a specific player (fire-and-forget). Args: username (str): Username for the MCP player opponent_username (str): Username of the opponent Returns: dict: Status information about the challenge request """ if username in player_instances: player = player_instances[username] else: player = MCPPokemonAgent(username) player_instances[username] = player try: # Start challenge in background task (fire-and-forget) asyncio.create_task(_send_challenge_task(player, opponent_username)) # Return immediately return { 'status': 'challenge_queued', 'player_username': username, 'opponent': opponent_username, 'message': f'Challenge to {opponent_username} queued. Use find_recent_battles() or get_player_status() to check if battle started.' } except Exception as e: # Clean up player instance on failure if username in player_instances: del player_instances[username] raise Exception(f"Failed to queue challenge to {opponent_username}: {str(e)}") async def submit_move_for_battle(battle_id: str, move_name: str = None, pokemon_name: str = None) -> str: """ Submit a move or switch for a specific battle. Args: battle_id (str): The battle ID move_name (str, optional): Name of move to use pokemon_name (str, optional): Name of Pokemon to switch to Returns: str: Result message """ if battle_id not in active_battles: raise ValueError(f"Battle {battle_id} not found") battle_info = active_battles[battle_id] username = battle_info['player_username'] if username not in player_instances: raise ValueError(f"Player {username} not found") player = player_instances[username] # Find the battle object battle = None for battle_obj in player.battles.values(): if battle_obj.battle_tag == battle_id: battle = battle_obj break if not battle: raise ValueError(f"Battle object for {battle_id} not found") # Submit the move result = await player.submit_move(move_name=move_name, pokemon_name=pokemon_name, battle=battle) # Update battle state active_battles[battle_id]['battle_state'] = format_battle_state(battle) return result def get_battle_state(battle_id: str) -> Dict[str, Any]: """ Get the current state of a battle. Args: battle_id (str): The battle ID Returns: dict: Current battle state """ if battle_id not in active_battles: raise ValueError(f"Battle {battle_id} not found") return active_battles[battle_id] def list_active_battles() -> List[Dict[str, Any]]: """ List all active battles. Returns: list: List of active battle info """ return [ { 'battle_id': battle_id, 'type': info['type'], 'opponent': info['opponent'], 'waiting_for_move': info['waiting_for_move'], 'completed': info['completed'], 'battle_url': info.get('battle_url', f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}") } for battle_id, info in active_battles.items() ] def check_recent_battles(username: str) -> List[Dict[str, Any]]: """ Check for recent battles that may have started after a timeout. Args: username (str): Username to check battles for Returns: list: List of recent battle info """ if username not in player_instances: return [] player = player_instances[username] recent_battles = [] for battle_id, battle in player.battles.items(): if battle_id not in active_battles: # This is a new battle that wasn't tracked yet battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}" battle_info = { 'battle_id': battle_id, 'battle_url': battle_url, 'opponent': getattr(battle, 'opponent_username', 'unknown'), 'format': battle.format if hasattr(battle, 'format') else 'unknown', 'turn': battle.turn if hasattr(battle, 'turn') else 0, 'battle_state': format_battle_state(battle) } # Add to active battles tracking active_battles[battle_id] = { 'type': 'recovered', 'player_username': username, 'opponent': battle_info['opponent'], 'battle_state': battle_info['battle_state'], 'waiting_for_move': False, 'completed': False, 'battle_url': battle_url } recent_battles.append(battle_info) return recent_battles async def download_battle_replay(battle_id: str) -> str: """ Download the replay for a completed battle. Args: battle_id (str): The battle ID Returns: str: Path to the replay file or replay content """ if battle_id not in active_battles: raise ValueError(f"Battle {battle_id} not found") battle_info = active_battles[battle_id] username = battle_info['player_username'] if username not in player_instances: raise ValueError(f"Player {username} not found") player = player_instances[username] # Find the battle object battle = None for battle_obj in player.battles.values(): if battle_obj.battle_tag == battle_id: battle = battle_obj break if not battle: raise ValueError(f"Battle object for {battle_id} not found") # Create replay directory with proper permissions # Use /tmp for temporary files or create in current directory with explicit permissions replay_dir = os.path.join(os.getcwd(), "battle_replays") try: os.makedirs(replay_dir, mode=0o755, exist_ok=True) except PermissionError: # Fallback to /tmp if we can't write to current directory replay_dir = "/tmp/battle_replays" os.makedirs(replay_dir, mode=0o755, exist_ok=True) # Look for replay file potential_files = [ f"{replay_dir}/{battle_id}.html", f"{replay_dir}/{battle_id}.txt", f"{replay_dir}/{battle.battle_tag}.html", f"{replay_dir}/{battle.battle_tag}.txt" ] for file_path in potential_files: if os.path.exists(file_path): return file_path # If no replay file found, return the battle's replay data if available if hasattr(battle, 'replay') and battle.replay: replay_path = os.path.join(replay_dir, f"{battle_id}_replay.txt") try: with open(replay_path, 'w') as f: f.write(str(battle.replay)) return replay_path except PermissionError: raise ValueError(f"Permission denied when writing replay file to {replay_path}. Check directory permissions.") raise ValueError(f"No replay found for battle {battle_id}") def cleanup_completed_battles(): """Clean up completed battles to free memory.""" global active_battles active_battles = { battle_id: info for battle_id, info in active_battles.items() if not info.get('completed', False) } def debug_move_attributes(battle) -> Dict[str, Any]: """Debug function to inspect Move object attributes""" debug_info = { "available_moves_count": len(battle.available_moves) if hasattr(battle, 'available_moves') and battle.available_moves else 0, "move_attributes": [] } if hasattr(battle, 'available_moves') and battle.available_moves: for i, move in enumerate(battle.available_moves[:3]): # Check first 3 moves only try: move_attrs = { "move_index": i, "move_type": type(move).__name__, "available_attributes": [attr for attr in dir(move) if not attr.startswith('_')], "str_representation": str(move), "has_id": hasattr(move, 'id'), "has_name": hasattr(move, 'name'), "id_value": getattr(move, 'id', 'NO_ID'), "name_value": getattr(move, 'name', 'NO_NAME') if hasattr(move, 'name') else 'NO_NAME_ATTR' } debug_info["move_attributes"].append(move_attrs) except Exception as e: debug_info["move_attributes"].append({ "move_index": i, "error": str(e) }) return debug_info