# 522H0134-NguyenNhatHuy's picture
# Update app.py
# 1292a2b verified
import os
import sys
import torch
import pickle
import logging
import tempfile
import requests
import re
import asyncio
import aiohttp
from urllib.parse import quote_plus
from pytube import Search
from PIL import Image
from torchvision import transforms
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline, AutoModelForCausalLM
import gradio as gr
import pandas as pd
import plotly.express as px
from reportlab.lib.pagesizes import letter
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
from reportlab.lib.styles import getSampleStyleSheet
from io import BytesIO
from langchain_huggingface import HuggingFacePipeline
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydub import AudioSegment
from pydub.utils import which
# Local imports (assumed to be available)
from args import get_parser
from model import get_model
from output_utils import prepare_output
# ============== DEVICE CONFIG ==============
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cpu" if not torch.cuda.is_available() else "cuda")
# Passed to torch.load: remap checkpoint tensors to CPU only when no GPU exists.
map_loc = "cpu" if not torch.cuda.is_available() else None
# Only surface pytube messages at ERROR level or above.
logging.getLogger("pytube").setLevel(logging.ERROR)
# ============== LOAD TRANSLATION MODELS ==============
model_envit5_name = "VietAI/envit5-translation"
try:
    tokenizer_envit5 = AutoTokenizer.from_pretrained(model_envit5_name)
    # Half precision is requested only when a GPU is present; CPU uses float32.
    model_envit5 = AutoModelForSeq2SeqLM.from_pretrained(
        model_envit5_name,
        torch_dtype=torch.float32 if not torch.cuda.is_available() else torch.float16,
    ).to(device)
    pipe_envit5 = pipeline(
        "text2text-generation",
        model=model_envit5,
        tokenizer=tokenizer_envit5,
        device=-1 if not torch.cuda.is_available() else 0,
        max_new_tokens=512,
        do_sample=False,
    )
except Exception as e:
    # A failed load must not stop the app: translate_section checks for None.
    print(f"Error loading Vietnamese model: {e}")
    pipe_envit5 = None
# Per-language translation entries; a "pipe" key is added to each one below
# (the loaded pipeline, or None when loading failed).
models = {
    "Japanese": {"model_name": "Helsinki-NLP/opus-mt-en-jap"},
    "Chinese": {"model_name": "Helsinki-NLP/opus-mt-en-zh"},
}
for lang in models:
    try:
        tokenizer = AutoTokenizer.from_pretrained(models[lang]["model_name"])
        model = AutoModelForSeq2SeqLM.from_pretrained(
            models[lang]["model_name"],
            torch_dtype=torch.float32 if not torch.cuda.is_available() else torch.float16,
        ).to(device)
        models[lang]["pipe"] = pipeline(
            "translation",
            model=model,
            tokenizer=tokenizer,
            device=-1 if not torch.cuda.is_available() else 0,
            max_length=512,
            batch_size=1 if not torch.cuda.is_available() else 4,
            truncation=True,
        )
    except Exception as e:
        # Keep the entry but mark it unusable so callers can report it.
        print(f"Error loading {lang} model: {e}")
        models[lang]["pipe"] = None
# ============== LOAD CHATBOT MODEL ==============
# Tokenizer and causal LM for the chatbot; unlike the translation models,
# a load failure here is not caught and aborts start-up.
chatbot_tokenizer = AutoTokenizer.from_pretrained("bigscience/bloomz-560m")
chatbot_model = AutoModelForCausalLM.from_pretrained(
    "bigscience/bloomz-560m",
    torch_dtype=torch.float32 if not torch.cuda.is_available() else torch.float16,
).to(device)
# Sampling-based text generation, capped at 100 new tokens per reply.
chatbot_pipeline = pipeline(
    "text-generation",
    model=chatbot_model,
    tokenizer=chatbot_tokenizer,
    device=-1 if not torch.cuda.is_available() else 0,
    max_new_tokens=100,
    do_sample=True,
    temperature=0.6,
    top_p=0.9,
    # The EOS token id is supplied as the padding id.
    pad_token_id=chatbot_tokenizer.eos_token_id,
    batch_size=1,
)
# Wrap the transformers pipeline so it can be composed in a LangChain chain.
llm = HuggingFacePipeline(pipeline=chatbot_pipeline)
# LangChain chatbot setup: prompt template -> LLM, wrapped with per-session
# message history.
# NOTE(review): the template has no {history} placeholder, so the stored
# history does not appear in the prompt text built here — confirm intent.
prompt = ChatPromptTemplate.from_template("""
You are a professional culinary assistant. You will answer the user's question directly based on the provided recipe.
Do not repeat the recipe or question in your answer. Be concise.
Dish: {title}
Ingredients: {ingredients}
Instructions: {instructions}
User Question: {question}
Answer:
""")
chain = prompt | llm
# session_id -> InMemoryChatMessageHistory; entries are never removed.
chat_histories = {}


