# agent_course/agent/handlers.py
import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
import pytz
from PIL import Image
from io import BytesIO
import tempfile
import os
import speech_recognition as sr
from ddgs import DDGS
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch
# Lazy import pydub to avoid ffmpeg warning when not needed
# from pydub import AudioSegment
# handlers for tools
# 1. get_weather
def get_weather(latitude: str, longitude: str) -> str:
    """Get the current temperature for a given location using latitude and longitude coordinates."""
    api = f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&current=temperature_2m,wind_speed_10m"
    try:
        response = requests.get(api, timeout=20)
        response.raise_for_status()
        # Return as str to match the declared return type
        return str(response.json()["current"]["temperature_2m"])
    except Exception as e:
        return f"Error getting weather: {e}"
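# Example usage (a minimal sketch; the coordinates below are illustrative values
# for Berlin, and the live reading will differ):
#   get_weather("52.52", "13.41")  # -> e.g. "13.4" (Open-Meteo reports °C by default)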
# 2. web_search
def web_search(query: str, max_results: int = 5) -> str:
"""Enhanced web search with better error handling"""
try:
results_text = []
with DDGS() as ddgs:
            # Pin the region and disable safesearch for more consistent results
results = ddgs.text(
query,
max_results=max_results,
region='us-en',
safesearch='off'
)
for i, result in enumerate(results):
title = result.get("title", "")
snippet = result.get("body", "")
link = result.get("href", "")
# Clean up the text
title = title.strip() if title else ""
snippet = snippet.strip() if snippet else ""
if title or snippet:
results_text.append(f"{i+1}. {title}\n🔗 {link}\n📝 {snippet}\n")
if results_text:
return "\n".join(results_text)
        else:
            return f"No results found for: {query}. Try rephrasing your search query."
except Exception as e:
return f"Search error: {str(e)}. Please try a different search query."
# 3. get_time_in_location
def get_current_time(timezone: str = "Asia/Kolkata") -> str:
"""Get current time in specified timezone."""
try:
# Convert timezone string to pytz timezone object
tz = pytz.timezone(timezone)
current_time = datetime.now(tz)
# Format the time nicely
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S %Z")
return f"Current time in {timezone}: {formatted_time}"
except Exception as e:
return f"Error getting time: {str(e)}"
def get_time_in_location(location: str) -> str:
"""Get current time for a specific location."""
# Map common locations to timezones
location_timezones = {
"gujarat": "Asia/Kolkata",
"india": "Asia/Kolkata",
"mumbai": "Asia/Kolkata",
"delhi": "Asia/Kolkata",
"paris": "Europe/Paris",
"london": "Europe/London",
"new york": "America/New_York",
"tokyo": "Asia/Tokyo",
"sydney": "Australia/Sydney",
"utc": "UTC"
}
location_lower = location.lower()
if location_lower in location_timezones:
timezone = location_timezones[location_lower]
return get_current_time(timezone)
else:
return f"I don't have timezone information for '{location}'. Try: Gujarat, India, Mumbai, Delhi, Paris, London, New York, Tokyo, Sydney, or UTC."
# 4. analyze_image
def analyze_image(image_url: str, question: str | None = None) -> str:
"""Analyze an image and optionally answer a visual question about it."""
try:
# Download the image
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(image_url, headers=headers, timeout=30)
response.raise_for_status()
# Open image
image = Image.open(BytesIO(response.content)).convert('RGB')
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # Note: the BLIP weights are reloaded on every call; cache them at module
        # level if this handler is invoked frequently.
        if question:
            # Visual question answering with the BLIP VQA checkpoint
            processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
            model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-vqa-base")
            model.to(device)
            inputs = processor(image, question, return_tensors="pt").to(device)
            out = model.generate(**inputs)
            answer = processor.decode(out[0], skip_special_tokens=True)
            return f"❓ Question: {question}\n🧠 Answer: {answer}"
        else:
            # No question given, so generate a caption instead
            processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
            model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
            model.to(device)
            inputs = processor(image, return_tensors="pt").to(device)
            out = model.generate(**inputs)
            caption = processor.decode(out[0], skip_special_tokens=True)
            return f"🖼️ Image Caption: {caption}"
except Exception as e:
return f"Error analyzing image: {e}"
# 5. analyze_video
def analyze_video(video_url: str) -> str:
"""Analyze YouTube video metadata and attempt to fetch subtitles/transcript."""
try:
# Validate YouTube URL and extract video ID
video_id_match = re.search(r'(?:youtube\.com\/(?:watch\?v=|shorts\/)|youtu\.be\/)([^&\n?#\/]+)', video_url)
if not video_id_match:
return "Invalid YouTube URL format. Please provide a valid YouTube or Shorts URL."
video_id = video_id_match.group(1)
headers = {
"User-Agent": "Mozilla/5.0"
}
response = requests.get(video_url, headers=headers, timeout=20)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Extract title
title = None
for selector in ['meta[property="og:title"]', 'meta[name="title"]', 'title']:
el = soup.select_one(selector)
if el:
title = el.get('content') or el.text
break
title_text = title.strip() if title else "Unknown title"
# Extract description
description = None
for selector in ['meta[property="og:description"]', 'meta[name="description"]']:
el = soup.select_one(selector)
if el:
description = el.get('content') or el.text
break
desc_text = description.strip() if description else "No description available"
# Extract channel
channel = None
for selector in ['link[itemprop="name"]', 'meta[itemprop="channelId"]', 'a[href*="/@"]']:
el = soup.select_one(selector)
if el:
channel = el.get('content') or el.text or el.get('title')
break
channel_text = channel.strip() if channel else "Unknown channel"
# Attempt to fetch transcript using YouTubeTranscriptApi
transcript_text = "Transcript not available."
try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            # Preview only the first few caption lines to keep the summary short
            transcript_text = "\n".join(line["text"] for line in transcript[:5]) + "\n..."
except (TranscriptsDisabled, NoTranscriptFound):
transcript_text = "No transcript available for this video."
except Exception as e:
transcript_text = f"Transcript fetch failed: {e}"
# Final analysis summary
result = f"📺 Video Analysis\n"
result += f"Title: {title_text}\n"
result += f"Channel: {channel_text}\n"
result += f"Description: {desc_text[:300]}...\n"
result += f"Transcript Preview:\n{transcript_text}"
return result
except Exception as e:
return f"Error analyzing video: {e}"
# 6. transcribe_audio
def transcribe_audio(audio_url: str) -> str:
"""Transcribe audio content from an audio file URL using speech recognition."""
try:
# Download the audio file
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
response = requests.get(audio_url, headers=headers, timeout=30)
response.raise_for_status()
# Create a temporary file to store the audio
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_audio:
temp_audio.write(response.content)
temp_audio_path = temp_audio.name
try:
# Try to convert audio to WAV format (speech_recognition works better with WAV)
try:
# Lazy import pydub only when needed
from pydub import AudioSegment
audio = AudioSegment.from_file(temp_audio_path)
# Create temporary WAV file
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_wav:
audio.export(temp_wav.name, format="wav")
wav_path = temp_wav.name
except Exception as e:
# If conversion fails, try using the original file directly
print(f"Audio conversion warning: {e}")
wav_path = temp_audio_path
# Initialize speech recognizer
recognizer = sr.Recognizer()
# Load the audio file
with sr.AudioFile(wav_path) as source:
# Adjust for ambient noise
recognizer.adjust_for_ambient_noise(source, duration=0.5)
# Record the audio
audio_data = recognizer.record(source)
# Try to recognize speech
try:
# Use Google Speech Recognition (free, requires internet)
text = recognizer.recognize_google(audio_data)
return f"Audio Transcription:\n{text}"
except sr.UnknownValueError:
return "Audio Transcription: Could not understand the audio. The speech may be unclear, too quiet, or in a language not supported."
except sr.RequestError as e:
return f"Audio Transcription: Error with speech recognition service: {str(e)}"
        finally:
            # Clean up temporary files
            try:
                os.unlink(temp_audio_path)
                if wav_path != temp_audio_path:  # Only delete the WAV file if it is a separate file
                    os.unlink(wav_path)
            except OSError:
                pass
    except Exception as e:
        return f"Audio Transcription Error: {e}"