news2 / app.py
salomonsky's picture
Create app.py
d405c53 verified
# app.py
from flask import Flask, request, jsonify, send_from_directory, abort
from flask_cors import CORS
import os
import google.generativeai as genai
from gtts import gTTS
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips, CompositeAudioClip
from moviepy.config import change_settings
from PIL import Image
import tempfile
import base64
import uuid
import math
import requests
from pydub import AudioSegment # Asegúrate de que pydub esté importado para el fallback de audio silencioso
# Intenta configurar la ruta de FFmpeg si no está en el PATH
# Si FFmpeg no está en tu PATH, descomenta y ajusta la siguiente línea:
# change_settings({"FFMPEG_BINARY": "/usr/local/bin/ffmpeg"}) # Ruta de ejemplo en Linux/macOS
app = Flask(__name__)
CORS(app) # Habilita CORS para que tu frontend pueda comunicarse
# --- Configuración de API Keys ---
# Para Hugging Face Spaces, estas se cargarán automáticamente desde "Secrets" (o variables de entorno)
# El código busca 'GEMINI_API_KEY' o 'gemini_api' (en minúsculas, como has indicado)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", os.getenv("gemini_api", ""))
# Directorio para almacenar archivos temporales. Usamos /tmp para permisos en Docker.
TEMP_DIR = tempfile.mkdtemp(dir="/tmp")
print(f"Archivos temporales se guardarán en: {TEMP_DIR}")
# Configuración de Gemini para texto
genai.configure(api_key=GEMINI_API_KEY)
def enhance_text_with_gemini(text_input):
"""Mejora un texto usando Gemini Flash."""
try:
model = genai.GenerativeModel('gemini-2.0-flash') # Usar gemini-2.0-flash
chat_history = []
chat_history.append({
"role": "user",
"parts": [{"text": f"Mejora y profesionaliza el siguiente texto para un noticiero, hazlo conciso y atractivo, sin añadir introducciones ni despedidas: '{text_input}'"}]
})
payload = {"contents": chat_history}
api_url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key={GEMINI_API_KEY}";
response = requests.post(api_url, json=payload)
response.raise_for_status() # Lanza un error para códigos de estado HTTP 4xx/5xx
result = response.json()
if result.get("candidates") and len(result["candidates"]) > 0 and \
result["candidates"][0].get("content") and result["candidates"][0]["content"].get("parts") and \
len(result["candidates"][0]["content"]["parts"]) > 0:
return result["candidates"][0]["content"]["parts"][0]["text"]
else:
print("Respuesta inesperada de Gemini para mejora de texto.")
return text_input
except requests.exceptions.RequestException as e:
print(f"Error en la llamada a la API de Gemini para texto: {e}")
return text_input
except Exception as e:
print(f"Error inesperado al mejorar texto con Gemini: {e}")
return text_input
def generate_image_with_gemini(prompt, aspect_ratio="16:9"):
"""
Genera una imagen usando Google Imagen (imagen-3.0-generate-002).
Devuelve la ruta del archivo de imagen generado.
"""
try:
api_url = f"https://generativelanguage.googleapis.com/v1beta/models/imagen-3.0-generate-002:predict?key={GEMINI_API_KEY}"
payload = {
"instances": {"prompt": prompt},
"parameters": {"sampleCount": 1}
}
response = requests.post(apiUrl, json=payload)
response.raise_for_status()
result = response.json()
if result.get("predictions") and len(result["predictions"]) > 0 and result["predictions"][0].get("bytesBase64Encoded"):
base64_image = result["predictions"][0]["bytesBase64Encoded"]
img_data = base64.b64decode(base64_image)
output_filename = os.path.join(TEMP_DIR, f"generated_image_{uuid.uuid4().hex}.png")
with open(output_filename, "wb") as f:
f.write(img_data)
img = Image.open(output_filename)
original_width, original_height = img.size
target_width, target_height = (1280, 720) if aspect_ratio == "16:9" else (720, 1280)
ratio_w = target_width / original_width
ratio_h = target_height / original_height
if ratio_w > ratio_h:
new_height = int(original_height * ratio_w)
img = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
top = (new_height - target_height) / 2
bottom = top + target_height
img = img.crop((0, top, target_width, bottom))
else:
new_width = int(original_width * ratio_h)
img = img.resize((new_width, target_height), Image.Resampling.LANCZOS)
left = (new_width - target_width) / 2
right = left + target_width
img = img.crop((left, 0, right, target_height))
img.save(output_filename)
print(f"Imagen generada y ajustada en: {output_filename}")
return output_filename
else:
raise Exception("Respuesta inesperada de la API de Imagen: no se encontró base64_image.")
except requests.exceptions.RequestException as e:
print(f"Error en la llamada a la API de Imagen: {e}")
return None
except Exception as e:
print(f"Error inesperado al generar imagen: {e}")
return None
def process_uploaded_image(image_file, aspect_ratio="16:9"):
"""
Procesa una imagen subida, la guarda y la ajusta a la relación de aspecto.
"""
try:
# Flask FileStorage object tiene un .filename que es el nombre original,
# y el archivo en memoria o un temporal. Necesitamos guardar el contenido.
# Para evitar conflictos con rutas temporales de Multer/Flask,
# lo guardamos en nuestro TEMP_DIR con un nombre único y extensión original.
temp_original_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}_{image_file.filename}")
image_file.save(temp_original_path) # Guarda el contenido del FileStorage
img = Image.open(temp_original_path)
original_width, original_height = img.size
target_width, target_height = (1280, 720) if aspect_ratio == "16:9" else (720, 1280)
ratio_w = target_width / original_width
ratio_h = target_height / original_height
if ratio_w > ratio_h:
new_height = int(original_height * ratio_w)
img = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
top = (new_height - target_height) / 2
bottom = top + target_height
img = img.crop((0, top, target_width, bottom))
else:
new_width = int(original_width * ratio_h)
img = img.resize((new_width, target_height), Image.Resampling.LANCZOS)
left = (new_width - target_width) / 2
right = left + target_width
img = img.crop((left, 0, right, target_height))
processed_filename = os.path.join(TEMP_DIR, f"processed_uploaded_image_{uuid.uuid4().hex}.png")
img.save(processed_filename)
# Eliminar el archivo temporal original creado
os.remove(temp_original_path)
print(f"Imagen subida procesada y ajustada: {processed_filename}")
return processed_filename
except Exception as e:
print(f"Error al procesar imagen subida: {e}")
return None
def text_to_speech(text, lang='es-MX', service='gtts', output_path=None):
"""
Convierte texto a voz y guarda el audio.
Soporte básico para gTTS.
"""
if not output_path:
output_path = os.path.join(TEMP_DIR, f"audio_{uuid.uuid4().hex}.mp3")
try:
if service == 'gtts':
gtts_lang = lang.split('-')[0]
tts = gTTS(text=text, lang=gtts_lang, slow=False)
tts.save(output_path)
print(f"Audio generado con gTTS: {output_path}")
return output_path
elif service == 'edge':
print(f"Simulación: Audio generado con Edge TTS para '{text}'. GTTS usado como fallback.")
gtts_lang = lang.split('-')[0]
tts = gTTS(text=text, lang=gtts_lang, slow=False)
tts.save(output_path)
return output_path
else:
raise ValueError("Servicio de voz no soportado.")
except Exception as e:
print(f"Error al generar TTS: {e}")
return None
def calculate_audio_duration(audio_path):
"""Calcula la duración de un archivo de audio usando moviepy."""
try:
audio_clip = AudioFileClip(audio_path)
duration = audio_clip.duration
audio_clip.close()
return duration
except Exception as e:
print(f"Error al calcular duración del audio {audio_path}: {e}")
return 0
@app.route('/test-voice', methods=['POST'])
def test_voice():
data = request.json
text = data.get('text', 'Hola, esta es una muestra de la voz por defecto.')
lang = data.get('lang', 'es-MX')
service = data.get('service', 'gtts')
audio_file_path = text_to_speech(text, lang, service)
if audio_file_path and os.path.exists(audio_file_path):
return send_from_directory(os.path.dirname(audio_file_path), os.path.basename(audio_file_path), as_attachment=True)
return jsonify({"error": "Failed to generate test voice"}), 500
@app.route('/generate-video', methods=['POST'])
def generate_video():
# Asegurarse de que /tmp/temp_uploads exista para almacenar los archivos subidos de Flask.
# Flask no usa Multer directamente; los archivos se reciben via request.files
upload_dir = '/tmp/temp_uploads'
if not os.path.exists(upload_dir):
os.makedirs(upload_dir)
script = request.form.get('script')
use_llm_for_script = request.form.get('useLlmForScript') == 'true'
aspect_ratio = request.form.get('aspectRatio')
image_duration_str = request.form.get('imageDuration')
image_duration = float(image_duration_str) if image_duration_str else 3.0
should_generate_images = request.form.get('shouldGenerateImages') == 'true'
image_gen_prompt = request.form.get('imageGenerationPrompt')
voice_service = request.form.get('voiceService')
voice_language = request.form.get('voiceLanguage')
# Validaciones iniciales
uploaded_files_data = request.files.getlist('photos')
# Pre-procesar los archivos subidos para almacenarlos antes de decidir si se usan o no
temp_uploaded_paths = []
for f in uploaded_files_data:
temp_path_for_processing = os.path.join(upload_dir, f"{uuid.uuid4().hex}_{f.filename}")
f.save(temp_path_for_processing)
temp_uploaded_paths.append(temp_path_for_processing)
if len(temp_uploaded_paths) == 0 and not should_generate_images:
return jsonify({"error": "Por favor, sube al menos una foto O marca la opción 'Generar imágenes con IA' y proporciona un prompt."}), 400
if len(temp_uploaded_paths) > 0 and should_generate_images:
return jsonify({"error": "Conflicto: Has subido fotos Y activado la generación de imágenes con IA. Por favor, elige solo una opción."}), 400
if should_generate_images and not image_gen_prompt:
return jsonify({"error": "Para generar imágenes con IA, debes proporcionar un prompt descriptivo."}), 400
if not script and not use_llm_for_script:
return jsonify({"error": "Por favor, escribe un guion para tu noticia O marca la opción 'Usar LLM para generar/mejorar el guion'."}), 400
# 2. Procesar Guion (Mejora con LLM)
final_script = script
if use_llm_for_script and script:
print("Mejorando guion con Gemini Flash...")
final_script = enhance_text_with_gemini(script)
print(f"Guion mejorado:\n{final_script}")
elif not final_script:
print("No se pudo obtener un guion final. Usando un placeholder.")
final_script = "Noticia importante: Hoy exploramos un tema fascinante. Manténgase al tanto para más actualizaciones."
script_segments = [s.strip() for s in final_script.split('\n') if s.strip()]
if not script_segments:
script_segments = ["Una noticia sin descripción. Más información a continuación."]
# 3. Procesar/Generar Imágenes
image_paths = []
if temp_uploaded_paths:
print(f"Procesando {len(temp_uploaded_paths)} imágenes subidas...")
if len(temp_uploaded_paths) > 10:
return jsonify({"error": "Solo se permite un máximo de 10 fotos."}), 400
for i, original_path in enumerate(temp_uploaded_paths):
processed_path = process_uploaded_image(original_path, aspect_ratio)
if processed_path:
image_paths.append(processed_path)
else:
print(f"Error al procesar imagen {original_path}, se saltará.")
elif should_generate_images:
print("Generando imágenes con IA (Imagen 3.0)...")
num_images_to_generate = min(len(script_segments), 10)
if num_images_to_generate == 0:
num_images_to_generate = 1
for i in range(num_images_to_generate):
current_prompt = f"{image_gen_prompt} - Parte {i+1} de la noticia." if num_images_to_generate > 1 else image_gen_prompt
generated_img_path = generate_image_with_gemini(current_prompt, aspect_ratio)
if generated_img_path:
image_paths.append(generated_img_path)
else:
print(f"Error al generar la imagen {i+1}, se intentará continuar.")
if not image_paths:
return jsonify({"error": "No se pudieron obtener imágenes válidas para el video (ni subidas ni generadas)."}), 500
num_effective_segments = len(image_paths)
if len(script_segments) < num_effective_segments:
while len(script_segments) < num_effective_segments:
script_segments.append("Más información sobre esta imagen.")
elif len(script_segments) > num_effective_segments:
script_segments = script_segments[:num_effective_segments]
# 4. Generar Audio (TTS) y Clips de Video
audio_clips = []
video_clips = []
print("Generando audio con Text-to-Speech (GTTS) y preparando clips de video...")
for i in range(num_effective_segments):
segment_text = script_segments[i]
image_path = image_paths[i]
audio_path = text_to_speech(segment_text, voice_language, voice_service)
audio_dur = calculate_audio_duration(audio_path) if audio_path else image_duration
if audio_dur < 1.0:
audio_dur = max(audio_dur, image_duration)
if audio_path and os.path.exists(audio_path):
audio_clip = AudioFileClip(audio_path)
audio_clip = audio_clip.set_duration(audio_dur)
audio_clips.append(audio_clip)
else:
# Crear un clip de audio silencioso si la generación falla
silent_audio_path = os.path.join(TEMP_DIR, "silent_audio.mp3")
if not os.path.exists(silent_audio_path):
# Generar un archivo de silencio real si no existe
# Asegúrate de que pydub esté importado
silent_audio = AudioSegment.silent(duration=5000, frame_rate=44100) # 5 segundos de silencio
silent_audio.export(silent_audio_path, format="mp3")
silent_clip = AudioFileClip(silent_audio_path)
audio_clips.append(silent_clip.set_duration(audio_dur))
image_clip = ImageClip(image_path)
image_clip = image_clip.set_duration(audio_dur)
video_clips.append(image_clip)
print(f"Segmento {i+1}: Imagen {image_path}, Audio Duración: {audio_dur:.2f}s")
if not video_clips or not audio_clips:
return jsonify({"error": "No se pudieron crear clips de video o audio válidos."}), 500
# 5. Composición Final del Video
print("Componiendo el video final... esto puede llevar unos minutos.")
final_video_clip = concatenate_videoclips(video_clips, method="compose")
final_audio_composite = CompositeAudioClip(audio_clips)
final_audio_composite = final_audio_composite.set_duration(final_video_clip.duration)
final_video_clip = final_video_clip.set_audio(final_audio_composite)
min_total_duration = 3.0
if final_video_clip.duration < min_total_duration:
print(f"Advertencia: El video es más corto que la duración mínima ({min_total_duration}s). Actual: {final_video_clip.duration:.2f}s. Extendiéndolo.")
pass
output_video_path = os.path.join(TEMP_DIR, f"noticia_ia_{uuid.uuid4().hex}.mp4")
final_video_clip.write_videofile(
output_video_path,
fps=24,
codec="libx264",
audio_codec="aac",
bitrate="5000k",
audio_bitrate="192k",
threads=os.cpu_count()
)
for clip in video_clips:
clip.close()
for clip in audio_clips:
clip.close()
print(f"Video final generado en: {output_video_path}")
return send_from_directory(os.path.dirname(output_video_path), os.path.basename(output_video_path), as_attachment=True)
@app.after_request
def cleanup_temp_files(response):
"""Limpia los archivos temporales después de enviar la respuesta."""
print(f"Intentando limpiar archivos temporales en {TEMP_DIR}...")
for filename in os.listdir(TEMP_DIR):
file_path = os.path.join(TEMP_DIR, filename)
if os.path.isfile(file_path):
try:
os.unlink(file_path)
print(f"Eliminado: {file_path}")
except Exception as e:
print(f"Error al eliminar el archivo temporal {file_path}: {e}")
# Limpiar también el directorio temporal de Multer si existe
multer_upload_dir = '/tmp/temp_uploads'
if os.path.exists(multer_upload_dir):
print(f"Limpiando directorio de Multer: {multer_upload_dir}")
for filename in os.listdir(multer_upload_dir):
file_path = os.path.join(multer_upload_dir, filename)
if os.path.isfile(file_path):
try:
os.unlink(file_path)
print(f"Eliminado: {file_path}")
except Exception as e:
print(f"Error al eliminar archivo de Multer {file_path}: {e}")
return response
# Eliminamos app.run() aquí, Gunicorn lo ejecutará.
# El bloque if __name__ == '__main__': es importante para ejecutar solo cuando el script se corre directamente.
# Cuando Gunicorn lo ejecuta, simplemente importa la instancia 'app'.
if __name__ == '__main__':
# Asegura que los directorios temporales existan al iniciar el servidor
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)
multer_upload_dir = '/tmp/temp_uploads'
if not os.path.exists(multer_upload_dir):
os.makedirs(multer_upload_dir)
# Crea un archivo de audio silencioso para fallbacks de TTS si no existe
silent_audio_path = os.path.join(TEMP_DIR, "silent_audio.mp3")
if not os.path.exists(silent_audio_path):
try:
# Asegúrate de que pydub esté importado
silent_audio = AudioSegment.silent(duration=5000, frame_rate=44100) # 5 segundos de silencio
silent_audio.export(silent_audio_path, format="mp3")
print(f"Archivo de audio silencioso dummy creado en: {silent_audio_path}")
except Exception as e:
print(f"No se pudo crear el archivo de audio silencioso dummy: {e}. Asegúrate de que pydub esté instalado.")
# Esta línea ahora solo se ejecutará si corres app.py directamente, no con Gunicorn.
# Es útil para depuración local si no usas Gunicorn en desarrollo.
# app.run(debug=False, host='0.0.0.0', port=5000)
print("La aplicación Flask se ejecutará con Gunicorn. Este script ya no inicia el servidor directamente.")