def get_session_history(session_id):
    """Return the message history for *session_id*, creating it on first use."""
    history = chat_histories.get(session_id)
    if history is None:
        history = chat_histories[session_id] = InMemoryChatMessageHistory()
    return history


chatbot_chain = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="question",
    history_messages_key="history",
)
# ============== GLOBAL STATE ==============
# Module-wide record of the most recently generated recipe, shared by the
# translation, chatbot, nutrition and PDF features.
# "title" / "ingredients" / "instructions" / "image" are filled in by
# generate_recipe; "context" is never written anywhere in this file's visible code.
current_recipe_context = {"context": "", "title": "", "ingredients": [], "instructions": [], "image": None}
# ============== RECIPE FORMAT & TRANSLATE ==============
def format_recipe(title, ingredients, instructions, lang):
    """Render a recipe as Markdown.

    Args:
        title: Dish name, placed in a level-3 heading.
        ingredients: Iterable of ingredient strings, rendered as a bullet list.
        instructions: Iterable of step strings, rendered as a numbered list.
        lang: UI language name; unknown names fall back to the English headings.

    Returns:
        The Markdown text, sections separated by newlines.
    """
    icons = {"title": "🍽️", "ingredients": "πŸ§‚", "instructions": "πŸ“–"}
    section_names = {
        "en": {"ingredients": "Ingredients", "instructions": "Instructions"},
        "ja": {"ingredients": "Ingredients (材料)", "instructions": "Instructions (δ½œγ‚Šζ–Ή)"},
        "zh": {"ingredients": "Ingredients (食材)", "instructions": "Instructions (ζ­₯ιͺ€)"},
        "vi": {"ingredients": "Ingredients (NguyΓͺn liệu)", "instructions": "Instructions (CΓ‘ch lΓ m)"},
    }
    language_codes = {
        "English (original)": "en",
        "Japanese": "ja",
        "Chinese": "zh",
        "Vietnamese": "vi",
    }
    headings = section_names[language_codes.get(lang, "en")]

    lines = [
        f"### {icons['title']} {title}",
        f"**{icons['ingredients']} {headings['ingredients']}:**",
    ]
    for item in ingredients:
        lines.append(f"- {item}")
    # The leading newline leaves a blank line before the instructions heading.
    lines.append(f"\n**{icons['instructions']} {headings['instructions']}:**")
    for number, step in enumerate(instructions, start=1):
        lines.append(f"{number}. {step}")
    return "\n".join(lines)
def _split_into_chunks(text, max_chunk_length=400):
    """Split *text* on sentence boundaries into chunks of about *max_chunk_length* chars.

    Text that already fits is returned unchanged as a single chunk. A single
    sentence longer than the limit becomes its own (oversized) chunk. Empty
    chunks are never produced.
    """
    if len(text) <= max_chunk_length:
        return [text]
    parts = text.split(". ")
    last_index = len(parts) - 1
    chunks = []
    current = ""
    for index, part in enumerate(parts):
        sentence = part.strip()
        if not sentence:
            continue
        # split(". ") removed the period from every piece except the last one,
        # so restore it only there (avoids producing "..").
        if index != last_index:
            sentence += "."
        if current and len(current) + len(sentence) + 1 > max_chunk_length:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}" if current else sentence
    if current:
        chunks.append(current)
    return chunks


