import asyncio
import logging
import os
from typing import Optional, List

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from duckduckgo_search import DDGS
from serpapi import GoogleSearch

logger = logging.getLogger(__name__)


# Argument schema handed to StructuredTool for the search tool below.
# Documented with comments (not a docstring) so the generated schema
# description is left exactly as it was.
class DuckDuckGoSearchInput(BaseModel):
    # Query string sent to the search backends.
    query: str = Field(description="Search query")
    # Query used as the reference when ranking snippets with the embedder.
    original_query: str = Field(description="Original query for context")
    # Optional object exposing ``encode(..., convert_to_tensor=True)``.
    embedder: Optional[object] = Field(description="SentenceTransformer embedder", default=None)


async def duckduckgo_search_func(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    """
    Perform a DuckDuckGo search with retries and fall back to SerpAPI if needed.

    Args:
        query (str): Search query.
        original_query (str): Original query for context.
        embedder (Optional[object]): SentenceTransformer for result filtering.

    Returns:
        List[str]: List of search result snippets.
    """
    # NOTE: the docstring above is kept unchanged on purpose; StructuredTool
    # falls back to it for the tool description when none is supplied.
    #
    # Contract: never raises for search/ranking failures -- every error is
    # logged and an empty list is returned. At most three snippets come back.

    async def run_blocking(fn):
        """Run the synchronous callable ``fn`` in the default executor."""
        # DDGS and GoogleSearch are synchronous HTTP clients; calling them
        # directly inside a coroutine would stall the event loop.
        return await asyncio.get_running_loop().run_in_executor(None, fn)

    def fetch_duckduckgo() -> List[str]:
        """Blocking DuckDuckGo request returning the result bodies."""
        with DDGS() as ddgs:
            return [r['body'] for r in ddgs.text(query, max_results=5)]

    def fetch_serpapi() -> List[str]:
        """Blocking SerpAPI request returning the organic-result snippets."""
        params = {
            "q": query,
            "api_key": os.getenv("SERPAPI_API_KEY"),
            "num": 5,
        }
        organic = GoogleSearch(params).get_dict().get("organic_results", [])
        return [item.get("snippet", "") for item in organic if "snippet" in item]

    async def try_duckduckgo(query: str, max_retries: int = 3) -> List[str]:
        """Query DuckDuckGo, retrying only on rate-limit errors.

        Raises the underlying exception on a non-rate-limit error or once
        the retries are exhausted.
        """
        for attempt in range(max_retries):
            try:
                logger.info(f"DuckDuckGo search attempt {attempt + 1} for query: {query}")
                return await run_blocking(fetch_duckduckgo)
            except Exception as e:
                if "Ratelimit" in str(e) and attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s, ... (no sleep after the last attempt)
                    wait_time = 2 ** attempt
                    logger.warning(f"DuckDuckGo rate limit hit, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"DuckDuckGo search failed for query '{query}': {e}")
                    raise
        return []

    async def try_serpapi(query: str, max_retries: int = 3) -> List[str]:
        """Query SerpAPI with retries; returns [] when unavailable or failing."""
        if not os.getenv("SERPAPI_API_KEY"):
            logger.warning("SERPAPI_API_KEY not set, cannot use SerpAPI fallback")
            return []
        for attempt in range(max_retries):
            try:
                logger.info(f"SerpAPI search attempt {attempt + 1} for query: {query}")
                return await run_blocking(fetch_serpapi)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s, 2s, ... (no sleep after the last attempt)
                    wait_time = 2 ** attempt
                    logger.warning(f"SerpAPI search failed, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"SerpAPI search failed for query '{query}': {e}")
        return []

    try:
        # Try DuckDuckGo with retries
        logger.info(f"Executing DuckDuckGo search for query: {query}")
        try:
            results = await try_duckduckgo(query)
        except Exception as e:
            # A DuckDuckGo failure must not skip the fallback: previously the
            # exception went straight to the outer handler and SerpAPI was
            # never attempted.
            logger.warning(f"DuckDuckGo search raised for query '{query}', will try SerpAPI: {e}")
            results = []

        # Fall back to SerpAPI if DuckDuckGo fails or finds nothing
        if not results:
            logger.info(f"DuckDuckGo returned no results, falling back to SerpAPI for query: {query}")
            results = await try_serpapi(query)

        # Rank results if embedder is provided
        if embedder and results:
            # Imported lazily so the dependency is only needed when ranking.
            from sentence_transformers import util
            query_embedding = embedder.encode(original_query, convert_to_tensor=True)
            result_embeddings = embedder.encode(results, convert_to_tensor=True)
            scores = util.cos_sim(query_embedding, result_embeddings)[0]
            # Indices of the three most similar snippets, as plain ints.
            top_indices = scores.argsort(descending=True)[:3].tolist()
            return [results[i] for i in top_indices]

        return results[:3] if results else []
    except Exception as e:
        logger.error(f"Search failed for query '{query}': {e}")
        return []


# NOTE(review): ``func`` is given the coroutine function as well, so a
# synchronous call through the tool would presumably hand back an un-awaited
# coroutine -- confirm whether any caller relies on the sync path before
# changing this.
duckduckgo_search_tool = StructuredTool.from_function(
    func=duckduckgo_search_func,
    name="duckduckgo_search_tool",
    args_schema=DuckDuckGoSearchInput,
    coroutine=duckduckgo_search_func
)