import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
import pytz
from PIL import Image
from io import BytesIO
import tempfile
import os
import speech_recognition as sr
from ddgs import DDGS
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch
# Lazy import pydub to avoid ffmpeg warning when not needed
# from pydub import AudioSegment

# handlers for tools

# 1. get_weather
def get_weather(latitude: str, longitude: str) -> str:
    """Get the current temperature for a location from the Open-Meteo API.

    Args:
        latitude: Latitude in decimal degrees (string form is accepted by the API).
        longitude: Longitude in decimal degrees.

    Returns:
        The current 2 m air temperature reported by Open-Meteo.
        NOTE(review): the API returns this field as a number, not a string,
        despite the ``-> str`` annotation kept for interface compatibility.

    Raises:
        requests.HTTPError: if the API responds with a non-2xx status.
    """
    # BUG FIX: the query string previously contained "¤t=" — the result of
    # "&current" being mangled into the HTML entity "&curren;" (¤).
    # Open-Meteo needs an explicit "current=" parameter or the response has
    # no "current" section and the lookup below raises KeyError.
    api = (
        "https://api.open-meteo.com/v1/forecast"
        f"?latitude={latitude}&longitude={longitude}"
        "&current=temperature_2m,wind_speed_10m"
        "&hourly=temperature_2m,relative_humidity_2m,wind_speed_10m"
    )
    response = requests.get(api)
    # Fail loudly on HTTP errors instead of a confusing KeyError downstream.
    response.raise_for_status()
    return response.json()["current"]["temperature_2m"]

# 2. web_search
def web_search(query: str, max_results: int = 5) -> str:
    """Enhanced web search with better error handling.

    Args:
        query: Free-text search query.
        max_results: Maximum number of DuckDuckGo results to format.

    Returns:
        A newline-joined, numbered list of "title / link / snippet" entries,
        or a human-readable message when nothing was found or an error occurred.
    """
    try:
        results_text = []
        with DDGS() as ddgs:
            # Add more specific search parameters
            results = ddgs.text(
                query,
                max_results=max_results,
                region='us-en',
                safesearch='off'
            )
            for i, result in enumerate(results):
                # `or ""` guards against both missing keys and None values.
                title = (result.get("title") or "").strip()
                snippet = (result.get("body") or "").strip()
                link = result.get("href", "")
                if title or snippet:
                    results_text.append(f"{i+1}. {title}\nšŸ”— {link}\nšŸ“ {snippet}\n")
        if results_text:
            return "\n".join(results_text)
        # Try alternative search approach
        return f"No results found for: {query}. Try rephrasing your search query."
    except Exception as e:
        return f"Search error: {str(e)}. Please try a different search query."
# 3. get_time_in_location
def get_current_time(timezone: str = "Asia/Kolkata") -> str:
    """Get current time in the specified IANA timezone.

    Args:
        timezone: IANA timezone key, e.g. "Asia/Kolkata" or "UTC".

    Returns:
        "Current time in <timezone>: YYYY-MM-DD HH:MM:SS TZ" on success,
        or "Error getting time: ..." for an unknown timezone.
    """
    try:
        # stdlib zoneinfo (Python 3.9+) replaces the third-party pytz
        # dependency; IANA keys such as "Asia/Kolkata" resolve identically,
        # and no localize()/normalize() dance is needed.
        from zoneinfo import ZoneInfo
        tz = ZoneInfo(timezone)
        current_time = datetime.now(tz)
        # Format the time nicely
        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S %Z")
        return f"Current time in {timezone}: {formatted_time}"
    except Exception as e:
        return f"Error getting time: {str(e)}"