def translate_section(text, lang):
    """Translate *text* from English into *lang*.

    Args:
        text: English source text.
        lang: "English (original)", "Vietnamese", or a key of ``models``.

    Returns:
        The translated text; *text* unchanged for English, for blank input, or
        when translation raises; or a notice string when the model for *lang*
        is not loaded.
    """
    if lang == "English (original)":
        return text

    if lang == "Vietnamese":
        if pipe_envit5 is None:
            return "❗ Vietnamese translation model not available"

        def translate_chunk(chunk):
            # The model expects an "en-vi: " task prefix and echoes a "vi: "
            # prefix in its output, which is stripped again here.
            generated = pipe_envit5(f"en-vi: {chunk}", max_new_tokens=512)[0]["generated_text"]
            return (
                generated.replace("vi: vi: ", "")
                .replace("vi: Vi: ", "")
                .replace("vi: ", "")
                .strip()
            )

        error_label = "Vietnamese translation error"
    else:
        pipe = models.get(lang, {}).get("pipe")
        if pipe is None:
            return f"❗ Translation model for {lang} not available"

        def translate_chunk(chunk):
            return pipe(chunk, max_length=512)[0]["translation_text"]

        error_label = f"Translation error ({lang})"

    # Nothing to translate; avoid sending blank input to the model.
    if not text or not text.strip():
        return text

    try:
        return " ".join(translate_chunk(chunk) for chunk in _split_into_chunks(text))
    except Exception as e:
        # Best effort: fall back to the untranslated text rather than failing the UI.
        print(f"{error_label}: {e}")
        return text
def translate_recipe(lang):
    """Translate the current recipe into *lang* and return it as Markdown.

    Returns a notice string when no recipe has been generated yet.
    """
    recipe = current_recipe_context
    if not recipe["title"]:
        return "❗ Please generate a recipe from an image first."

    # Title, each ingredient and each step are translated independently.
    translated_title = translate_section(recipe["title"], lang)
    translated_ingredients = []
    for ingredient in recipe["ingredients"]:
        translated_ingredients.append(translate_section(ingredient, lang))
    translated_steps = []
    for step in recipe["instructions"]:
        translated_steps.append(translate_section(step, lang))
    return format_recipe(translated_title, translated_ingredients, translated_steps, lang)
# ============== NUTRITION ANALYSIS ==============
def nutrition_analysis(ingredient_input):
    """Query the API Ninjas nutrition endpoint and chart the first three numeric metrics.

    Args:
        ingredient_input: Free text listing ingredients (any whitespace separated).

    Returns:
        A 4-tuple ``(message, bar_figure, pie_figure, line_figure)``. The three
        figures are ``None`` whenever the message reports a problem.
    """
    ingredients = " ".join((ingredient_input or "").split())
    if not ingredients:
        return "⚠️ Please enter at least one ingredient.", None, None, None

    # SECURITY: an API key is committed in source. Set the API_NINJAS_KEY
    # environment variable to override it; the literal is kept only so existing
    # deployments keep working and should be rotated and removed.
    api_key = os.environ.get("API_NINJAS_KEY", "AHVy+tpkUoueBNdaFs9nCg==sFZTMRn8ikZVzx6E")
    try:
        # `params` lets requests URL-encode the query; the timeout stops a
        # stalled connection from hanging the UI callback.
        response = requests.get(
            "https://api.api-ninjas.com/v1/nutrition",
            params={"query": ingredients},
            headers={"X-Api-Key": api_key},
            timeout=15,
        )
    except requests.RequestException as e:
        print(f"Nutrition API request failed: {e}")
        return "❌ API error or quota exceeded.", None, None, None
    if response.status_code != 200:
        return "❌ API error or quota exceeded.", None, None, None

    try:
        data = response.json()
    except ValueError as e:
        print(f"Nutrition API returned invalid JSON: {e}")
        return "❌ API error or quota exceeded.", None, None, None
    if not isinstance(data, list):
        # The charts below need a list of per-ingredient records.
        return "❌ API error or quota exceeded.", None, None, None

    df = pd.DataFrame(data)
    numeric_cols = []
    for col in df.columns:
        if col == "name":
            continue
        # Non-numeric cells become NaN; keep columns with at least one number.
        df[col] = pd.to_numeric(df[col], errors="coerce")
        if df[col].notna().sum() > 0:
            numeric_cols.append(col)
    if df.empty or "name" not in df.columns or len(numeric_cols) < 3:
        return "⚠️ Insufficient numerical data for charts (need at least 3 metrics).", None, None, None

    draw_cols = numeric_cols[:3]
    fig_bar = px.bar(df, x="name", y=draw_cols[0], title=f"Bar Chart: {draw_cols[0]}", text_auto=True)
    pie_data = df[[draw_cols[1], "name"]].dropna()
    if pie_data[draw_cols[1]].sum() > 0:
        fig_pie = px.pie(pie_data, names="name", values=draw_cols[1], title=f"Pie Chart: {draw_cols[1]}")
    else:
        # A pie needs a positive total; show an empty titled figure instead.
        fig_pie = px.bar(title="⚠️ Insufficient data for pie chart")
    fig_line = px.line(df, x="name", y=draw_cols[2], markers=True, title=f"Line Chart: {draw_cols[2]}")
    return "βœ… Analysis successful!", fig_bar, fig_pie, fig_line
