# app.py from flask import Flask, request, jsonify, send_from_directory, abort from flask_cors import CORS import os import google.generativeai as genai from gtts import gTTS from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips, CompositeAudioClip from moviepy.config import change_settings from PIL import Image import tempfile import base64 import uuid import math import requests from pydub import AudioSegment # Asegúrate de que pydub esté importado para el fallback de audio silencioso # Intenta configurar la ruta de FFmpeg si no está en el PATH # Si FFmpeg no está en tu PATH, descomenta y ajusta la siguiente línea: # change_settings({"FFMPEG_BINARY": "/usr/local/bin/ffmpeg"}) # Ruta de ejemplo en Linux/macOS app = Flask(__name__) CORS(app) # Habilita CORS para que tu frontend pueda comunicarse # --- Configuración de API Keys --- # Para Hugging Face Spaces, estas se cargarán automáticamente desde "Secrets" (o variables de entorno) # El código busca 'GEMINI_API_KEY' o 'gemini_api' (en minúsculas, como has indicado) GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", os.getenv("gemini_api", "")) # Directorio para almacenar archivos temporales. Usamos /tmp para permisos en Docker. TEMP_DIR = tempfile.mkdtemp(dir="/tmp") print(f"Archivos temporales se guardarán en: {TEMP_DIR}") # Configuración de Gemini para texto genai.configure(api_key=GEMINI_API_KEY) def enhance_text_with_gemini(text_input): """Mejora un texto usando Gemini Flash.""" try: model = genai.GenerativeModel('gemini-2.0-flash') # Usar gemini-2.0-flash chat_history = [] chat_history.append({ "role": "user", "parts": [{"text": f"Mejora y profesionaliza el siguiente texto para un noticiero, hazlo conciso y atractivo, sin añadir introducciones ni despedidas: '{text_input}'"}] }) payload = {"contents": chat_history} api_url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={GEMINI_API_KEY}"; response = requests.post(api_url, json=payload) response.raise_for_status() # Lanza un error para códigos de estado HTTP 4xx/5xx result = response.json() if result.get("candidates") and len(result["candidates"]) > 0 and \ result["candidates"][0].get("content") and result["candidates"][0]["content"].get("parts") and \ len(result["candidates"][0]["content"]["parts"]) > 0: return result["candidates"][0]["content"]["parts"][0]["text"] else: print("Respuesta inesperada de Gemini para mejora de texto.") return text_input except requests.exceptions.RequestException as e: print(f"Error en la llamada a la API de Gemini para texto: {e}") return text_input except Exception as e: print(f"Error inesperado al mejorar texto con Gemini: {e}") return text_input def generate_image_with_gemini(prompt, aspect_ratio="16:9"): """ Genera una imagen usando Google Imagen (imagen-3.0-generate-002). Devuelve la ruta del archivo de imagen generado. """ try: api_url = f"https://generativelanguage.googleapis.com/v1beta/models/imagen-3.0-generate-002:predict?key={GEMINI_API_KEY}" payload = { "instances": {"prompt": prompt}, "parameters": {"sampleCount": 1} } response = requests.post(apiUrl, json=payload) response.raise_for_status() result = response.json() if result.get("predictions") and len(result["predictions"]) > 0 and result["predictions"][0].get("bytesBase64Encoded"): base64_image = result["predictions"][0]["bytesBase64Encoded"] img_data = base64.b64decode(base64_image) output_filename = os.path.join(TEMP_DIR, f"generated_image_{uuid.uuid4().hex}.png") with open(output_filename, "wb") as f: f.write(img_data) img = Image.open(output_filename) original_width, original_height = img.size target_width, target_height = (1280, 720) if aspect_ratio == "16:9" else (720, 1280) ratio_w = target_width / original_width ratio_h = target_height / original_height if ratio_w > ratio_h: new_height = int(original_height * ratio_w) img = img.resize((target_width, new_height), Image.Resampling.LANCZOS) top = (new_height - target_height) / 2 bottom = top + target_height img = img.crop((0, top, target_width, bottom)) else: new_width = int(original_width * ratio_h) img = img.resize((new_width, target_height), Image.Resampling.LANCZOS) left = (new_width - target_width) / 2 right = left + target_width img = img.crop((left, 0, right, target_height)) img.save(output_filename) print(f"Imagen generada y ajustada en: {output_filename}") return output_filename else: raise Exception("Respuesta inesperada de la API de Imagen: no se encontró base64_image.") except requests.exceptions.RequestException as e: print(f"Error en la llamada a la API de Imagen: {e}") return None except Exception as e: print(f"Error inesperado al generar imagen: {e}") return None def process_uploaded_image(image_file, aspect_ratio="16:9"): """ Procesa una imagen subida, la guarda y la ajusta a la relación de aspecto. """ try: # Flask FileStorage object tiene un .filename que es el nombre original, # y el archivo en memoria o un temporal. Necesitamos guardar el contenido. # Para evitar conflictos con rutas temporales de Multer/Flask, # lo guardamos en nuestro TEMP_DIR con un nombre único y extensión original. temp_original_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}_{image_file.filename}") image_file.save(temp_original_path) # Guarda el contenido del FileStorage img = Image.open(temp_original_path) original_width, original_height = img.size target_width, target_height = (1280, 720) if aspect_ratio == "16:9" else (720, 1280) ratio_w = target_width / original_width ratio_h = target_height / original_height if ratio_w > ratio_h: new_height = int(original_height * ratio_w) img = img.resize((target_width, new_height), Image.Resampling.LANCZOS) top = (new_height - target_height) / 2 bottom = top + target_height img = img.crop((0, top, target_width, bottom)) else: new_width = int(original_width * ratio_h) img = img.resize((new_width, target_height), Image.Resampling.LANCZOS) left = (new_width - target_width) / 2 right = left + target_width img = img.crop((left, 0, right, target_height)) processed_filename = os.path.join(TEMP_DIR, f"processed_uploaded_image_{uuid.uuid4().hex}.png") img.save(processed_filename) # Eliminar el archivo temporal original creado os.remove(temp_original_path) print(f"Imagen subida procesada y ajustada: {processed_filename}") return processed_filename except Exception as e: print(f"Error al procesar imagen subida: {e}") return None def text_to_speech(text, lang='es-MX', service='gtts', output_path=None): """ Convierte texto a voz y guarda el audio. Soporte básico para gTTS. """ if not output_path: output_path = os.path.join(TEMP_DIR, f"audio_{uuid.uuid4().hex}.mp3") try: if service == 'gtts': gtts_lang = lang.split('-')[0] tts = gTTS(text=text, lang=gtts_lang, slow=False) tts.save(output_path) print(f"Audio generado con gTTS: {output_path}") return output_path elif service == 'edge': print(f"Simulación: Audio generado con Edge TTS para '{text}'. GTTS usado como fallback.") gtts_lang = lang.split('-')[0] tts = gTTS(text=text, lang=gtts_lang, slow=False) tts.save(output_path) return output_path else: raise ValueError("Servicio de voz no soportado.") except Exception as e: print(f"Error al generar TTS: {e}") return None def calculate_audio_duration(audio_path): """Calcula la duración de un archivo de audio usando moviepy.""" try: audio_clip = AudioFileClip(audio_path) duration = audio_clip.duration audio_clip.close() return duration except Exception as e: print(f"Error al calcular duración del audio {audio_path}: {e}") return 0 @app.route('/test-voice', methods=['POST']) def test_voice(): data = request.json text = data.get('text', 'Hola, esta es una muestra de la voz por defecto.') lang = data.get('lang', 'es-MX') service = data.get('service', 'gtts') audio_file_path = text_to_speech(text, lang, service) if audio_file_path and os.path.exists(audio_file_path): return send_from_directory(os.path.dirname(audio_file_path), os.path.basename(audio_file_path), as_attachment=True) return jsonify({"error": "Failed to generate test voice"}), 500 @app.route('/generate-video', methods=['POST']) def generate_video(): # Asegurarse de que /tmp/temp_uploads exista para almacenar los archivos subidos de Flask. # Flask no usa Multer directamente; los archivos se reciben via request.files upload_dir = '/tmp/temp_uploads' if not os.path.exists(upload_dir): os.makedirs(upload_dir) script = request.form.get('script') use_llm_for_script = request.form.get('useLlmForScript') == 'true' aspect_ratio = request.form.get('aspectRatio') image_duration_str = request.form.get('imageDuration') image_duration = float(image_duration_str) if image_duration_str else 3.0 should_generate_images = request.form.get('shouldGenerateImages') == 'true' image_gen_prompt = request.form.get('imageGenerationPrompt') voice_service = request.form.get('voiceService') voice_language = request.form.get('voiceLanguage') # Validaciones iniciales uploaded_files_data = request.files.getlist('photos') # Pre-procesar los archivos subidos para almacenarlos antes de decidir si se usan o no temp_uploaded_paths = [] for f in uploaded_files_data: temp_path_for_processing = os.path.join(upload_dir, f"{uuid.uuid4().hex}_{f.filename}") f.save(temp_path_for_processing) temp_uploaded_paths.append(temp_path_for_processing) if len(temp_uploaded_paths) == 0 and not should_generate_images: return jsonify({"error": "Por favor, sube al menos una foto O marca la opción 'Generar imágenes con IA' y proporciona un prompt."}), 400 if len(temp_uploaded_paths) > 0 and should_generate_images: return jsonify({"error": "Conflicto: Has subido fotos Y activado la generación de imágenes con IA. Por favor, elige solo una opción."}), 400 if should_generate_images and not image_gen_prompt: return jsonify({"error": "Para generar imágenes con IA, debes proporcionar un prompt descriptivo."}), 400 if not script and not use_llm_for_script: return jsonify({"error": "Por favor, escribe un guion para tu noticia O marca la opción 'Usar LLM para generar/mejorar el guion'."}), 400 # 2. Procesar Guion (Mejora con LLM) final_script = script if use_llm_for_script and script: print("Mejorando guion con Gemini Flash...") final_script = enhance_text_with_gemini(script) print(f"Guion mejorado:\n{final_script}") elif not final_script: print("No se pudo obtener un guion final. Usando un placeholder.") final_script = "Noticia importante: Hoy exploramos un tema fascinante. Manténgase al tanto para más actualizaciones." script_segments = [s.strip() for s in final_script.split('\n') if s.strip()] if not script_segments: script_segments = ["Una noticia sin descripción. Más información a continuación."] # 3. Procesar/Generar Imágenes image_paths = [] if temp_uploaded_paths: print(f"Procesando {len(temp_uploaded_paths)} imágenes subidas...") if len(temp_uploaded_paths) > 10: return jsonify({"error": "Solo se permite un máximo de 10 fotos."}), 400 for i, original_path in enumerate(temp_uploaded_paths): processed_path = process_uploaded_image(original_path, aspect_ratio) if processed_path: image_paths.append(processed_path) else: print(f"Error al procesar imagen {original_path}, se saltará.") elif should_generate_images: print("Generando imágenes con IA (Imagen 3.0)...") num_images_to_generate = min(len(script_segments), 10) if num_images_to_generate == 0: num_images_to_generate = 1 for i in range(num_images_to_generate): current_prompt = f"{image_gen_prompt} - Parte {i+1} de la noticia." if num_images_to_generate > 1 else image_gen_prompt generated_img_path = generate_image_with_gemini(current_prompt, aspect_ratio) if generated_img_path: image_paths.append(generated_img_path) else: print(f"Error al generar la imagen {i+1}, se intentará continuar.") if not image_paths: return jsonify({"error": "No se pudieron obtener imágenes válidas para el video (ni subidas ni generadas)."}), 500 num_effective_segments = len(image_paths) if len(script_segments) < num_effective_segments: while len(script_segments) < num_effective_segments: script_segments.append("Más información sobre esta imagen.") elif len(script_segments) > num_effective_segments: script_segments = script_segments[:num_effective_segments] # 4. Generar Audio (TTS) y Clips de Video audio_clips = [] video_clips = [] print("Generando audio con Text-to-Speech (GTTS) y preparando clips de video...") for i in range(num_effective_segments): segment_text = script_segments[i] image_path = image_paths[i] audio_path = text_to_speech(segment_text, voice_language, voice_service) audio_dur = calculate_audio_duration(audio_path) if audio_path else image_duration if audio_dur < 1.0: audio_dur = max(audio_dur, image_duration) if audio_path and os.path.exists(audio_path): audio_clip = AudioFileClip(audio_path) audio_clip = audio_clip.set_duration(audio_dur) audio_clips.append(audio_clip) else: # Crear un clip de audio silencioso si la generación falla silent_audio_path = os.path.join(TEMP_DIR, "silent_audio.mp3") if not os.path.exists(silent_audio_path): # Generar un archivo de silencio real si no existe # Asegúrate de que pydub esté importado silent_audio = AudioSegment.silent(duration=5000, frame_rate=44100) # 5 segundos de silencio silent_audio.export(silent_audio_path, format="mp3") silent_clip = AudioFileClip(silent_audio_path) audio_clips.append(silent_clip.set_duration(audio_dur)) image_clip = ImageClip(image_path) image_clip = image_clip.set_duration(audio_dur) video_clips.append(image_clip) print(f"Segmento {i+1}: Imagen {image_path}, Audio Duración: {audio_dur:.2f}s") if not video_clips or not audio_clips: return jsonify({"error": "No se pudieron crear clips de video o audio válidos."}), 500 # 5. Composición Final del Video print("Componiendo el video final... esto puede llevar unos minutos.") final_video_clip = concatenate_videoclips(video_clips, method="compose") final_audio_composite = CompositeAudioClip(audio_clips) final_audio_composite = final_audio_composite.set_duration(final_video_clip.duration) final_video_clip = final_video_clip.set_audio(final_audio_composite) min_total_duration = 3.0 if final_video_clip.duration < min_total_duration: print(f"Advertencia: El video es más corto que la duración mínima ({min_total_duration}s). Actual: {final_video_clip.duration:.2f}s. Extendiéndolo.") pass output_video_path = os.path.join(TEMP_DIR, f"noticia_ia_{uuid.uuid4().hex}.mp4") final_video_clip.write_videofile( output_video_path, fps=24, codec="libx264", audio_codec="aac", bitrate="5000k", audio_bitrate="192k", threads=os.cpu_count() ) for clip in video_clips: clip.close() for clip in audio_clips: clip.close() print(f"Video final generado en: {output_video_path}") return send_from_directory(os.path.dirname(output_video_path), os.path.basename(output_video_path), as_attachment=True) @app.after_request def cleanup_temp_files(response): """Limpia los archivos temporales después de enviar la respuesta.""" print(f"Intentando limpiar archivos temporales en {TEMP_DIR}...") for filename in os.listdir(TEMP_DIR): file_path = os.path.join(TEMP_DIR, filename) if os.path.isfile(file_path): try: os.unlink(file_path) print(f"Eliminado: {file_path}") except Exception as e: print(f"Error al eliminar el archivo temporal {file_path}: {e}") # Limpiar también el directorio temporal de Multer si existe multer_upload_dir = '/tmp/temp_uploads' if os.path.exists(multer_upload_dir): print(f"Limpiando directorio de Multer: {multer_upload_dir}") for filename in os.listdir(multer_upload_dir): file_path = os.path.join(multer_upload_dir, filename) if os.path.isfile(file_path): try: os.unlink(file_path) print(f"Eliminado: {file_path}") except Exception as e: print(f"Error al eliminar archivo de Multer {file_path}: {e}") return response # Eliminamos app.run() aquí, Gunicorn lo ejecutará. # El bloque if __name__ == '__main__': es importante para ejecutar solo cuando el script se corre directamente. # Cuando Gunicorn lo ejecuta, simplemente importa la instancia 'app'. if __name__ == '__main__': # Asegura que los directorios temporales existan al iniciar el servidor if not os.path.exists(TEMP_DIR): os.makedirs(TEMP_DIR) multer_upload_dir = '/tmp/temp_uploads' if not os.path.exists(multer_upload_dir): os.makedirs(multer_upload_dir) # Crea un archivo de audio silencioso para fallbacks de TTS si no existe silent_audio_path = os.path.join(TEMP_DIR, "silent_audio.mp3") if not os.path.exists(silent_audio_path): try: # Asegúrate de que pydub esté importado silent_audio = AudioSegment.silent(duration=5000, frame_rate=44100) # 5 segundos de silencio silent_audio.export(silent_audio_path, format="mp3") print(f"Archivo de audio silencioso dummy creado en: {silent_audio_path}") except Exception as e: print(f"No se pudo crear el archivo de audio silencioso dummy: {e}. Asegúrate de que pydub esté instalado.") # Esta línea ahora solo se ejecutará si corres app.py directamente, no con Gunicorn. # Es útil para depuración local si no usas Gunicorn en desarrollo. # app.run(debug=False, host='0.0.0.0', port=5000) print("La aplicación Flask se ejecutará con Gunicorn. Este script ya no inicia el servidor directamente.")