mjschock's picture
Refactor imports in main_v2.py to include SmartSearchTool, enhance code organization in model_factory.py by removing unnecessary whitespace, and improve formatting in test_questions.py for better readability and consistency.
352f525 unverified
raw
history blame contribute delete
4.71 kB
import logging
import os
import requests
import yaml
from dotenv import find_dotenv, load_dotenv
from litellm._logging import _disable_debugging
from openinference.instrumentation.smolagents import SmolagentsInstrumentor
from phoenix.otel import register
# from smolagents import CodeAgent, LiteLLMModel, LiteLLMRouterModel
from smolagents import CodeAgent, LiteLLMModel
from smolagents.monitoring import LogLevel
from model_factory import ModelFactory
from tools.smart_search.tool import SmartSearchTool
# Call LiteLLM's private `_disable_debugging` helper first
# (presumably silences its debug output -- confirm against litellm).
_disable_debugging()

# Configure OpenTelemetry with Phoenix, then instrument smolagents.
# Order is kept as-is: register() runs before the instrumentor is attached.
register()
SmolagentsInstrumentor().instrument()

# Root logging configuration: INFO level, "time - logger - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the rest of this script.
logger = logging.getLogger(__name__)
# load_dotenv(find_dotenv())

# API_BASE = os.getenv("API_BASE")
# API_KEY = os.getenv("API_KEY")
# MODEL_ID = os.getenv("MODEL_ID")

# Create model using the factory
model = ModelFactory.create_model()

# data_agent = create_data_agent(model)
# media_agent = create_media_agent(model)
# web_agent = create_web_agent(model)

# search_agent = ToolCallingAgent(
#     tools=[DuckDuckGoSearchTool(), VisitWebpageTool()],
#     model=model,
#     name="search_agent",
#     description="This is an agent that can do web search.",
# )

# Load the prompt templates handed to the CodeAgent below.
# A context manager closes the file handle as soon as parsing finishes (a bare
# open() would leave that to the garbage collector), and the explicit encoding
# keeps the read independent of the platform's default locale.
# NOTE: the path is relative to the current working directory.
with open("prompts/code_agent_modified.yaml", "r", encoding="utf-8") as prompt_file:
    prompt_templates = yaml.safe_load(prompt_file)
# Build the single CodeAgent used by main(). Only SmartSearchTool is enabled;
# the commented-out options are alternative configurations kept for reference.
agent = CodeAgent(
    tools=[
        SmartSearchTool(),
        # VisitWebpageTool(max_output_length=1024),
    ],
    model=model,
    prompt_templates=prompt_templates,
    verbosity_level=LogLevel.ERROR,
    step_callbacks=None,
    # add_base_tools=True,
    # additional_authorized_imports=[
    #     "json",
    #     "pandas",
    #     "numpy",
    #     "re",
    #     # "requests"
    #     # "urllib.request",
    # ],
    # max_steps=10,
    # managed_agents=[web_agent, data_agent, media_agent],
    # managed_agents=[search_agent],
)

# Runs at import time, right after the agent is constructed.
agent.visualize()
def main(task: str):
    """Ask the agent *task* and return its succinct answer.

    The task is wrapped in a prompt requesting a JSON object with the keys
    ``succinct_answer`` and ``verbose_answer``; the module-level ``agent`` is
    then run on that prompt with ``max_steps=3`` and ``reset=True``.

    Args:
        task: The question to put to the agent.

    Returns:
        The value stored under ``"succinct_answer"`` when the agent's result is
        a dict, or a string containing a JSON object, that has that key.
        In every other case (unparseable output, missing key) the agent's raw
        result is returned unchanged.
    """
    import json

    # Format the task to request both succinct and verbose answers
    formatted_task = f"""Please provide two answers to the following question:
1. A succinct answer that follows these rules:
- Contains ONLY the answer, nothing else
- Does not repeat the question
- Does not include explanations, reasoning, or context
- Does not include source attribution or references
- Does not use phrases like "The answer is" or "I found that"
- Does not include formatting, bullet points, or line breaks
- If the answer is a number, return only the number
- If the answer requires multiple items, separate them with commas
- If the answer requires ordering, maintain the specified order
- Uses the most direct and succinct form possible
2. A verbose answer that includes:
- The complete answer with all relevant details
- Explanations and reasoning
- Context and background information
- Source attribution where appropriate
Question: {task}
Please format your response as a JSON object with two keys:
- "succinct_answer": The concise answer following the rules above
- "verbose_answer": The detailed explanation with context"""

    result = agent.run(
        additional_args=None,
        images=None,
        max_steps=3,
        reset=True,
        stream=False,
        task=formatted_task,
    )

    if isinstance(result, dict):
        # The agent may hand back an already-structured answer; there is
        # nothing to parse, so use it directly instead of treating it as an
        # error and returning the whole dict.
        parsed_result = result
    else:
        # Parse the result into a dictionary
        try:
            # Take everything from the first "{" to the last "}" so that text
            # surrounding the JSON object is ignored. If no braces are present
            # the slice is empty and json.loads raises (a ValueError subclass).
            json_str = result[result.find("{") : result.rfind("}") + 1]
            parsed_result = json.loads(json_str)
        except (ValueError, AttributeError, TypeError) as e:
            # AttributeError/TypeError cover non-string results (numbers,
            # lists, bytes, ...) that cannot be searched with a str pattern.
            logger.error(f"Error parsing result: {e}")
            # If parsing fails, return the raw result
            return result

    logger.info(f"Result: {parsed_result}")

    try:
        return parsed_result["succinct_answer"]
    except KeyError:
        # A well-formed object without the expected key is treated like any
        # other unparseable reply instead of crashing the caller.
        logger.error('Result has no "succinct_answer" key; returning raw result')
        return result
if __name__ == "__main__":
    # Endpoints of the scoring service; submit_url is built but not used here.
    DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Download the question list; any HTTP error status aborts the script.
    questions_response = requests.get(questions_url, timeout=15)
    questions_response.raise_for_status()
    questions_data = questions_response.json()

    # Only the first question is processed.
    first_questions = questions_data[:1]
    for entry in first_questions:
        # All four fields are read up front, so a missing key fails
        # before the agent is invoked.
        file_name = entry["file_name"]
        level = entry["Level"]
        question = entry["question"]
        task_id = entry["task_id"]

        logger.info(f"Question: {question}")
        # logger.info(f"Level: {level}")
        if file_name:
            logger.info(f"File Name: {file_name}")
        # logger.info(f"Task ID: {task_id}")

        final_answer = main(question)
        logger.info(f"Final Answer: {final_answer}")
        logger.info("--------------------------------")