def load_recipe_ingredients():
    """Return the current recipe's ingredients one per line, or a notice if there are none."""
    ingredients = current_recipe_context["ingredients"]
    if ingredients:
        return "\n".join(ingredients)
    return "⚠️ No ingredients available. Generate a recipe first."
# ============== CHATBOT ==============
def clean_response(response):
    """Strip echoed prompt material from a raw model reply.

    Keeps only the text after the last "Answer:" marker (if any), then removes
    echoed Dish/Ingredients/Instructions sections, the system preamble and any
    trailing "User Question:" text, and finally trims whitespace.
    """
    # Everything up to and including the last "Answer:" is prompt echo.
    if "Answer:" in response:
        response = response.rsplit("Answer:", 1)[-1]

    # Applied in this exact order; each match is deleted.
    echo_patterns = (
        r"Dish:.*?(Ingredients:|Instructions:).*?",
        r"Ingredients:.*?(Instructions:).*?",
        r"Instructions:.*",
        r"You are a professional culinary assistant.*?Answer:",
        r"User Question:.*",
    )
    for pattern in echo_patterns:
        response = re.sub(pattern, "", response, flags=re.DOTALL)
    return response.strip()
def validate_cooking_time(question, instructions):
    """Check a cooking time mentioned in *question* against the recipe steps.

    Sums every "<number> minute(s)" found in *instructions* and compares it
    with the first such duration in *question*.

    Args:
        question: The user's message.
        instructions: Iterable of instruction strings.

    Returns:
        A correction sentence when the question states a different total, or
        ``None`` when the question mentions no time, the totals agree, or the
        recipe states no times at all (nothing to correct against).
    """
    time_pattern = re.compile(r"(\d+)\s*(?:minutes|minute)\b", re.IGNORECASE)

    user_time = time_pattern.search(question)
    if user_time is None:
        return None

    total_time = sum(
        int(minutes)
        for instr in instructions
        for minutes in time_pattern.findall(instr)
    )
    # Without any stated durations a "0 minutes" correction would be wrong.
    if total_time == 0:
        return None

    user_minutes = int(user_time.group(1))
    if user_minutes != total_time:
        return f"The recipe takes about {total_time} minutes to cook, not {user_minutes} minutes."
    return None
def generate_chat_response(message, session_id="default"):
    """Answer *message* about the current recipe using the chatbot chain.

    Args:
        message: The user's question.
        session_id: Key selecting which chat history the chain uses.

    Returns:
        The cleaned model reply, prefixed with a cooking-time correction when
        validate_cooking_time produced one; or a notice when no recipe exists.
    """
    recipe = current_recipe_context
    if not recipe["title"]:
        return "Please generate a recipe from an image before asking about the dish."

    # Computed before the model call, prepended to the reply afterwards.
    correction = validate_cooking_time(message, recipe["instructions"])

    chain_input = {
        "title": recipe["title"],
        "ingredients": ", ".join(recipe["ingredients"]),
        "instructions": " ".join(recipe["instructions"]),
        "question": message,
    }
    chain_config = {"configurable": {"session_id": session_id}}
    answer = clean_response(chatbot_chain.invoke(chain_input, config=chain_config))

    if correction:
        answer = f"{correction} {answer}"
    return answer.strip()
def chat_with_bot(message, chat_history, session_id="default"):
    """Gradio callback: append the user turn and the bot reply to *chat_history*.

    Returns:
        ``("", chat_history)`` — the empty string is the first output (clears
        the input box in the UI wiring) and the same list object, mutated in
        place, is the second. Blank messages leave the history untouched.
    """
    if message.strip():
        reply = generate_chat_response(message, session_id)
        chat_history.extend(
            [
                {"role": "user", "content": message},
                {"role": "assistant", "content": reply},
            ]
        )
    return "", chat_history
# ============== IMAGE TO RECIPE ==============
# NOTE(security): pickle.load can execute code embedded in the file; this is
# only safe if both vocabulary files are trusted.
with open("ingr_vocab.pkl", "rb") as f:
    ingrs_vocab = pickle.load(f)
with open("instr_vocab.pkl", "rb") as f:
    vocab = pickle.load(f)

