import json import re from typing import List, Dict, Any, Optional from services.mistral_service import MistralService from services.database_service import DatabaseService class FoodSearchService: """Service pour la recherche et le traitement des données alimentaires""" def __init__(self): self.mistral_service = MistralService() self.database_service = DatabaseService() def search_and_format_products(self, query: str, max_results: int = 5) -> str: """ Recherche de produits alimentaires avec analyse intelligente par IA. Cette fonction utilise Mistral AI pour comprendre les requêtes en langage naturel et recherche dans la base OpenFoodFacts pour retourner des informations détaillées sur les produits alimentaires. Args: query (str): Requête de recherche en langage naturel. max_results (int): Nombre maximum de résultats à retourner (1-20). Returns: str: Résultats formatés en markdown contenant pour chaque produit : - Nom du produit et marque - Scores nutritionnels (Nutri-Score, NOVA) - Composition (nombre d'ingrédients et additifs) - Liste complète des ingrédients - Allergènes présents - Valeurs nutritionnelles (énergie, matières grasses, sucres, sel, protéines) - Informations produit (quantité, portion) - Catégories du produit Raises: Exception: En cas d'erreur de recherche ou de connexion à la base de données Examples: >>> service = FoodSearchService() >>> results = service.search_products("nutella", 3) >>> print(results) **📊 3 produits trouvés :** **1. Nutella** - **Marque**: Ferrero - **Scores**: Nutri: 🔴E | NOVA: 🔴4 - **🧾 Ingrédients**: Sucre, huile de palme, noisettes... """ if not query.strip(): return "❌ Enter your research" try: analysis = self.mistral_service.analyze_query(query) except Exception as e: print(f"Mistral not ready: {e}") analysis = {'keywords': [query], 'brand': '', 'category': ''} # Rechercher dans la base try: products = self.database_service.search_products(analysis, limit=max_results) except Exception as e: return f"❌ Search error: {str(e)}" if not products: return f"❌ No results for '{query}'" return self._format_products_results(products) def _format_products_results(self, products: List[Dict[str, Any]]) -> str: """Format results in markdown""" result = f"**📊 {len(products)} produits trouvés :**\n\n" for i, product in enumerate(products, 1): try: result += self._format_single_product(i, product) except Exception as e: print(f"Error formating product {i}: {e}") result += f"**{i}. Erreur lors du traitement du produit**\n\n" continue return result def _format_single_product(self, index: int, product: Dict[str, Any]) -> str: """Formate single produdt""" product_name = product.get('product_name', 'Produit inconnu') brand = product.get('brands', 'N/A') result = f"**{index}. {product_name}**\n" result += f"- **Marque**: {brand}\n" # Scores nutritionnels result += self._format_nutrition_scores(product) # Composition result += self._format_composition(product) # Ingrédients result += self._format_ingredients(product) # Allergènes result += self._format_allergens(product) # Valeurs nutritionnelles result += self._format_nutrition_values(product) # Informations additionnelles result += self._format_additional_info(product) # Catégories result += self._format_categories(product) result += "\n" return result def _format_nutrition_scores(self, product: Dict[str, Any]) -> str: """Format nutrition scores""" scores_line = [] # Nutri-Score nutri_grade = product.get('nutriscore_grade') if nutri_grade and nutri_grade != 'unknown': nutri_emoji = self._get_grade_emoji(nutri_grade, 'nutri') scores_line.append(f"Nutri: {nutri_emoji}{nutri_grade.upper()}") # Groupe NOVA nova_group = product.get('nova_group') if nova_group: nova_emoji = self._get_grade_emoji(nova_group, 'nova') scores_line.append(f"NOVA: {nova_emoji}{nova_group}") return f"- **Scores**: {' | '.join(scores_line)}\n" if scores_line else "" @staticmethod def _format_composition(product: Dict[str, Any]) -> str: """Format composition (ingrédients/additifs)""" composition_parts = [] ingredients_count = product.get('ingredients_count', 0) additives_count = product.get('additives_count', 0) if ingredients_count and ingredients_count > 0: composition_parts.append(f"{ingredients_count} ingrédients") if additives_count and additives_count > 0: composition_parts.append(f"⚠️ {additives_count} additifs") return f"- **Composition**: {', '.join(composition_parts)}\n" if composition_parts else "" def _format_ingredients(self, product: Dict[str, Any]) -> str: """Format ingredients""" ingredients_text = product.get('ingredients_text') if ingredients_text and ingredients_text != 'N/A': ingredients_clean = self._extract_ingredients_text(ingredients_text) if ingredients_clean: # Limiter la longueur pour l'affichage if len(ingredients_clean) > 300: ingredients_display = ingredients_clean[:300] + "..." else: ingredients_display = ingredients_clean return f"- **🧾 Ingrédients**: {ingredients_display}\n" return "" def _format_allergens(self, product: Dict[str, Any]) -> str: """Format allergens""" allergens = product.get('allergens', []) if allergens and len(allergens) > 0: allergens_clean = [self._clean_allergen_name(a) for a in allergens[:4]] allergens_clean = [a for a in allergens_clean if a] if allergens_clean: return f"- **⚠️ Allergènes**: {', '.join(allergens_clean)}\n" return "" def _format_nutrition_values(self, product: Dict[str, Any]) -> str: """Format nutrition values""" nutrients = self._parse_nutrients(product.get('nutriments', {})) nutrition_display = [] # Ordre d'importance pour l'affichage key_nutrients = [ 'energy-kcal', 'energy_kcal', 'sugars', 'fat', 'salt', 'sodium', 'proteins', 'saturated-fat', 'saturated_fat', 'carbohydrates' ] for nutrient_name in key_nutrients: if nutrient_name in nutrients: formatted_value = self._format_nutrition_value(nutrients[nutrient_name]) if formatted_value != "N/A": display_name = { 'energy-kcal': 'Énergie', 'energy_kcal': 'Énergie', 'fat': 'Matières grasses', 'saturated-fat': 'Sat. grasses', 'saturated_fat': 'Sat. grasses', 'sugars': 'Sucres', 'salt': 'Sel', 'sodium': 'Sel', 'proteins': 'Protéines', 'carbohydrates': 'Glucides' }.get(nutrient_name, nutrient_name.title()) nutrition_display.append(f"{display_name}: {formatted_value}") if len(nutrition_display) >= 4: break return f"- **🍽️ Nutrition** (100g): {' | '.join(nutrition_display)}\n" if nutrition_display else "" @staticmethod def _format_additional_info(product: Dict[str, Any]) -> str: """Format additional info""" additional_info = [] serving_size = product.get('serving_size') quantity = product.get('quantity') if quantity and quantity != 'N/A': additional_info.append(f"📦 {quantity}") if serving_size and serving_size != 'N/A': additional_info.append(f"🥄 Portion: {serving_size}") return f"- **ℹ️ Infos**: {' | '.join(additional_info)}\n" if additional_info else "" @staticmethod def _format_categories(product: Dict[str, Any]) -> str: """Formate categories""" categories = product.get('categories') if categories and categories != 'N/A': cats_clean = categories.replace(',', ' → ').replace('Snacks → ', '') if len(cats_clean) > 60: cats_clean = cats_clean[:60] + "..." return f"- **📂 Catégorie**: {cats_clean}\n" return "" @staticmethod def _get_grade_emoji(grade, grade_type='nutri') -> str: """Return grad emoji""" if not grade or grade == 'unknown': return '' if grade_type == 'nutri': return {'a': '🟢', 'b': '🟡', 'c': '🟠', 'd': '🔴', 'e': '🔴'}.get(str(grade).lower(), '') elif grade_type == 'nova': return {1: '🟢', 2: '🟡', 3: '🟠', 4: '🔴'}.get(grade, '') return '' @staticmethod def _clean_allergen_name(allergen: str) -> str: """Clean allergen name""" if not allergen: return allergen cleaned = allergen.replace('en:', '').replace('fr:', '') cleaned = cleaned.replace('-', ' ').title() return cleaned def _extract_ingredients_text(self, ingredients_data) -> str: """Extract ingredients list""" if not ingredients_data or ingredients_data == 'N/A': return "" try: data_str = str(ingredients_data) patterns = [ r"'text':\s*'([^']{20,})'", r'"text":\s*"([^"]{20,})"', r"'text':\s*\"([^\"]{20,})\"", r'"text":\s*\'([^\']{20,})\'', ] all_matches = [] for pattern in patterns: matches = re.findall(pattern, data_str, re.DOTALL) all_matches.extend(matches) if all_matches: longest_match = max(all_matches, key=len) return self._clean_ingredients_text(longest_match) # Fallback simple_pattern = r"text[^a-zA-Z]*([a-zA-Z][^}]{30,})" simple_matches = re.findall(simple_pattern, data_str) if simple_matches: return self._clean_ingredients_text(simple_matches[0]) except Exception as e: print(f"Error ingredients extraction: {e}") return "" @staticmethod def _clean_ingredients_text(text: str) -> str: """Clean ingredients text""" if not text: return "" text = re.sub(r'(.*?)', r'\1', text) text = text.replace('\\', '').replace('\\"', '"').replace("\\'", "'") text = text.replace('"', '"') text = re.sub(r'\s*%\s*', '% ', text) text = re.sub(r'\s*\(\s*', ' (', text) text = re.sub(r'\s*\)\s*', ') ', text) text = re.sub(r'\s+', ' ', text) return text.strip() @staticmethod def _parse_nutrients(nutrients_data) -> Dict[str, Any]: """Parse nutrients""" if not nutrients_data or nutrients_data == 'N/A': return {} try: # Si c'est déjà une liste if isinstance(nutrients_data, list): result = {} for nutrient in nutrients_data: if isinstance(nutrient, dict) and 'name' in nutrient: result[nutrient['name']] = nutrient return result # Si c'est une chaîne if isinstance(nutrients_data, str): if nutrients_data.startswith('[') and 'name' in nutrients_data: try: import ast nutrients_list = ast.literal_eval(nutrients_data) if isinstance(nutrients_list, list): result = {} for nutrient in nutrients_list: if isinstance(nutrient, dict) and 'name' in nutrient: result[nutrient['name']] = nutrient return result except: pass try: nutrients_list = json.loads(nutrients_data) result = {} for nutrient in nutrients_list: if isinstance(nutrient, dict) and 'name' in nutrient: result[nutrient['name']] = nutrient return result except: pass except: pass return {} @staticmethod def _format_nutrition_value(nutrient_data) -> str: """Format nutrition value""" if not nutrient_data: return "N/A" value = nutrient_data.get('100g', nutrient_data.get('value', 0)) unit = nutrient_data.get('unit', '') if value is None or value == 0: return "N/A" if 'kcal' in unit.lower(): return f"{int(value)} kcal" elif 'kj' in unit.lower(): return f"{int(value)} kJ" elif unit == 'g': return f"{value:.1f}g" else: return f"{value} {unit}" if unit else f"{value}"