Spaces:
Runtime error
Runtime error
from flask import Flask, render_template, request, jsonify, send_file, send_from_directory | |
from flask_cors import CORS | |
import os | |
import logging | |
import sys | |
import shutil | |
import uuid | |
from datetime import datetime | |
from werkzeug.utils import secure_filename | |
from utils.video_processing import ( | |
verify_output_video, | |
download_video_with_fallback, | |
inference_preview, | |
wav2lip_animate | |
) | |
from utils.tts_processing import generate_tts_with_fallback | |
import threading | |
import time | |
import json | |
import cv2 | |
import numpy as np | |
# Logging goes both to a timestamped file under ./logs and to stdout.
log_directory = 'logs'
# exist_ok avoids the check-then-create race and matches how the other
# working directories are created in this file.
os.makedirs(log_directory, exist_ok=True)
log_file = os.path.join(log_directory, f'app_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')

# Build the handlers first so their level can be set explicitly.
# basicConfig() attaches handlers to the *root* logger, so iterating over
# ``logger.handlers`` of the module logger would never reach them.
_log_handlers = [
    logging.FileHandler(log_file, encoding='utf-8'),
    logging.StreamHandler(sys.stdout),
]
for handler in _log_handlers:
    handler.setLevel(logging.INFO)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=_log_handlers,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Flask application with CORS enabled.
app = Flask(__name__, static_folder='static')
CORS(app)

# Working directories and the request body size limit (16 MiB).
app.config.update(
    UPLOAD_FOLDER='uploads',
    TEMP_FOLDER='temp_previews',
    AUDIO_FOLDER='temp_audio',
    WAV2LIP_FOLDER='temp_wav2lip',
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,
)

# Every directory the app writes to; created up front if missing.
required_directories = [
    app.config[key]
    for key in ('UPLOAD_FOLDER', 'TEMP_FOLDER', 'AUDIO_FOLDER', 'WAV2LIP_FOLDER')
] + ['wav2lip_temp']

for directory in required_directories:
    os.makedirs(directory, exist_ok=True)
def cleanup_temp_directories():
    """Empty the temporary working directories by deleting and recreating them.

    Directories that do not currently exist are skipped. A failure on one
    directory is logged and does not stop the remaining ones from being
    processed.
    """
    targets = (
        app.config['TEMP_FOLDER'],
        app.config['AUDIO_FOLDER'],
        app.config['WAV2LIP_FOLDER'],
        'wav2lip_temp',
    )
    for path in targets:
        if not os.path.exists(path):
            continue
        try:
            shutil.rmtree(path)
            os.makedirs(path)
            logger.info(f"Directorio {path} limpiado y recreado")
        except Exception as exc:
            logger.error(f"Error al limpiar directorio {path}: {str(exc)}")
def cleanup_temp_files(file_path, delay=300):
    """Schedule *file_path* for deletion after *delay* seconds.

    The deletion runs on a daemon thread, so this call returns immediately
    and the pending cleanup never keeps the process alive.

    Args:
        file_path: Path of the file to remove.
        delay: Seconds to wait before removing the file.
    """
    def delayed_cleanup():
        try:
            time.sleep(delay)
            # Remove directly instead of checking for existence first: the
            # file may disappear between the check and the removal.
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone (never created, or cleaned elsewhere): nothing to do.
            pass
        except Exception as e:
            logger.error(f"Error al limpiar archivo temporal {file_path}: {str(e)}")

    thread = threading.Thread(target=delayed_cleanup)
    thread.daemon = True
    thread.start()
def index():
    """Render and return the ``index.html`` template."""
    page = render_template('index.html')
    return page
def get_video_info_route():
    """Return video metadata for the ``url`` field of the JSON request body.

    Any failure (missing body, missing ``url`` key, lookup error) is reported
    as a JSON ``{'error': ...}`` payload with HTTP status 400.
    """
    # NOTE(review): ``get_video_info`` is not imported in the visible part of
    # this file; if it is not defined elsewhere this always takes the error
    # path with a NameError message. Confirm where it should come from.
    try:
        payload = request.json
        return jsonify(get_video_info(payload['url']))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 400
