Spaces:
Sleeping
Sleeping
| # prompt: agent.run("Add the results of scraping: https://www.amazon.de/Amazon-Cola-24-330ml/dp/B0B2QFB69F/ref=sr_1_1_ffob_sspa?dib=eyJ2IjoiMSJ9.Ammc6GrHRevDKBSZX9_vNS5j3Kc1ZW2R4jISx9htSBAc0WWFC1xX5qnohoEQjmvNqQyWIr6hnMbFad3QuwPMVG8F_nZbwnpBcHL89OZsU2XzkSha-clTmgJLUUh7Z96_98HOe9hOif82mXyrL7ZTnbygPSbm-t6FDAfslLesKfij79QL7-a2RSOKVPcJRFR1DLUamaHfmhyN5c_rujFjb2X1rQSXg6NWCnOdgU2r1gzEa54bU8bxeQnX-vMsRMGEw4entZYP_Oh85pEImPU_lS2Awqr-sG_RgaV0Wuzfmdw.XA9kTWHZQvmhT2BoQWxRNix2TJe8EoeyjiSoQtFx1yY&dib_tag=se&keywords=Cola&qid=1738167189&rdc=1&sr=8-1-spons&sp_csd=d2lkZ2V0TmFtZT1zcF9hdGY&th=1 to a csv file") | |
| #!pip install -q smolagents transformers sentence_transformers gradio | |
| from smolagents import CodeAgent, HfApiModel, tool | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| import gradio as gr | |
| from typing import List, Dict, Optional | |
| import time | |
| import random | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor | |
| import json | |
| import pandas as pd | |
| #### | |
| #### | |
| #### | |
def amazon_scraper(keyword: str, mandatory_columns: List[str] = None, max_products: int = 10) -> Dict:
    """
    Search amazon.com for a keyword and collect listing and detail-page data per product.

    Each search result contributes its title, price, rating and review count.
    When a result links to a product page, that page is scraped with
    scrape_product_details and its fields are merged into the product.

    Args:
        keyword: Search term for Amazon products
        mandatory_columns: List of specific attributes to always include, filled with 'N/A' when missing
        max_products: Maximum number of products to scrape (default: 10)

    Returns:
        On success, a dictionary with 'products' (list of product dicts) and
        'columns' (every key found in any product, in first-seen order).
        On failure, a dictionary with a single 'error' message.
    """
    # Imported locally so the block stays self-contained.
    from urllib.parse import quote_plus, urljoin

    base_url = 'https://www.amazon.com'
    # Without a timeout, requests may wait indefinitely on a stalled connection.
    request_timeout = 15

    if mandatory_columns is None:
        mandatory_columns = ['title', 'price', 'rating', 'reviews']
    headers = {
        # The original header carried an unfilled "Chrome/VERSION" placeholder.
        'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
    }
    # quote_plus turns spaces into '+' like before, and also escapes characters
    # such as '&' or '#' that would otherwise corrupt the query string.
    search_url = f"{base_url}/s?k={quote_plus(keyword)}"
    try:
        response = requests.get(search_url, headers=headers, timeout=request_timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        items = soup.find_all('div', attrs={'data-component-type': 's-search-result'})
        if not items:
            return {'error': 'No products found for the given search term'}

        # Slice bounds must be integers; callers may hand over a float such as 5.0.
        limit = max(int(max_products), 0)
        products = []
        for item in items[:limit]:
            try:
                product = {}

                # Basic information from the search result. The title lives in
                # the div marked data-cy="title-recipe"; the previous class-based
                # lookup is kept only as a fallback.
                title_elem = item.find('div', attrs={'data-cy': 'title-recipe'})
                if title_elem is None:
                    title_elem = item.find('div', class_="title-instructions-style")
                product['title'] = title_elem.text.strip() if title_elem is not None else 'N/A'

                price_elem = item.find('span', class_='a-offscreen')
                product['price'] = price_elem.text.strip() if price_elem is not None else 'N/A'

                rating_elem = item.find('span', class_='a-icon-alt')
                product['rating'] = rating_elem.text.split(' ')[0] if rating_elem is not None else 'N/A'

                reviews_elem = item.find('span', {'class': 'a-size-base', 'dir': 'auto'})
                product['reviews'] = reviews_elem.text.strip() if reviews_elem is not None else '0'

                # Product URL and detail page
                url_elem = item.find('a', class_='a-link-normal s-no-outline')
                if url_elem is not None and 'href' in url_elem.attrs:
                    # urljoin copes with both relative and already-absolute hrefs.
                    product_url = urljoin(base_url, url_elem['href'])
                    product['url'] = product_url
                    try:
                        details = scrape_product_details(product_url, headers)
                    except requests.RequestException as e:
                        # A failed detail page should not discard the listing data.
                        print(f"Error fetching details for {product_url}: {str(e)}")
                        details = {}
                    # An error result carries no product fields worth merging.
                    if 'error' not in details:
                        product.update(details)

                # Guarantee every requested column exists on every product.
                for column in mandatory_columns:
                    product.setdefault(column, 'N/A')

                products.append(product)
            except Exception as e:
                # Best effort: one malformed result must not abort the whole search.
                print(f"Error processing item: {str(e)}")
                continue

        if not products:
            return {'error': 'Failed to extract product information'}

        # Products may differ in keys (e.g. when a detail page failed), so the
        # column list is the ordered union over all products.
        columns = list(dict.fromkeys(key for product in products for key in product))
        return {'products': products, 'columns': columns}
    except requests.RequestException as e:
        return {'error': f'Network error: {str(e)}'}
    except Exception as e:
        return {'error': f'Unexpected error: {str(e)}'}
| from typing import Optional, Dict | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from smolagents import tool # Ensure you import @tool | |
def scrape_product_details(url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """
    Scrapes product details from an Amazon product page.

    Args:
        url: The URL of the Amazon product page to scrape.
        headers: HTTP headers to include in the request. Defaults to None, in which case a browser-like User-Agent is sent.

    Returns:
        Dict[str, str]: On success, a dictionary containing:
            - 'title': Product title
            - 'price': Product price
            - 'description': Product description
            - 'bullet_points': Bullet point features (comma-separated string)
            - 'average_rating': Customer rating
            - 'total_reviews': Number of reviews
            - 'image_link': URL of the main product image
        Fields that cannot be located hold a "... not found" placeholder.
        When the request fails or the status is not 200, the dictionary
        contains a single 'error' message instead.
    """
    def text_or(element, fallback):
        # Text of a found element, or the placeholder when the lookup came back empty.
        return element.get_text(strip=True) if element is not None else fallback

    if headers is None:
        headers = {
            # The original header carried an unfilled "Chrome/VERSION" placeholder.
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }

    try:
        # Without a timeout, requests may wait indefinitely on a stalled connection.
        response = requests.get(url, headers=headers, timeout=15)
    except requests.RequestException as e:
        # Report transport failures the same way as bad status codes.
        return {'error': f'Network error: {str(e)}'}
    if response.status_code != 200:
        return {'error': f'Failed to retrieve the page. Status code: {response.status_code}'}

    soup = BeautifulSoup(response.content, 'html.parser')

    product_title = text_or(soup.find('span', {'id': 'productTitle'}), 'Title not found')
    product_price = text_or(soup.find('span', {'class': 'a-price-whole'}), 'Price not found')
    product_description = text_or(soup.find('div', {'id': 'productDescription'}), 'Description not found')

    # Bullet points: one entry per list item that has an 'a-list-item' span.
    bullet_points = []
    bullet_section = soup.find('ul', {'class': 'a-unordered-list a-vertical a-spacing-mini'})
    if bullet_section is not None:
        for li in bullet_section.find_all('li'):
            bullet = li.find('span', {'class': 'a-list-item'})
            if bullet is not None:
                bullet_points.append(bullet.get_text(strip=True))
    bullet_points_text = ', '.join(bullet_points) if bullet_points else 'Bullet points not found'

    average_rating = text_or(soup.find('span', {'class': 'a-icon-alt'}), 'Average rating not found')
    total_reviews = text_or(soup.find('span', {'id': 'acrCustomerReviewText'}), 'Total reviews not found')

    # The image tag may exist without a 'src' attribute; .get avoids a KeyError.
    image_tag = soup.find('img', {'id': 'landingImage'})
    if image_tag is not None:
        image_link = image_tag.get('src', 'Image link not found')
    else:
        image_link = 'Image link not found'

    return {
        'title': product_title,
        'price': product_price,
        'description': product_description,
        'bullet_points': bullet_points_text,
        'average_rating': average_rating,
        'total_reviews': total_reviews,
        'image_link': image_link,
    }
| ### | |
| ### | |
# CodeAgent takes smolagents Tool objects in `tools`, so the plain scraper
# function is wrapped with tool() unless it already is a Tool (for example
# because it was decorated with @tool where it is defined).
from smolagents import Tool

model = HfApiModel()
scraper_tool = amazon_scraper if isinstance(amazon_scraper, Tool) else tool(amazon_scraper)
agent = CodeAgent(
    tools=[scraper_tool],
    model=model,
    additional_authorized_imports=['requests', 'bs4', 'pandas', 'gradio', 'concurrent.futures', 'csv', 'json'],
)
| # Assuming the agent.run call returns a dictionary with 'products' key | |
| ##### | |
| ##### | |
| ## | |
| import gradio as gr | |
| import pandas as pd | |
| from typing import Dict, List, Tuple, Union | |
| from smolagents import CodeAgent, HfApiModel, tool | |
def process_agent_response(response: Union[Dict, List, str]) -> Tuple[pd.DataFrame, str]:
    """
    Convert the agent's response into a DataFrame of products.

    Accepted shapes:
      - a list of product dicts,
      - a dict with a 'products' key,
      - a single product dict (recognised by its 'title' key),
      - a dict with an 'error' key (its message is reported back),
      - a string holding any of the above as JSON or as a Python literal,
        optionally preceded by a "Final answer:" marker.

    Args:
        response: Raw value returned by the agent.

    Returns:
        (DataFrame, error message). The message is "" on success; on failure
        the DataFrame is empty and the message says why. Column names are
        lower-cased and stripped.
    """
    import ast
    import json

    no_data_msg = "No valid product data found"

    def extract_products_from_response(resp):
        """Return (products, error); products is None when nothing usable was found."""
        if isinstance(resp, list):
            # Response is already a list of products
            return resp, ""
        if isinstance(resp, dict):
            if 'products' in resp:
                return resp['products'], ""
            if 'title' in resp:
                # A single product
                return [resp], ""
            if 'error' in resp:
                # Surface the reported error instead of a generic message.
                return None, str(resp['error'])
            return None, no_data_msg
        if isinstance(resp, str):
            text = resp
            # "Out - Final answer:" contains "Final answer:", so one split covers both prefixes.
            if "Final answer:" in text:
                text = text.split("Final answer:", 1)[1].strip()
            # JSON first (handles true/false/null), then Python literal syntax
            # (handles single-quoted dict reprs).
            for parser in (json.loads, ast.literal_eval):
                try:
                    parsed = parser(text)
                except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                    continue
                return extract_products_from_response(parsed)
        return None, no_data_msg

    try:
        products, error = extract_products_from_response(response)
        if products is None:
            return pd.DataFrame(), error or no_data_msg
        if isinstance(products, list):
            # Only dict entries can become table rows; drop stray scalars.
            products = [entry for entry in products if isinstance(entry, dict)]
        df = pd.DataFrame(products)
        # Column labels are not guaranteed to be strings, hence str().
        df.columns = [str(col).lower().strip() for col in df.columns]
        return df, ""
    except Exception as e:
        return pd.DataFrame(), f"Error processing data: {str(e)}"
def search_products(keyword: str, max_products: int) -> Tuple[pd.DataFrame, str, str]:
    """
    Ask the agent for products matching a keyword and prepare them for display.

    Args:
        keyword: Product keyword or name to search for.
        max_products: Number of products to request from the agent.

    Returns:
        (DataFrame, status message, error message). On success the status is
        "Found N products" and the error is ""; otherwise the status is "" and
        the error explains what went wrong.
    """
    max_text_length = 200

    def format_price(value):
        # Missing cells show as 'N/A' rather than the text "nan".
        if value is None or (isinstance(value, float) and value != value):
            return 'N/A'
        text = str(value).strip().strip('.')
        # Prefix '$' only to bare numbers; values that already carry a
        # currency symbol or a placeholder text are left as they are.
        return f"${text}" if text[:1].isdigit() else text

    def truncate(value):
        # Only strings are shortened; NaN and other cell types have no usable len().
        if isinstance(value, str) and len(value) > max_text_length:
            return value[:max_text_length] + '...'
        return value

    # Skip the agent run entirely when there is nothing to search for.
    if not keyword or not keyword.strip():
        return pd.DataFrame(), "", "Please enter a product keyword"

    try:
        result = agent.run(f'Show me details for {int(max_products)} amazon products with keyword: {keyword}. Return a product-json with resp["products"]')
        df, error_msg = process_agent_response(result)
        if df.empty:
            return df, "", error_msg or "No products found"

        # Select and reorder relevant columns, using lowercase names
        display_columns = [
            'title', 'price', 'rating', 'reviews', 'description',
            'bullet_points', 'average_rating', 'total_reviews'
        ]
        available_columns = [col for col in display_columns if col in df.columns]
        # Work on a copy so the column assignments below do not write into a
        # slice of the original frame. If none of the display columns are
        # present, all columns are kept instead of showing an empty table.
        df = df[available_columns].copy() if available_columns else df.copy()

        if 'price' in df.columns:
            df['price'] = df['price'].apply(format_price)
        # Truncate long text fields for better display
        for column in ('description', 'bullet_points'):
            if column in df.columns:
                df[column] = df[column].apply(truncate)

        return df, f"Found {len(df)} products", ""
    except Exception as e:
        return pd.DataFrame(), "", f"Search error: {str(e)}"
def answer_product_question(df: pd.DataFrame, question: str) -> str:
    """
    Answer a question about previously found products using the agent.

    Args:
        df: Products from the last search, one row per product.
        question: The user's question about those products.

    Returns:
        The agent's answer as a string, or a message explaining why no answer
        could be produced (no products yet, blank question, or an error).
    """
    if df is None or df.empty:
        return "Please search for products first before asking questions."
    # Skip the agent run when there is nothing to answer.
    if not question or not question.strip():
        return "Please enter a question about the products."
    try:
        # Convert DataFrame to a more readable format for the agent
        products_context = df.to_dict('records')
        prompt = "\n".join([
            "Based on these products:",
            f"{products_context}",
            f"Question: {question}",
            "Please provide a clear and concise answer using only the information available in the product data.",
        ])
        response = agent.run(prompt)
        # The agent may return dicts, lists or numbers; the caller expects text.
        return response if isinstance(response, str) else str(response)
    except Exception as e:
        return f"Error processing question: {str(e)}"
def create_interface() -> gr.Blocks:
    """
    Create the Gradio interface with search and Q&A functionality.

    The layout has a keyword box, a product-count slider and a search button
    on one side, a question box with its button on the other, and below them
    the results table next to the answer area. The last search result is kept
    in a gr.State so that questions can refer to it.

    Returns:
        The assembled gr.Blocks app (not yet launched).
    """
    with gr.Blocks(title="Amazon Product Search & Q&A") as interface:
        gr.Markdown("# Amazon Product Search and Q&A System")

        # Status message for feedback
        status_msg = gr.Markdown("")

        with gr.Row():
            with gr.Column():
                keyword_input = gr.Textbox(
                    label="Product Keyword or Name",
                    placeholder="Enter product keyword...",
                    scale=3
                )
                max_products = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=5,
                    step=1,
                    label="Number of Products",
                )
                search_button = gr.Button("Search Products", variant="primary")
            with gr.Column():
                question_input = gr.Textbox(
                    label="Ask about the products",
                    placeholder="Enter your question about the products...",
                    scale=3
                )
                ask_button = gr.Button("Ask Question", variant="secondary")

        # Output components
        with gr.Row():
            with gr.Column(scale=2):
                product_table = gr.Dataframe(
                    label="Product Search Results",
                    interactive=False,
                    wrap=True
                )
            with gr.Column(scale=1):
                answer_output = gr.Markdown(
                    label="Answer to Your Question"
                )

        # Store DataFrame state
        df_state = gr.State(pd.DataFrame())

        def on_search(keyword: str, max_products: int) -> Tuple[pd.DataFrame, pd.DataFrame, str]:
            # TODO add thinking in an output
            # The slider value is coerced so a float such as 5.0 becomes 5.
            df, status, error = search_products(keyword, int(max_products))
            message = error if error else status
            # Return the text itself; Gradio assigns it to the Markdown output.
            return df, df, message

        def on_question(df: pd.DataFrame, question: str) -> str:
            # TODO add thinking in an output
            return answer_product_question(df, question)

        # Connect components
        search_button.click(
            fn=on_search,
            inputs=[keyword_input, max_products],
            outputs=[product_table, df_state, status_msg]
        )
        ask_button.click(
            fn=on_question,
            inputs=[df_state, question_input],
            outputs=answer_output
        )
    return interface
def main():
    """Build the Gradio app and launch it with debug on, bound to 0.0.0.0:7860."""
    launch_options = {
        "debug": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    create_interface().launch(**launch_options)


if __name__ == "__main__":
    main()