# Build the image-to-recipe model from the project's parser settings, with two
# overrides applied before construction.
args = get_parser()
args.maxseqlen = 15
args.ingrs_only = False
model_ic = get_model(args, len(ingrs_vocab), len(vocab))
model_ic.load_state_dict(
    torch.load("modelbest.ckpt", map_location=map_loc, weights_only=True)
)
model_ic.to(device).eval()

# Preprocessing applied to every uploaded image before inference.
transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ]
)
def generate_recipe(image):
    """Generate a recipe from a PIL image and store it in current_recipe_context.

    Args:
        image: The uploaded image, or ``None`` when nothing was uploaded.

    Returns:
        The English recipe as Markdown, or a notice string when no image was
        given or the model output is flagged invalid.
    """
    if image is None:
        return "❗ Please upload an image."

    # The image is remembered even if the generated recipe turns out invalid.
    current_recipe_context["image"] = image
    batch = transform(image.convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model_ic.sample(batch, greedy=True, temperature=1.0, beam=-1, true_ingrs=None)

    ingr_ids = outputs['ingr_ids'].cpu().numpy()
    recipe_ids = outputs['recipe_ids'].cpu().numpy()
    # Only the first (and only) item of the batch is decoded.
    outs, valid = prepare_output(recipe_ids[0], ingr_ids[0], ingrs_vocab, vocab)
    if not valid['is_valid']:
        return f"❌ Invalid recipe: {valid['reason']}"

    current_recipe_context.update(
        {
            "title": outs['title'],
            "ingredients": outs['ingrs'],
            "instructions": outs['recipe'],
        }
    )
    return format_recipe(outs['title'], outs['ingrs'], outs['recipe'], "English (original)")
# ============== GOOGLE TTS ==============
# Maps a language name to the code sent as the `tl` query parameter of the
# Google Translate TTS request; google_tts falls back to "en" for names not
# listed here.
languages_tts = {
"English": "en",
"Chinese": "zh-CN",
"Japanese": "ja",
"Vietnamese": "vi",
}
async def fetch_tts_audio_async(session, chunk, lang_code):
    """Fetch the audio for one text chunk from Google Translate TTS.

    Args:
        session: An open aiohttp client session.
        chunk: Text to synthesize (URL-encoded here).
        lang_code: Language code placed in the `tl` query parameter.

    Returns:
        The response body as bytes, or ``None`` if the request failed.
    """
    encoded_text = quote_plus(chunk)
    url = f"https://translate.google.com/translate_tts?ie=UTF-8&q={encoded_text}&tl={lang_code}&client=tw-ob"
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Referer": "https://translate.google.com/",
    }
    try:
        async with session.get(url, headers=request_headers, timeout=10) as response:
            response.raise_for_status()
            audio_bytes = await response.read()
            return audio_bytes
    except Exception as e:
        # A failed chunk is reported and skipped by the caller.
        print(f"TTS Error for chunk: {e}")
        return None
async def fetch_all_tts_audio(chunks, lang_code):
    """Fetch audio for every chunk concurrently over one shared session.

    Returns the results in the same order as *chunks*; failed chunks are ``None``.
    """
    async with aiohttp.ClientSession() as session:
        pending = []
        for chunk in chunks:
            pending.append(fetch_tts_audio_async(session, chunk, lang_code))
        results = await asyncio.gather(*pending)
        return results
def _split_for_tts(text, limit=200):
    """Split *text* into pieces of at most *limit* characters for the TTS endpoint.

    Breaks at the last space or newline inside each window so words are not
    cut in half; text without whitespace (e.g. Chinese or Japanese) falls back
    to a hard cut at *limit*. Blank pieces are dropped.
    """
    pieces = []
    start = 0
    length = len(text)
    while start < length:
        end = min(start + limit, length)
        if end < length:
            cut = max(text.rfind(" ", start, end), text.rfind("\n", start, end))
            if cut > start:
                end = cut
        piece = text[start:end].strip()
        if piece:
            pieces.append(piece)
        start = end
    return pieces


def _remove_temp_files(paths):
    """Delete temporary files, reporting (not raising) any failure."""
    for path in paths:
        try:
            os.unlink(path)
        except OSError as e:
            print(f"Could not remove temporary file {path}: {e}")