def get_time_in_location(location: str) -> str:
    """Get current time for a specific location (city/region name).

    Args:
        location: Case-insensitive place name from a fixed lookup table.

    Returns:
        The formatted time string for the mapped timezone, or a help
        message listing the supported locations.
    """
    # Map common locations to timezones
    location_timezones = {
        "gujarat": "Asia/Kolkata",
        "india": "Asia/Kolkata",
        "mumbai": "Asia/Kolkata",
        "delhi": "Asia/Kolkata",
        "paris": "Europe/Paris",
        "london": "Europe/London",
        "new york": "America/New_York",
        "tokyo": "Asia/Tokyo",
        "sydney": "Australia/Sydney",
        "utc": "UTC"
    }
    location_lower = location.lower()
    if location_lower in location_timezones:
        timezone = location_timezones[location_lower]
        return get_current_time(timezone)
    else:
        return f"I don't have timezone information for '{location}'. Try: Gujarat, India, Mumbai, Delhi, Paris, London, New York, Tokyo, Sydney, or UTC."
# 4. analyze_image
# Cache: model name -> (processor, model). Previously every analyze_image()
# call re-ran from_pretrained, reloading (and possibly re-downloading) the
# BLIP weights each time; now each checkpoint is loaded at most once.
_BLIP_CACHE = {}


def _load_blip(model_name: str):
    """Return a cached (processor, model) pair for a BLIP checkpoint, on the best device."""
    if model_name not in _BLIP_CACHE:
        processor = BlipProcessor.from_pretrained(model_name)
        model = BlipForConditionalGeneration.from_pretrained(model_name)
        model.to("cuda" if torch.cuda.is_available() else "cpu")
        _BLIP_CACHE[model_name] = (processor, model)
    return _BLIP_CACHE[model_name]


def analyze_image(image_url: str, question: str = None) -> str:
    """Analyze an image and optionally answer a visual question about it.

    Args:
        image_url: URL of the image to download.
        question: Optional visual question; if omitted, a caption is generated.

    Returns:
        A formatted answer/caption string, or "Error analyzing image: ..." on failure.
    """
    try:
        # Download the image
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(image_url, headers=headers, timeout=30)
        response.raise_for_status()
        # Open image; convert to RGB so palette/alpha images work with BLIP.
        image = Image.open(BytesIO(response.content)).convert('RGB')
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if question:
            # Visual question answering checkpoint.
            processor, model = _load_blip("Salesforce/blip-vqa-base")
            inputs = processor(image, question, return_tensors="pt").to(device)
            out = model.generate(**inputs)
            answer = processor.decode(out[0], skip_special_tokens=True)
            return f"ā“ Question: {question}\n🧠 Answer: {answer}"
        else:
            # If no question, generate a caption with the captioning checkpoint.
            processor_caption, model_caption = _load_blip("Salesforce/blip-image-captioning-base")
            inputs = processor_caption(image, return_tensors="pt").to(device)
            out = model_caption.generate(**inputs)
            caption = processor_caption.decode(out[0], skip_special_tokens=True)
            return f"šŸ–¼ļø Image Caption: {caption}"
    except Exception as e:
        return f"Error analyzing image: {e}"

