|
import os
import re

from dotenv import load_dotenv
from openai.types.responses import ResponseTextDeltaEvent

from agents import Agent, Runner, AsyncOpenAI, OpenAIChatCompletionsModel
from agents.mcp import MCPServerSse
|
|
|
|
|
# Load settings from a .env file before anything below reads the environment
# (the model client looks up NEBIUS_API_KEY via os.getenv).
load_dotenv()

# SSE endpoint of the hosted travel-planner MCP server the agent connects to.
SERVER_URL = (
    "https://agents-mcp-hackathon-travel-planner-mcp.hf.space"
    "/gradio_api/mcp/sse"
)
|
|
|
# System prompt handed to the agent. The task list is numbered 1-6 without
# repeats so each step has a unique label the model can follow and refer to.
instructions = """
You are an expert travel planner.
Given a user query, resolve it, if needed you will use the connected MCP tool to assist with travel planning tasks.


Your tasks include:

1. Search for flight options using the tools from the mcp server,
2. Search for hotel options using the tools from the mcp server, keeping in mind the user's preferences.
3. Once you've decided the best options based on the user query, for each flight and hotel, include the booking link, for hotels you may include thumbnail image links.
4. Estimate the total budget (flight + hotel for the stay).
5. Suggest a list of fun activities at the destination based on your knowledge about the destination.
6. Format the entire report in Markdown with clear headings and bullet points.

any other suggestions which make the trip fun are also welcome.
"""
|
|
|
# Shared handle to the remote MCP tool server; get_itinerary connects it and
# cleans it up around each run.
mcp_server = MCPServerSse({"url": SERVER_URL})

# OpenAI-compatible client pointed at the Nebius endpoint; the key comes from
# the environment (None if NEBIUS_API_KEY is not set).
_nebius_client = AsyncOpenAI(
    api_key=os.environ.get("NEBIUS_API_KEY"),
    base_url="https://api.studio.nebius.com/v1/",
)

# Chat-completions model wrapper the agent uses for generation.
model = OpenAIChatCompletionsModel(
    openai_client=_nebius_client,
    model="Qwen/Qwen3-235B-A22B",
)
|
# Matches an opening or closing think tag. The capture group makes re.split
# keep the tags as their own pieces so they can be processed in order.
_THINK_TAG_PATTERN = re.compile(r"(</?think>)")


def _split_think_tags(token):
    """Split *token* into non-empty text pieces and literal think tags, in order."""
    return [piece for piece in _THINK_TAG_PATTERN.split(token) if piece]


async def get_itinerary(user_query, history):
    """Stream the agent's answer to a travel query as cumulative text.

    Connects the module-level ``mcp_server``, runs an ``Agent`` built from
    ``instructions`` and ``model`` in streaming mode, and yields text as the
    deltas arrive. Text between ``<think>`` and ``</think>`` is shown with a
    "Thinking" prefix; everything else is accumulated as the response.

    A delta may carry a tag together with ordinary text (for example
    ``"</think>Here"``); the text on either side of the tag is kept and routed
    to the right buffer instead of being dropped with the tag. A tag that is
    split across two deltas is not recognised.

    Args:
        user_query: The user's request; embedded verbatim in the task prompt.
        history: Not used by this function; kept so the signature stays the
            same for existing callers.

    Yields:
        str: The whole thinking buffer (prefixed) or the whole response
        buffer accumulated so far, not just the newest delta.
    """
    try:
        await mcp_server.connect()

        agent = Agent(
            name="MCP Agent",
            instructions=instructions,
            mcp_servers=[mcp_server],
            model=model,
        )

        task = "You are a helpful assistant. Use the connected tool to assist with tasks."
        task += f"\n\nUser Query: {user_query}\n\n"

        think_buffer = ""
        response_buffer = ""
        showing_thought = False

        result = Runner.run_streamed(agent, task)
        async for event in result.stream_events():
            # Only raw text deltas carry displayable content.
            if event.type != "raw_response_event":
                continue
            if not isinstance(event.data, ResponseTextDeltaEvent):
                continue

            for piece in _split_think_tags(event.data.delta):
                if piece == "<think>":
                    # A new thinking section starts from an empty buffer.
                    showing_thought = True
                    think_buffer = ""
                elif piece == "</think>":
                    showing_thought = False
                    yield f"🤔 *Thinking:* {think_buffer}\n"
                elif showing_thought:
                    think_buffer += piece
                    yield f"🤔 *Thinking:* {think_buffer}"
                else:
                    response_buffer += piece
                    yield response_buffer
    finally:
        # Always release the MCP connection, including when connect() or the
        # stream fails, or the consumer stops iterating early.
        await mcp_server.cleanup()
|
|