def google_tts(text, lang):
    """Synthesize *text* to an MP3 file via Google Translate TTS.

    Args:
        text: Markdown recipe text; notice strings (starting with the warning
            marker) and empty text produce no audio.
        lang: Language name looked up in ``languages_tts`` ("en" when unknown).

    Returns:
        ``(path_or_None, gr.update(visible=...))`` — the MP3 path and an update
        that shows the audio component, or ``None`` and an update hiding it.
        When several chunks cannot be merged (no FFmpeg, or merging fails) only
        the first chunk's audio is returned.
    """
    if not text or text.startswith("❗"):
        return None, gr.update(visible=False)

    # Strip Markdown markers and the section emoji so they are not read aloud.
    clean_text = text.replace("**", "").replace("###", "").replace("- ", "")
    for emoji in ["🍽️", "πŸ§‚", "πŸ“–"]:
        clean_text = clean_text.replace(emoji, "")

    # Google TTS accepts roughly 200 characters per request.
    chunks = _split_for_tts(clean_text, 200)
    if not chunks:
        return None, gr.update(visible=False)

    lang_code = languages_tts.get(lang, "en")
    audio_contents = asyncio.run(fetch_all_tts_audio(chunks, lang_code))

    # Persist every successfully fetched chunk; failed chunks are skipped.
    audio_files = []
    for content in audio_contents:
        if content:
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
                f.write(content)
            audio_files.append(f.name)
    if not audio_files:
        return None, gr.update(visible=False)
    if len(audio_files) == 1:
        return audio_files[0], gr.update(visible=True)

    if which("ffmpeg"):
        output_file = None
        try:
            combined = AudioSegment.empty()
            for file in audio_files:
                combined += AudioSegment.from_mp3(file)
            # Reserve a path and close the handle immediately so it is not leaked.
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out:
                output_file = out.name
            combined.export(output_file, format="mp3")
        except Exception as e:
            print(f"Error combining audio files: {e}")
            if output_file:
                _remove_temp_files([output_file])
        else:
            # Clean up the parts only after the merge fully succeeded, so the
            # fallback below never returns an already deleted file.
            _remove_temp_files(audio_files)
            return output_file, gr.update(visible=True)
    else:
        print("FFmpeg not found, returning first audio chunk.")

    # Fallback: keep the first chunk, discard the rest.
    _remove_temp_files(audio_files[1:])
    return audio_files[0], gr.update(visible=True)
# ============== VIDEO SEARCH ==============
def search_top_3_videos(keyword):
    """Search YouTube for "How to make <keyword>" and describe the top three hits.

    Returns:
        A flat list of nine strings: three embed snippets, then three titles,
        then three watch URLs. Missing results are padded; a blank keyword or
        any search error yields nine empty strings.
    """
    if not keyword.strip():
        return ["", "", ""] * 3
    try:
        top_results = Search(f"How to make {keyword}").results[:3]
        embeds = []
        titles = []
        urls = []
        for video in top_results:
            embeds.append(f'''
<iframe width="520" height="320"
src="https://www.youtube.com/embed/{video.video_id}"
frameborder="0" allowfullscreen></iframe>
''')
            titles.append(video.title)
            urls.append(f"https://www.youtube.com/watch?v={video.video_id}")
        # Pad so the UI always receives exactly three entries per list.
        missing = 3 - len(embeds)
        if missing > 0:
            embeds.extend(["No video found"] * missing)
            titles.extend([""] * missing)
            urls.extend([""] * missing)
        return embeds + titles + urls
    except Exception as e:
        print(f"Video search error: {e}")
        return ["", "", ""] * 3
# ============== RESTAURANT SEARCH ==============
def get_google_maps_search_url(dish_name, city="Ho Chi Minh City"):
    """Build a Google Maps search URL for *dish_name* in *city*.

    The query is fully URL-encoded (spaces become ``+``), so characters such as
    ``&``, ``/``, ``?`` and ``#`` in the dish name cannot alter the URL.

    Args:
        dish_name: Name of the dish to search for.
        city: City appended to the query.

    Returns:
        The search URL as a string.
    """
    query = f"{dish_name} in {city}"
    return f"https://www.google.com/maps/search/{quote_plus(query)}"
def search_and_show_link(dish):
    """Gradio callback: return the Maps URL for *dish* and a visibility update.

    A blank dish name returns the default button caption and hides the button.
    """
    if dish.strip():
        maps_url = get_google_maps_search_url(dish)
        return maps_url, gr.update(visible=True)
    return "Go to Google Maps", gr.update(visible=False)