async def generate_tts_route():
    """Generate speech audio from the ``text`` field of the JSON request body.

    Request JSON:
        text: Text to synthesise (required, non-empty).
        voice: Voice name; defaults to ``es-MX-JorgeNeural`` when absent or empty.

    Returns:
        ``{'success': True, 'audio_id': ...}`` on success, where ``audio_id``
        is the generated file's name without extension. Client or generation
        errors yield ``{'error': ...}`` with status 400; a missing output file
        yields status 500.
    """
    try:
        # silent=True turns a missing or malformed JSON body into None so the
        # caller gets the explicit "text not provided" message instead of a
        # raw exception string.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        text = data.get('text')
        if not text:
            return jsonify({'error': 'Texto no proporcionado'}), 400
        voice = data.get('voice') or 'es-MX-JorgeNeural'

        success, audio_path, error = await generate_tts_with_fallback(text, voice)
        if not success:
            return jsonify({'error': error}), 400

        if not os.path.exists(audio_path):
            return jsonify({'error': 'No se generó el archivo de audio'}), 500

        audio_id = os.path.splitext(os.path.basename(audio_path))[0]
        return jsonify({
            'success': True,
            'audio_id': audio_id
        })
    except Exception as e:
        logger.error(f"Error en generate_tts_route: {str(e)}")
        return jsonify({'error': str(e)}), 400
def get_audio(audio_id):
    """Send the MP3 stored for *audio_id*, or a JSON 404 if it does not exist."""
    mp3_path = os.path.join(app.config['AUDIO_FOLDER'], f"{audio_id}.mp3")
    if not os.path.exists(mp3_path):
        return jsonify({'error': 'Audio no encontrado'}), 404
    return send_file(mp3_path, mimetype='audio/mpeg')
