import asyncio import os import json from typing import List, Dict, Any import gradio as gr from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from anthropic import Anthropic, APIError from dotenv import load_dotenv import cloudinary import cloudinary.uploader from contextlib import AsyncExitStack import requests import base64 load_dotenv() cloudinary.config(cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"), api_key=os.getenv("CLOUDINARY_API_KEY"), api_secret=os.getenv("CLOUDINARY_API_SECRET"), secure=True) try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) def get_image_media_type_and_data(image_url: str): try: response = requests.get(image_url, timeout=10) response.raise_for_status() content_type = response.headers.get('Content-Type', 'image/jpeg') image_data = base64.b64encode(response.content).decode('utf-8') return content_type, image_data except requests.exceptions.RequestException as e: print(f"Error fetching image: {e}") return None, None class MCPClientWrapper: def __init__(self): self.session: ClientSession | None = None self.exit_stack: AsyncExitStack | None = None self.tools: List[Dict[str, Any]] = [] self.anthropic = Anthropic() def connect(self, server_path: str) -> str: return loop.run_until_complete(self._connect(server_path)) async def _connect(self, server_path: str) -> str: if self.exit_stack: await self.exit_stack.aclose() self.exit_stack = AsyncExitStack() try: server_params = StdioServerParameters(command="python3", args=[server_path], env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"}) stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)) await self.session.initialize() response = await self.session.list_tools() self.tools = [{"name": tool.name, "description": tool.description, "input_schema": tool.inputSchema} for tool in response.tools] print(f"DEBUG: Tools reported by MCP server: {[t['name'] for t in self.tools]}") return f"✅ Connected. Available tools: {', '.join([t['name'] for t in self.tools])}" except Exception as e: print(f"Failed to connect: {e}") self.session = None if self.exit_stack: await self.exit_stack.aclose(); self.exit_stack = None return f"❌ Connection Failed: {e}" def process_message(self, multi_modal_data: Dict[str, Any], history: List[Dict[str, Any]]) -> tuple: user_turn_content_for_gradio_display_parts = [] cloudinary_image_url_for_api = None user_text_input = multi_modal_data.get("text", "") user_files_input = multi_modal_data.get("files") if not self.session: updated_history = list(history) temp_display_parts_for_no_session = [] if user_text_input: temp_display_parts_for_no_session.append(user_text_input) if user_files_input: temp_display_parts_for_no_session.append("[Image Uploaded - Connection Error]") user_final_content_for_history = "\n".join(temp_display_parts_for_no_session) if temp_display_parts_for_no_session else "[Empty Input]" if user_final_content_for_history != "[Empty Input]": updated_history.append({"role": "user", "content": user_final_content_for_history}) updated_history.append({"role": "assistant", "content": "⚠️ Please connect to an MCP server first."}) return updated_history, gr.MultimodalTextbox(value=None) if user_text_input: user_turn_content_for_gradio_display_parts.append(user_text_input) if user_files_input and len(user_files_input) > 0: try: print(f"DEBUG: Uploading file {user_files_input[0]} to Cloudinary for display and API.") upload_result = cloudinary.uploader.upload(user_files_input[0], width=300, height=300, crop="limit", secure=True) cloudinary_image_url_for_api = upload_result.get("secure_url") if cloudinary_image_url_for_api: user_turn_content_for_gradio_display_parts.append(f"![User Uploaded Image]({cloudinary_image_url_for_api})") else: user_turn_content_for_gradio_display_parts.append("[Image Uploaded (URL retrieval failed)]") except Exception as e: print(f"Error uploading image to Cloudinary: {e}") user_turn_content_for_gradio_display_parts.append(f"[Image Upload Failed: {e}]") user_content_for_gradio_history_turn = "\n".join(user_turn_content_for_gradio_display_parts) if not user_content_for_gradio_history_turn.strip() and not (user_files_input and cloudinary_image_url_for_api) : user_content_for_gradio_history_turn = "[Empty Input]" history_for_llm_processing = list(history) if user_content_for_gradio_history_turn != "[Empty Input]": history_for_llm_processing.append({"role": "user", "content": user_content_for_gradio_history_turn}) assistant_response_text = loop.run_until_complete( self._process_query(multi_modal_data, history_for_llm_processing, cloudinary_image_url_for_api) ) final_gradio_history = list(history) if user_content_for_gradio_history_turn != "[Empty Input]": final_gradio_history.append({"role": "user", "content": user_content_for_gradio_history_turn}) if assistant_response_text: final_gradio_history.append({"role": "assistant", "content": assistant_response_text}) return final_gradio_history, gr.MultimodalTextbox(value=None) async def _process_query( self, multi_modal_data_from_user: Dict[str, Any], current_complete_history_for_llm: List[Dict[str, Any]], uploaded_image_url_for_api: str | None = None ) -> str: system_prompt = """ You are an intelligent travel assistant. Your goal is to help users by using the available tools: `summarize_trip`, `web_search`, `enhanced_personalized_travel_planner`, `travel_data_analyzer` and `fetch_trips_by_location`. - If the user asks to **plan a NEW trip** (e.g., "Plan a trip to Paris for Tracy", "Plan a trip to Paris for Tracy for 5 days in early August. The Louvre and a specific French restaurant are must-sees, and I'd like to see an opera at night. We'll be flying Finnair."): 1. First, consider if current, real-time information (from `web_search`) could enhance the plan or is necessary for its viability (e.g., for specific dates, named entities like attractions, airlines, or if user implies need for current status). 2. If you decide to use `web_search`, call it first. Collect its output as `web_intelligence`. 3. Then, you MUST call the `enhanced_personalized_travel_planner` tool. Provide it with all standard planning details (user_name, destination, duration, user_request with all specifics like Louvre, Finnair, opera, restaurants) AND the `web_intelligence` (if gathered, otherwise pass an empty string or null equivalent). 4. CRITICAL: The raw text output from `enhanced_personalized_travel_planner` tool IS THE FINAL AND ONLY RESPONSE to the user for this planning request. Do NOT add any of your own text before or after it. Do NOT summarize it. Do NOT rephrase its opening (e.g., if planner says "Great news...", you do not say that again). - If the user uploads an image: 1. Analyze the image to identify the location. You MUST state the identified location first (e.g., "This image appears to be of Helsinki..."). 2. If the query asks about past trips for this location, use `fetch_trips_by_location`. After receiving the tool's output, summarize its key findings for the user. Do not show the raw tool output. 3. If the image is accompanied by a request to plan a new trip, follow the "plan a NEW trip" logic above. - If the user asks to **summarize or recall a past trip**, use `summarize_trip`. Summarize its output. - If a user asks for **trips to a specific location** without an image, use `fetch_trips_by_location`. Summarize its output. - If a user asks general questions requiring analysis across many travel records, like "Who is the most active traveler?", "Who went farthest from Paris?", "What are common trip themes to Italy?", or "Who has visited Paris the most times?", use the `travel_data_analyzer` tool. Think step-by-step. For tools OTHER THAN `enhanced_personalized_travel_planner`, after they return, you should generate a user-friendly response based on their output. If no tool is appropriate, respond conversationally. """ messages_for_llm_api_call = [] for i, msg_from_history in enumerate(current_complete_history_for_llm): role = msg_from_history.get("role") content_value_from_history = msg_from_history.get("content") if not role or content_value_from_history is None: print(f"WARNING: Skipping invalid message from history (index {i}): {msg_from_history}") continue if i == len(current_complete_history_for_llm) - 1 and role == "user": # This is the current user turn, reconstruct its 'content' for the API current_turn_api_user_content_blocks = [] original_user_text = multi_modal_data_from_user.get("text", "") if uploaded_image_url_for_api: try: media_type, image_data = get_image_media_type_and_data(uploaded_image_url_for_api) if image_data: current_turn_api_user_content_blocks.append({"type": "image", "source": {"type": "base64", "media_type": media_type, "data": image_data}}) except Exception as e: print(f"Error converting uploaded image URL to base64 for API: {e}") return f"Error preparing image for assistant: {e}" text_for_api = original_user_text if not text_for_api and uploaded_image_url_for_api: text_for_api = "I've uploaded an image. Please analyze it, identify the location if possible, state where it is and then take the most relevant travel-related action using your tools (`summarize_trip`, `enhanced_personalized_travel_planner`, `fetch_trips_by_location`). If no tool is relevant, describe the image and ask how you can help." if text_for_api: current_turn_api_user_content_blocks.append({"type": "text", "text": text_for_api}) elif not current_turn_api_user_content_blocks and uploaded_image_url_for_api: current_turn_api_user_content_blocks.append({"type": "text", "text": " "}) if current_turn_api_user_content_blocks: messages_for_llm_api_call.append({"role": "user", "content": current_turn_api_user_content_blocks}) else: api_content_blocks = [] if isinstance(content_value_from_history, str): if content_value_from_history.startswith("![") and "](" in content_value_from_history: api_content_blocks.append({"type": "text", "text": f"[User previously displayed an image: {content_value_from_history}]"}) else: api_content_blocks.append({"type": "text", "text": content_value_from_history}) elif isinstance(content_value_from_history, list): api_content_blocks = content_value_from_history else: api_content_blocks.append({"type": "text", "text": str(content_value_from_history)}) if api_content_blocks: messages_for_llm_api_call.append({"role": role, "content": api_content_blocks}) MAX_TURNS = 3 for i in range(MAX_TURNS): try: print(f"--- Agent thinking... (Turn {i+1}) ---") response = self.anthropic.messages.create( model="claude-3-5-sonnet-20240620", max_tokens=4000, system=system_prompt, messages=messages_for_llm_api_call, tools=self.tools ) except APIError as e: return f"❌ API Error: {e.message}" except Exception as e: return f"❌ Error calling Claude API: {e}" if response.content: messages_for_llm_api_call.append({"role": "assistant", "content": response.content}) else: return "❌ Claude API returned empty content." if response.stop_reason == "tool_use": tool_results_for_llm_next_turn = [] assistant_response_for_this_gradio_turn = "" planner_output_final = None for tool_call in response.content: if tool_call.type == 'tool_use': tool_name, tool_input = tool_call.name, tool_call.input claude_text_associated_with_this_tool_call_sequence = "\n".join([c.text for c in response.content if c.type == 'text']).strip() print(f"--- Agent executing tool: {tool_name} with input: {tool_input} ---") try: tool_result_resp = await self.session.call_tool(tool_name, tool_input) tool_output_text = tool_result_resp.content[0].text if tool_result_resp.content and tool_result_resp.content[0].text else json.dumps(tool_result_resp.content) if tool_name == "enhanced_personalized_travel_planner": print(f"DEBUG: enhanced_personalized_travel_planner output IS THE FINAL response. Discarding preceding Claude text.") planner_output_final = tool_output_text break else: if not assistant_response_for_this_gradio_turn and claude_text_associated_with_this_tool_call_sequence: assistant_response_for_this_gradio_turn += claude_text_associated_with_this_tool_call_sequence + "\n\n" assistant_response_for_this_gradio_turn += f"🛠️ Using tool `{tool_name}`...\n" print(f"DEBUG: Output from `{tool_name}` (raw for next LLM turn):\n{tool_output_text[:300]}...") tool_results_for_llm_next_turn.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": tool_output_text}) except Exception as e: tool_error_msg = f"Error executing tool {tool_name}: {e}"; print(f"ERROR: {tool_error_msg}") tool_results_for_llm_next_turn.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": tool_error_msg, "is_error": True}) if not assistant_response_for_this_gradio_turn and claude_text_associated_with_this_tool_call_sequence: assistant_response_for_this_gradio_turn += claude_text_associated_with_this_tool_call_sequence + "\n\n" assistant_response_for_this_gradio_turn += f"⚠️ Error using `{tool_name}`: {tool_error_msg}\n" if planner_output_final is not None: return planner_output_final if not tool_results_for_llm_next_turn: return assistant_response_for_this_gradio_turn.strip() or "An issue occurred: no tool results were prepared." if tool_results_for_llm_next_turn: messages_for_llm_api_call.append({"role": "user", "content": tool_results_for_llm_next_turn}) if i == MAX_TURNS - 1: return assistant_response_for_this_gradio_turn.strip() or "Reached max turns while processing non-planner tools." else: final_text_from_claude = "\n".join([c.text for c in response.content if c.type == 'text']).strip() return final_text_from_claude or "I've finished processing." return "I seem to be stuck after multiple turns. Could you please rephrase your request?" def gradio_interface(): client_wrapper = MCPClientWrapper() with gr.Blocks(theme=gr.themes.Soft(), title="Persona Trip Agent") as demo: gr.Markdown("## ✈️ Persona Trip Agent\nUpload an image of a past trip to summarize it, or ask me to plan a new one!") with gr.Row(): with gr.Column(scale=3): server_path = gr.Textbox(label="Local MCP Server File", value="server.py") with gr.Column(scale=1): connect_btn = gr.Button("🔌 Connect to Server", variant="primary") status = gr.Textbox(label="Connection Status", interactive=False) chatbot = gr.Chatbot(label="Chat History", type="messages", height=600, show_copy_button=True, avatar_images=("assets/user.png", "assets/bot.png")) multimodal_input = gr.MultimodalTextbox(file_types=["image"], placeholder="e.g., 'What did Tracy do here?' (with image) or 'Plan a 5-day trip to Tokyo for Tracy'", show_label=False, interactive=True) clear_btn = gr.Button("🧹 Clear Chat") clear_btn.click(fn=lambda: ([], None), outputs=[chatbot, multimodal_input]) connect_btn.click(fn=client_wrapper.connect, inputs=server_path, outputs=status) multimodal_input.submit(fn=client_wrapper.process_message, inputs=[multimodal_input, chatbot], outputs=[chatbot, multimodal_input]) return demo if __name__ == "__main__": if not os.getenv("ANTHROPIC_API_KEY"): print("🔴 WARNING: ANTHROPIC_API_KEY environment variable not set. Claude API calls will fail.") if not (os.getenv("CLOUDINARY_CLOUD_NAME") and os.getenv("CLOUDINARY_API_KEY") and os.getenv("CLOUDINARY_API_SECRET")): print("🟡 WARNING: Cloudinary environment variables not fully set. Image uploads might fail if used.") interface = gradio_interface() interface.launch(debug=True)