# ============== PDF GENERATION ==============
def generate_pdf_recipe():
    """Write the current recipe (image, title, ingredients, steps) to recipe.pdf.

    Returns:
        ``(path, message)`` on success, or ``(None, notice)`` when no recipe
        has been generated yet.
    """
    # Local stdlib import: Paragraph text is parsed as XML-like markup, so
    # recipe text containing "<" or "&" must be escaped first.
    from xml.sax.saxutils import escape

    if not current_recipe_context["title"]:
        return None, "❗ Please generate a recipe from an image first."

    output_file = "recipe.pdf"
    doc = SimpleDocTemplate(output_file, pagesize=letter)
    styles = getSampleStyleSheet()
    story = []

    image = current_recipe_context["image"]
    if image is not None:
        try:
            img_buffer = BytesIO()
            image.save(img_buffer, format="PNG")
            img_buffer.seek(0)
            # Fit inside a 200x200 box while keeping the aspect ratio.
            width, height = image.size
            scale = 200 / max(width, height)
            story.append(ReportLabImage(img_buffer, width=width * scale, height=height * scale))
            story.append(Spacer(1, 12))
        except Exception as e:
            # The picture is optional; continue with a text-only PDF.
            print(f"Error adding image to PDF: {e}")

    story.append(Paragraph(escape(current_recipe_context["title"]), styles['Title']))
    story.append(Spacer(1, 12))
    story.append(Paragraph("Ingredients:", styles['Heading2']))
    for ingr in current_recipe_context["ingredients"]:
        story.append(Paragraph(f"- {escape(ingr)}", styles['Normal']))
    story.append(Spacer(1, 12))
    story.append(Paragraph("Instructions:", styles['Heading2']))
    for i, instr in enumerate(current_recipe_context["instructions"], 1):
        story.append(Paragraph(f"{i}. {escape(instr)}", styles['Normal']))
    doc.build(story)
    return output_file, "βœ… Recipe saved as recipe.pdf"