def serve_static(filename):
    """Send *filename* from the application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, filename)
async def generate_wav2lip_route():
    """Animate a face image with previously generated audio via Wav2Lip.

    Form fields:
        audio_id: Id of an existing MP3 in the audio folder (required).
        image: Uploaded image file (optional).
        default_image: ``'male'`` or ``'female'``; used when no usable image
            file was uploaded.
        use_gpu: ``'true'`` to request GPU processing (default ``'false'``).

    Returns:
        ``{'success': True, 'wav2lip_id': ...}`` on success; otherwise
        ``{'error': ...}`` with status 400 (bad input), 404 (audio or image
        not found) or 500 (unexpected failure).
    """
    try:
        audio_id = request.form.get('audio_id')
        if not audio_id:
            return jsonify({'error': 'ID de audio no proporcionado'}), 400
        # audio_id comes straight from the form; refuse anything containing a
        # path separator so it cannot point outside the audio folder.
        if os.path.basename(audio_id) != audio_id:
            return jsonify({'error': 'ID de audio no válido'}), 400

        audio_path = os.path.join(app.config['AUDIO_FOLDER'], f'{audio_id}.mp3')
        if not os.path.exists(audio_path):
            return jsonify({'error': 'Audio no encontrado'}), 404

        upload = request.files.get('image')
        # secure_filename() can return '' (e.g. for a name made only of
        # unsafe characters); treat that like "no file uploaded".
        filename = secure_filename(upload.filename) if upload else ''

        if filename:
            upload_dir = app.config['UPLOAD_FOLDER']
            os.makedirs(upload_dir, exist_ok=True)
            image_path = os.path.join(upload_dir, filename)
            upload.save(image_path)
        elif 'default_image' in request.form:
            # Browsers submit an empty ``image`` part when no file is chosen,
            # so the default image must still be honoured in that case.
            default_type = request.form['default_image']
            if default_type not in ('male', 'female'):
                return jsonify({'error': 'Tipo de imagen por defecto no válido'}), 400
            image_path = os.path.abspath(os.path.join('static', f'{default_type}.png'))
        else:
            return jsonify({'error': 'Imagen no proporcionada'}), 400

        if not os.path.exists(image_path):
            return jsonify({'error': f'Imagen no encontrada: {image_path}'}), 404

        use_gpu = request.form.get('use_gpu', 'false').lower() == 'true'
        session_id = wav2lip_animate(image_path, audio_path, use_gpu)

        # Schedule the delayed removal of the session's .webm in the Wav2Lip folder.
        cleanup_temp_files(os.path.join(app.config['WAV2LIP_FOLDER'], f'{session_id}.webm'))
        return jsonify({
            'success': True,
            'wav2lip_id': session_id
        })
    except Exception as e:
        logger.error(f"Error en generate_wav2lip_route: {str(e)}")
        return jsonify({'error': str(e)}), 500
def _process_video_failure(message, status=500):
    """Log *message* as an error and build the JSON failure response for it."""
    logger.error(message)
    return jsonify({
        'success': False,
        'error': message
    }), status


def process_video():
    """Download a video and build a preview clip between two timestamps.

    Request JSON:
        video_url: URL of the video to download (required).
        start_time: Start time, convertible to float (default 0).
        end_time: End time, convertible to float (default 0).

    Returns:
        ``{'success': True, 'preview_id': ...}`` on success; otherwise
        ``{'success': False, 'error': ...}`` with status 400 for invalid
        input or 500 for download/processing failures.

    The downloaded original video is always removed before returning,
    whether or not the preview could be produced.
    """
    video_path = None
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return _process_video_failure(
                "Error en process_video: cuerpo JSON no válido", 400)

        video_url = data.get('video_url')
        if not video_url:
            return _process_video_failure(
                "Error en process_video: URL de video no proporcionada", 400)

        try:
            start_time = float(data.get('start_time', 0))
            end_time = float(data.get('end_time', 0))
        except (TypeError, ValueError):
            return _process_video_failure(
                "Error en process_video: tiempos de inicio/fin no válidos", 400)

        logger.info(f"Procesando video - URL: {video_url}")
        logger.info(f"Tiempo - inicio: {start_time}, fin: {end_time}")

        preview_id = str(uuid.uuid4())
        preview_path = os.path.join(app.config['TEMP_FOLDER'], f"{preview_id}.webm")
        video_path = os.path.join(app.config['TEMP_FOLDER'], f"{preview_id}_original.mp4")
        logger.info(f"ID de preview: {preview_id}")
        logger.info(f"Ruta de preview: {preview_path}")
        logger.info(f"Ruta de video original: {video_path}")
        os.makedirs(app.config['TEMP_FOLDER'], exist_ok=True)

        success, error = download_video_with_fallback(video_url, video_path)
        if not success:
            return _process_video_failure(f"Error al descargar video: {error}")
        logger.info("Video descargado correctamente")

        if not verify_output_video(video_path):
            return _process_video_failure("El video descargado no es válido")

        # On failure the second element carries the error description.
        success, output_path = inference_preview(
            video_path,
            start_time,
            end_time,
            preview_path
        )
        if not success:
            return _process_video_failure(f"Error al crear preview: {output_path}")
        logger.info("Preview creado exitosamente")

        if not verify_output_video(preview_path):
            return _process_video_failure("El preview generado no es válido")

        return jsonify({
            'success': True,
            'preview_id': preview_id
        })
    except Exception as e:
        return _process_video_failure(f"Error en process_video: {str(e)}")
    finally:
        # The original download is only an intermediate artefact; remove it on
        # every exit path so failed requests do not leave files behind.
        if video_path is not None:
            try:
                os.remove(video_path)
                logger.info("Video temporal eliminado")
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.error(f"Error al eliminar video temporal: {str(e)}")
def get_animation(animation_id):
    """Send the WebM animation for *animation_id* for inline playback.

    The permanent copy under ``static/animations`` is preferred; otherwise
    the result inside the id's ``wav2lip_temp`` session directory is used.

    Returns:
        The video response, a JSON 404 when no file exists for the id, or a
        JSON 500 on unexpected errors.
    """
    try:
        # The id is used as a directory name below; an id such as '..' or one
        # containing a separator would resolve outside the intended folders.
        if (not animation_id or animation_id in ('.', '..')
                or os.path.basename(animation_id) != animation_id):
            return jsonify({'error': 'Animación no encontrada'}), 404

        candidates = (
            os.path.join('static', 'animations', f'{animation_id}.webm'),
            os.path.join('wav2lip_temp', animation_id, 'results', 'result_voice.webm'),
        )
        for path in candidates:
            if os.path.exists(path):
                return send_file(path, mimetype='video/webm')
        return jsonify({'error': 'Animación no encontrada'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def download_animation(animation_id):
    """Send the WebM animation for *animation_id* as a file download.

    The permanent copy under ``static/animations`` is preferred; otherwise
    the result inside the id's ``wav2lip_temp`` session directory is used.
    The download is named ``wav2lip_animation_<animation_id>.webm``.

    Returns:
        The attachment response, a JSON 404 when no file exists for the id,
        or a JSON 500 on unexpected errors.
    """
    try:
        # The id is used as a directory name below; an id such as '..' or one
        # containing a separator would resolve outside the intended folders.
        if (not animation_id or animation_id in ('.', '..')
                or os.path.basename(animation_id) != animation_id):
            return jsonify({'error': 'Animación no encontrada'}), 404

        candidates = (
            os.path.join('static', 'animations', f'{animation_id}.webm'),
            os.path.join('wav2lip_temp', animation_id, 'results', 'result_voice.webm'),
        )
        for path in candidates:
            if os.path.exists(path):
                return send_file(path,
                                 mimetype='video/webm',
                                 as_attachment=True,
                                 download_name=f'wav2lip_animation_{animation_id}.webm')
        return jsonify({'error': 'Animación no encontrada'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def get_preview(preview_id):
    """Send the preview clip for *preview_id* for inline playback.

    The file must exist, be at least 1024 bytes and pass
    ``verify_output_video`` before it is sent.

    Returns:
        The video response (conditional/range requests enabled), a JSON 404
        when the file does not exist, or a JSON 500 when the file is too
        small, fails verification, or cannot be sent.
    """
    preview_path = os.path.join(app.config['TEMP_FOLDER'], f'{preview_id}.webm')
    logger.info(f"Solicitando preview: {preview_path}")
    if not os.path.exists(preview_path):
        error = f"Preview no encontrado: {preview_path}"
        logger.error(error)
        return jsonify({'error': error}), 404
    try:
        file_size = os.path.getsize(preview_path)
        logger.info(f"Tamaño del preview: {file_size} bytes")
        if file_size < 1024:
            error = f"Archivo de preview demasiado pequeño: {file_size} bytes"
            logger.error(error)
            return jsonify({'error': error}), 500
        if not verify_output_video(preview_path):
            error = "El archivo de preview no es válido"
            logger.error(error)
            return jsonify({'error': error}), 500
        response = send_file(
            preview_path,
            mimetype='video/webm',
            as_attachment=False,
            download_name=f'preview_{preview_id}.webm',
            conditional=True
        )
        # Content-Length is deliberately NOT overridden: with conditional=True
        # the response may be a 206 partial or a 304, whose length differs
        # from the full file size, and send_file already sets the right value.
        response.headers['Accept-Ranges'] = 'bytes'
        logger.info(f"Enviando preview: {preview_path}")
        return response
    except Exception as e:
        error = f"Error al enviar preview: {str(e)}"
        logger.error(error)
        return jsonify({'error': error}), 500
def download_video_route(preview_id):
    """Send the preview clip for *preview_id* as a download, or a JSON 404.

    The download is named ``reaction_video_<preview_id>.webm``.
    """
    clip_path = os.path.join(app.config['TEMP_FOLDER'], f'{preview_id}.webm')
    if not os.path.exists(clip_path):
        return jsonify({'error': 'Video no encontrado'}), 404
    return send_file(
        clip_path,
        mimetype='video/webm',
        as_attachment=True,
        download_name=f'reaction_video_{preview_id}.webm'
    )
def check_progress():
    """Report Wav2Lip progress read from ``wav2lip_temp/progress.txt``.

    The file may contain either a JSON object with ``progress`` and
    ``status`` keys, or (older format) a bare number.

    Returns:
        JSON ``{'progress': ..., 'status': ...}``. A missing file reports
        progress 0 with status ``'Iniciando...'``; any read or parse error
        reports progress 0 with an error status.
    """
    try:
        progress_file = os.path.join('wav2lip_temp', 'progress.txt')
        try:
            # Read the content once: after json.load() the file position is
            # at EOF, so a second read for the legacy format would be empty.
            with open(progress_file, 'r', encoding='utf-8') as f:
                raw = f.read().strip()
        except FileNotFoundError:
            return jsonify({'progress': 0, 'status': 'Iniciando...'})

        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = raw

        if isinstance(data, dict):
            return jsonify({
                'progress': data.get('progress', 0),
                'status': data.get('status', 'Procesando...')
            })

        # Backward compatibility with older versions: a bare number is itself
        # valid JSON, so it arrives here as an int/float rather than raising.
        progress = int(data or 0)
        return jsonify({
            'progress': progress,
            'status': 'Procesando...'
        })
    except Exception as e:
        logger.error(f"Error al verificar progreso: {str(e)}")
        return jsonify({'progress': 0, 'status': 'Error al verificar progreso'})
if __name__ == '__main__':
    # Listen on all interfaces; the port comes from the PORT environment
    # variable, falling back to 7860.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))