# 5. analyze_video
def analyze_video(video_url: str) -> str:
    """Analyze YouTube video metadata and attempt to fetch subtitles/transcript.

    Args:
        video_url: A youtube.com/watch, youtube.com/shorts, or youtu.be URL.

    Returns:
        A multi-line summary (title, channel, truncated description, transcript
        preview), or an error message string on failure.
    """
    try:
        # Validate YouTube URL and extract the 11-char video ID from
        # watch/shorts/youtu.be forms.
        video_id_match = re.search(r'(?:youtube\.com\/(?:watch\?v=|shorts\/)|youtu\.be\/)([^&\n?#\/]+)', video_url)
        if not video_id_match:
            return "Invalid YouTube URL format. Please provide a valid YouTube or Shorts URL."
        video_id = video_id_match.group(1)
        headers = {
            "User-Agent": "Mozilla/5.0"
        }
        response = requests.get(video_url, headers=headers, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract title — prefer Open Graph metadata, fall back to <title>.
        title = None
        for selector in ['meta[property="og:title"]', 'meta[name="title"]', 'title']:
            el = soup.select_one(selector)
            if el:
                title = el.get('content') or el.text
                break
        title_text = title.strip() if title else "Unknown title"
        # Extract description
        description = None
        for selector in ['meta[property="og:description"]', 'meta[name="description"]']:
            el = soup.select_one(selector)
            if el:
                description = el.get('content') or el.text
                break
        desc_text = description.strip() if description else "No description available"
        # Extract channel
        channel = None
        for selector in ['link[itemprop="name"]', 'meta[itemprop="channelId"]', 'a[href*="/@"]']:
            el = soup.select_one(selector)
            if el:
                channel = el.get('content') or el.text or el.get('title')
                break
        channel_text = channel.strip() if channel else "Unknown channel"
        # Attempt to fetch transcript using YouTubeTranscriptApi.
        # NOTE(review): get_transcript is the classic (pre-1.0) API of
        # youtube-transcript-api — confirm the pinned library version.
        transcript_text = "Transcript not available."
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            # Preview only the first five caption lines.
            transcript_text = "\n".join([f"{line['text']}" for line in transcript[:5]]) + "\n..."
        except (TranscriptsDisabled, NoTranscriptFound):
            transcript_text = "No transcript available for this video."
        except Exception as e:
            transcript_text = f"Transcript fetch failed: {e}"
        # Final analysis summary
        result = f"šŸ“ŗ Video Analysis\n"
        result += f"Title: {title_text}\n"
        result += f"Channel: {channel_text}\n"
        result += f"Description: {desc_text[:300]}...\n"
        result += f"Transcript Preview:\n{transcript_text}"
        return result
    except Exception as e:
        return f"Error analyzing video: {e}"
# 6. transcribe_audio
def transcribe_audio(audio_url: str) -> str:
    """Transcribe audio content from an audio file URL using speech recognition.

    Downloads the file, converts it to WAV via pydub when available (the
    speech_recognition library only reads WAV/AIFF/FLAC), then runs Google
    Speech Recognition over it. Temporary files are always cleaned up.

    Args:
        audio_url: Direct URL of the audio file to transcribe.

    Returns:
        "Audio Transcription:\\n<text>" on success, or a human-readable
        error/diagnostic message string on any failure.
    """
    try:
        # Download the audio file
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        response = requests.get(audio_url, headers=headers, timeout=30)
        response.raise_for_status()

        # Create a temporary file to store the audio
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_audio:
            temp_audio.write(response.content)
            temp_audio_path = temp_audio.name

        # FIX: initialize before the try/finally so the cleanup block can
        # never hit an unbound name, whatever raises inside.
        wav_path = temp_audio_path
        try:
            # Try to convert audio to WAV format (speech_recognition works better with WAV)
            try:
                # Lazy import pydub only when needed
                from pydub import AudioSegment
                audio = AudioSegment.from_file(temp_audio_path)
                # Create temporary WAV file
                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_wav:
                    audio.export(temp_wav.name, format="wav")
                    wav_path = temp_wav.name
            except Exception as e:
                # If conversion fails, try using the original file directly
                print(f"Audio conversion warning: {e}")
                wav_path = temp_audio_path

            # Initialize speech recognizer
            recognizer = sr.Recognizer()

            # Load the audio file
            with sr.AudioFile(wav_path) as source:
                # Adjust for ambient noise
                recognizer.adjust_for_ambient_noise(source, duration=0.5)
                # Record the audio
                audio_data = recognizer.record(source)

            # Try to recognize speech
            try:
                # Use Google Speech Recognition (free, requires internet)
                text = recognizer.recognize_google(audio_data)
                return f"Audio Transcription:\n{text}"
            except sr.UnknownValueError:
                return "Audio Transcription: Could not understand the audio. The speech may be unclear, too quiet, or in a language not supported."
            except sr.RequestError as e:
                return f"Audio Transcription: Error with speech recognition service: {str(e)}"
        finally:
            # Clean up temporary files.
            # FIX: a bare `except:` here also swallowed KeyboardInterrupt and
            # SystemExit; OSError is the only failure cleanup can produce.
            try:
                os.unlink(temp_audio_path)
                if wav_path != temp_audio_path:  # Only delete wav file if it's different
                    os.unlink(wav_path)
            except OSError:
                pass
    except ImportError:
        return "Audio Transcription: Speech recognition library not available. Please install 'SpeechRecognition' package."
    except Exception as e:
        return f"Audio Transcription Error: {str(e)}"