# ============== GRADIO UI ==============
# Gradio interface: one tab per feature, each wiring its widgets to the
# callback functions defined earlier in this file.
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the `with` blocks below must be restored before the code can run.
# NOTE(review): elem_id="action-btn" is given to many buttons, so the id is
# not unique on the page — elem_classes may have been intended; confirm.
with gr.Blocks(theme=gr.themes.Soft(), title="AI Recipe Generator") as demo:
gr.Markdown("""
# 🍳 AI Recipe Generator & Multilingual Cooking Assistant
Generate recipes from images, translate to multiple languages, get cooking videos, chat with a culinary assistant, analyze nutrition, and find restaurants!
""")
# --- Tab: image upload -> recipe, plus PDF export ---
with gr.Tab("πŸ“· Generate Recipe"):
with gr.Row():
with gr.Column():
image_input = gr.Image(type="pil", label="Upload Dish Image", height=300)
gen_btn = gr.Button("Generate Recipe", variant="primary", elem_id="action-btn")
save_pdf_btn = gr.Button("Save as PDF", variant="secondary", elem_id="action-btn")
pdf_output = gr.File(label="Download Recipe PDF", interactive=False)
recipe_output = gr.Markdown("### Your recipe will appear here", elem_classes="recipe-box")
gen_btn.click(generate_recipe, inputs=image_input, outputs=recipe_output)
# generate_pdf_recipe returns (file path, message); the message is written
# into recipe_output, replacing whatever Markdown was shown there.
save_pdf_btn.click(fn=generate_pdf_recipe, outputs=[pdf_output, recipe_output])
# --- Tab: translate the current recipe and read it aloud ---
with gr.Tab("🌍 Translate & TTS"):
with gr.Row():
with gr.Column():
lang_dropdown = gr.Dropdown(
choices=["English (original)", "Japanese", "Chinese", "Vietnamese"],
value="Japanese",
label="Select Language"
)
with gr.Row():
trans_btn = gr.Button("Translate Recipe", variant="primary", elem_id="action-btn")
tts_btn = gr.Button("πŸ”ˆ Listen to Recipe", variant="secondary", elem_id="action-btn")
with gr.Column():
translation_output = gr.Markdown("### Translated recipe will appear here", elem_classes="recipe-box")
tts_audio = gr.Audio(interactive=False, label="Audio Output", visible=False)
trans_btn.click(fn=translate_recipe, inputs=lang_dropdown, outputs=translation_output)
# google_tts returns (audio path, visibility update); both are routed to the
# same Audio component.
tts_btn.click(fn=google_tts, inputs=[translation_output, lang_dropdown], outputs=[tts_audio, tts_audio])
# --- Tab: YouTube search, three result slots ---
with gr.Tab("πŸŽ₯ Cooking Videos"):
with gr.Row():
with gr.Column():
video_keyword = gr.Textbox(label="Search Cooking Videos", placeholder="e.g. beef pho")
search_btn = gr.Button("Search Videos", variant="primary", elem_id="action-btn")
with gr.Column():
video_embeds, video_titles, video_urls = [], [], []
for i in range(3):
with gr.Column():
video_embeds.append(gr.HTML(label=f"🎬 Video {i+1}"))
video_titles.append(gr.Textbox(label=f"πŸ“Œ Title {i+1}", interactive=False))
video_urls.append(gr.Textbox(label=f"πŸ”— URL {i+1}", interactive=False, visible=False))
# Output order matches search_top_3_videos: embeds, then titles, then URLs.
search_btn.click(fn=search_top_3_videos, inputs=video_keyword, outputs=video_embeds + video_titles + video_urls)
# --- Tab: chatbot about the current recipe ---
with gr.Tab("πŸ’¬ Culinary Chatbot"):
chatbot = gr.Chatbot(height=400, type="messages")
with gr.Row():
chat_input = gr.Textbox(placeholder="Ask about the dish...", scale=4)
chat_btn = gr.Button("Send", variant="primary", scale=1, elem_id="action-btn")
# Both the button and pressing Enter send the message; session_id is not
# passed, so chat_with_bot uses its "default" value.
chat_btn.click(chat_with_bot, inputs=[chat_input, chatbot], outputs=[chat_input, chatbot])
chat_input.submit(chat_with_bot, inputs=[chat_input, chatbot], outputs=[chat_input, chatbot])
# --- Tab: nutrition lookup and charts ---
with gr.Tab("πŸ₯— Nutrition Analysis"):
with gr.Row():
with gr.Column():
ingredient_input = gr.Textbox(
label="🧾 Enter Ingredients (one per line or space-separated)",
lines=10,
placeholder="cheese\npepper\negg\n..."
)
with gr.Row():
load_ingredients_btn = gr.Button("Load Recipe Ingredients", variant="secondary", elem_id="action-btn")
analyze_btn = gr.Button("Analyze Nutrition", variant="primary", elem_id="action-btn")
with gr.Column():
nutrition_message = gr.Textbox(label="πŸ”” Message", interactive=False)
bar_chart = gr.Plot(label="πŸ“Š Bar Chart")
pie_chart = gr.Plot(label="πŸ₯§ Pie Chart")
line_chart = gr.Plot(label="πŸ“ˆ Line Chart")
load_ingredients_btn.click(fn=load_recipe_ingredients, outputs=ingredient_input)
analyze_btn.click(
fn=nutrition_analysis,
inputs=ingredient_input,
outputs=[nutrition_message, bar_chart, pie_chart, line_chart]
)
# --- Tab: Google Maps restaurant search ---
with gr.Tab("🍽️ Find Restaurants"):
with gr.Row():
with gr.Column():
dish_input = gr.Textbox(label="Enter Dish Name", placeholder="e.g. beef pho", interactive=True)
search_restaurant_btn = gr.Button("Find Restaurants", variant="primary", elem_id="action-btn")
open_maps_btn = gr.Button("Go to Google Maps", visible=True, variant="secondary", elem_id="open-maps-btn")
# search_and_show_link returns (url, visibility update); the URL becomes the
# button's value, which the click handler below reads back.
search_restaurant_btn.click(fn=search_and_show_link, inputs=dish_input, outputs=[open_maps_btn, open_maps_btn])
# The js snippet opens the URL in a new browser tab; the Python fn just
# echoes its input and has no outputs.
open_maps_btn.click(
fn=lambda url: url,
inputs=open_maps_btn,
outputs=None,
js="(url) => { if(url) window.open(url, '_blank'); }"
)
# Custom stylesheet assigned to the Blocks app after construction: styles the
# recipe panels (with a dark-theme variant) and the action / maps buttons.
demo.css = """
.recipe-box {
padding: 20px;
border-radius: 10px;
background: #f9f9f9;
border: 1px solid #e0e0e0;
}
.dark .recipe-box {
background: #2a2a2a;
border-color: #444;
}
.gr-box {
margin-bottom: 20px;
}
#action-btn {
max-width: 220px;
margin: 10px auto;
font-weight: 600;
font-size: 16px;
border-radius: 8px;
}
#open-maps-btn {
max-width: 220px;
margin: 10px auto;
font-weight: 600;
font-size: 16px;
border-radius: 8px;
}
"""
# Launch the Gradio app only when this file is run as a script, not on import.
if __name__ == "__main__":
demo.launch()