from os import listdir, path
import numpy as np
import scipy, cv2, os, sys
import json, subprocess, random, string
from tqdm import tqdm
from glob import glob
import torch, face_detection
from models import Wav2Lip
import platform
from audio_processing import audio
from PIL import Image
import locale
import warnings
import unicodedata
import ffmpeg
import logging
from pathlib import Path

# Global variables
device = 'cuda' if torch.cuda.is_available() else 'cpu'
face_det_batch_size = 4
wav2lip_batch_size = 32
box = [-1, -1, -1, -1]
static = False
pads = [0, 10, 0, 0]
nosmooth = False
img_size = 96

warnings.filterwarnings('ignore')

# Force UTF-8 encoding and configure the locale
if sys.platform.startswith('win'):
    try:
        if sys.version_info >= (3, 7):
            locale.setlocale(locale.LC_ALL, '.UTF-8')
        else:
            locale.setlocale(locale.LC_ALL, 'Spanish_Spain.1252')
    except locale.Error:
        print("Could not configure the locale, using default values")

    # Configure the console encoding on Windows
    try:
        import ctypes
        kernel32 = ctypes.windll.kernel32
        kernel32.SetConsoleCP(65001)
        kernel32.SetConsoleOutputCP(65001)
    except Exception:
        print("Could not configure the console encoding")

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def load_image(image_path):
    try:
        image_path = os.path.abspath(image_path)
        image_path = image_path.encode('utf-8').decode('utf-8')

        if not os.path.exists(image_path):
            raise ValueError(f'File does not exist: {image_path}')

        img = None

        # First attempt: imdecode handles non-ASCII paths that cv2.imread may not
        try:
            img = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), cv2.IMREAD_COLOR)
        except Exception as e:
            logger.error(f"Error using imdecode: {str(e)}")

        # Second attempt: read the raw bytes manually
        if img is None:
            try:
                with open(image_path, 'rb') as f:
                    img_array = np.asarray(bytearray(f.read()), dtype=np.uint8)
                    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            except Exception as e:
                logger.error(f"Error reading bytes: {str(e)}")

        # Last attempt: plain imread
        if img is None:
            try:
                img = cv2.imread(str(Path(image_path)))
            except Exception as e:
                logger.error(f"Error using imread: {str(e)}")

        if img is None:
            raise ValueError(f'Could not load the image: {image_path}')

        # Normalize to 3-channel BGR
        if len(img.shape) == 2:
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        elif img.shape[2] == 4:
            img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

        return img

    except Exception as e:
        logger.error(f"Error loading image {image_path}: {str(e)}")
        raise


def get_safe_path(path):
    # Normalize the path
    path = os.path.abspath(path)

    # Convert special characters to their ASCII equivalents
    path = ''.join(c for c in unicodedata.normalize('NFD', path)
                   if unicodedata.category(c) != 'Mn')

    # Keep the slashes consistent
    path = path.replace('\\', '/')

    # Convert to a relative path when possible
    try:
        path = os.path.relpath(path)
    except ValueError:
        pass

    return path


def process_mel_chunks(mel, fps):
    mel_step_size = 16
    mel_idx_multiplier = 80. / fps
    mel_chunks = []

    # Make sure mel has the expected shape (80, T)
    if len(mel.shape) == 2 and mel.shape[0] == 80:
        mel = mel
    elif len(mel.shape) == 2 and mel.shape[1] == 80:
        mel = mel.T
    else:
        raise ValueError(f'Wrong mel-spectrogram shape: {mel.shape}. Must be (80, T) or (T, 80)')

    # Slice the spectrogram into fixed-size windows, one per output video frame
    i = 0
    while True:
        start_idx = int(i * mel_idx_multiplier)
        if start_idx + mel_step_size > len(mel[0]):
            # Zero-pad the last chunk if needed
            last_chunk = np.zeros((80, mel_step_size))
            remaining = len(mel[0]) - start_idx
            if remaining > 0:
                last_chunk[:, :remaining] = mel[:, start_idx:]
            mel_chunks.append(last_chunk)
            break

        current_chunk = mel[:, start_idx:start_idx + mel_step_size]
        mel_chunks.append(current_chunk)
        i += 1

    # Convert to a numpy array and verify the shape
    mel_chunks = np.array(mel_chunks)
    if len(mel_chunks.shape) != 3 or mel_chunks.shape[1] != 80 or mel_chunks.shape[2] != mel_step_size:
        raise ValueError(f'Wrong mel_chunks shape: {mel_chunks.shape}. Must be (N, 80, 16)')

    return mel_chunks
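
# Illustrative sketch (an assumption, not called anywhere in this script): it only
# documents the input/output contract of process_mel_chunks() with a synthetic
# spectrogram, one (80, 16) window per output video frame.
def _example_mel_chunking():
    fake_mel = np.random.rand(80, 160)             # hypothetical ~2 s of mel frames
    chunks = process_mel_chunks(fake_mel, fps=25)  # steps 80/fps mel frames per chunk
    assert chunks.shape[1:] == (80, 16)            # result has shape (N, 80, 16)
    return chunks.shape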

def main(checkpoint_path, face, audio_path, outfile, static=False, fps=25,
         pads=[0, 10, 0, 0], face_det_batch_size=16, wav2lip_batch_size=128,
         resize_factor=1, crop=[0, -1, 0, -1], box=[-1, -1, -1, -1],
         rotate=False, nosmooth=False, img_size=96):
    # Declare global variables at the start
    global device

    update_progress(0, "Starting processing...")

    try:
        checkpoint_path = os.path.abspath(checkpoint_path)
        face = os.path.abspath(face)
        audio_path = os.path.abspath(audio_path)
        outfile = os.path.abspath(outfile)

        os.makedirs(os.path.dirname(outfile), exist_ok=True)
        os.makedirs('temp', exist_ok=True)

        if not os.path.isfile(face):
            raise ValueError(f'Image/video file not found: {face}')

        update_progress(10, "Loading model...")
        model = load_model(checkpoint_path)
        model = model.to(device)

        update_progress(20, "Loading image/video...")
        if face.split('.')[-1].lower() in ['jpg', 'png', 'jpeg']:
            try:
                frame = load_image(face)
                if frame is None:
                    raise ValueError(f'Could not load the image: {face}')
                full_frames = [frame]
                logger.info(f"Image loaded successfully: {face}")
            except Exception as e:
                raise ValueError(f'Error loading the image: {str(e)}')
        else:
            video_stream = cv2.VideoCapture(face)
            fps = video_stream.get(cv2.CAP_PROP_FPS)
            logger.info('Reading video frames...')
            full_frames = []
            try:
                while 1:
                    still_reading, frame = video_stream.read()
                    if not still_reading:
                        video_stream.release()
                        break
                    if frame is not None:
                        full_frames.append(frame)
            except Exception as e:
                video_stream.release()
                raise ValueError(f'Error reading the video: {str(e)}')

        update_progress(30, "Processing audio...")
        if not audio_path.endswith('.wav'):
            logger.info('Extracting raw audio...')
            temp_wav = os.path.join('temp', 'temp.wav')
            command = f'ffmpeg -y -i "{audio_path}" -strict -2 "{temp_wav}"'
            subprocess.call(command, shell=True)
            audio_path = temp_wav

        wav = audio.load_wav(audio_path, 16000)
        mel = audio.melspectrogram(wav)

        if np.isnan(mel.reshape(-1)).sum() > 0:
            raise ValueError('Mel contains NaN. If you are using a TTS voice, add a small epsilon noise to the wav file')

        mel_chunks = process_mel_chunks(mel, fps)
        logger.info(f"Number of mel chunks: {len(mel_chunks)}, Shape: {mel_chunks.shape}")

        if len(full_frames) > len(mel_chunks):
            full_frames = full_frames[:len(mel_chunks)]

        update_progress(50, "Detecting faces...")
        if box[0] == -1:
            face_det_results = face_detect(full_frames.copy())
        else:
            logger.info('Using the specified bounding box instead of face detection...')
            y1, y2, x1, x2 = box
            face_det_results = [[f[y1:y2, x1:x2], (y1, y2, x1, x2)] for f in full_frames]

        batch_size = wav2lip_batch_size
        gen = datagen(full_frames.copy(), mel_chunks)

        update_progress(70, "Generating animation...")
        os.makedirs('temp', exist_ok=True)

        for i, (img_batch, mel_batch, frames, coords) in enumerate(tqdm(gen)):
            if i == 0:
                frame_h, frame_w = full_frames[0].shape[:-1]
                out = cv2.VideoWriter('temp/result.avi',
                                      cv2.VideoWriter_fourcc(*'DIVX'), fps, (frame_w, frame_h))

            img_batch = torch.FloatTensor(np.transpose(img_batch, (0, 3, 1, 2))).to(device)
            mel_batch = torch.FloatTensor(np.transpose(mel_batch, (0, 3, 1, 2))).to(device)

            with torch.no_grad():
                pred = model(mel_batch, img_batch)

            pred = pred.cpu().numpy().transpose(0, 2, 3, 1) * 255.

            for p, f, c in zip(pred, frames, coords):
                y1, y2, x1, x2 = c
                p = cv2.resize(p.astype(np.uint8), (x2 - x1, y2 - y1))
                f[y1:y2, x1:x2] = p
                out.write(f)

        out.release()

        update_progress(90, "Combining audio and video...")
        command = f'ffmpeg -y -i "{audio_path}" -i temp/result.avi -strict -2 -q:v 1 "{outfile}"'
        subprocess.call(command, shell=platform.system() != 'Windows')

        update_progress(100, "Process completed")
        return True

    except Exception as e:
        logger.error(f"Error in main: {str(e)}")
        update_progress(-1, f"Error: {str(e)}")
        raise

def get_smoothened_boxes(boxes, T):
    for i in range(len(boxes)):
        if i + T > len(boxes):
            window = boxes[len(boxes) - T:]
        else:
            window = boxes[i: i + T]
        boxes[i] = np.mean(window, axis=0)
    return boxes


def face_detect(images):
    global device, face_det_batch_size, pads

    try:
        detector = face_detection.FaceAlignment(face_detection.LandmarksType._2D, device=device)
        batch_size = face_det_batch_size

        while 1:
            predictions = []
            try:
                for i in tqdm(range(0, len(images), batch_size)):
                    batch = images[i:i + batch_size]
                    if len(batch) == 0:
                        continue

                    # Preprocess the images for better detection
                    processed_batch = []
                    for img in batch:
                        # Convert to grayscale for better detection
                        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                        # Adaptive histogram equalization
                        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                        gray = clahe.apply(gray)
                        # Convert back to BGR
                        enhanced = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
                        processed_batch.append(enhanced)

                    predictions.extend(detector.get_detections_for_batch(np.array(processed_batch)))
            except RuntimeError as e:
                if batch_size == 1:
                    logger.error(f"GPU memory error: {str(e)}")
                    raise RuntimeError('Image too large for face detection on GPU. Please use --resize_factor')
                batch_size //= 2
                logger.info(f'Recovering from OOM error; new batch size: {batch_size}')
                continue
            break

        results = []
        pady1, pady2, padx1, padx2 = pads
        for rect, image in zip(predictions, images):
            if rect is None:
                height, width = image.shape[:2]
                margin_x = int(width * 0.2)   # Increased from 0.1 to 0.2
                margin_y = int(height * 0.2)  # Increased from 0.1 to 0.2
                rect = [margin_x, margin_y, width - margin_x, height - margin_y]
                logger.warning("No face detected, using default box")

            # Add extra padding to include more facial context
            y1 = max(0, rect[1] - pady1 - int(rect[3] * 0.1))               # 10% extra on top
            y2 = min(image.shape[0], rect[3] + pady2 + int(rect[3] * 0.1))  # 10% extra at the bottom
            x1 = max(0, rect[0] - padx1 - int(rect[2] * 0.05))              # 5% extra on the sides
            x2 = min(image.shape[1], rect[2] + padx2 + int(rect[2] * 0.05))

            results.append([x1, y1, x2, y2])

        boxes = np.array(results)
        if not nosmooth:
            # Widen the temporal smoothing window
            boxes = get_smoothened_boxes(boxes, T=7)  # Increased from 5 to 7

            # Apply additional smoothing to the jaw region
            jaw_boxes = boxes.copy()
            jaw_boxes[:, 1] = boxes[:, 3] - (boxes[:, 3] - boxes[:, 1]) * 0.3  # Only the lower part
            jaw_boxes = get_smoothened_boxes(jaw_boxes, T=9)  # Extra smoothing for the jaw

            # Combine the results
            boxes[:, 1] = jaw_boxes[:, 1]  # Use the jaw smoothing for the lower part

        results = [[image[y1:y2, x1:x2], (y1, y2, x1, x2)]
                   for image, (x1, y1, x2, y2) in zip(images, boxes)]

        del detector
        return results

    except Exception as e:
        logger.error(f"Error in face_detect: {str(e)}")
        # Fallback: use a fixed central box for every frame
        results = []
        for image in images:
            height, width = image.shape[:2]
            margin_x = int(width * 0.2)
            margin_y = int(height * 0.2)
            x1, y1, x2, y2 = margin_x, margin_y, width - margin_x, height - margin_y
            results.append([image[y1:y2, x1:x2], (y1, y2, x1, x2)])
        return results
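
# Minimal sketch (an assumption, not called by the pipeline): get_smoothened_boxes()
# is a sliding-window mean over consecutive detections, so frame-to-frame jitter in
# the detector output is averaged away.
def _example_box_smoothing():
    boxes = np.array([[0, 0, 10, 10],
                      [2, 2, 12, 12],
                      [4, 4, 14, 14]], dtype=float)
    smoothed = get_smoothened_boxes(boxes.copy(), T=3)
    # The first box becomes the column-wise mean of the three detections: (2, 2, 12, 12)
    return smoothed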

def datagen(frames, mels):
    img_batch, mel_batch, frame_batch, coords_batch = [], [], [], []

    if box[0] == -1:
        if not static:
            face_det_results = face_detect(frames)
        else:
            face_det_results = face_detect([frames[0]])
    else:
        logger.info('Using the specified bounding box...')
        y1, y2, x1, x2 = box
        face_det_results = [[f[y1:y2, x1:x2], (y1, y2, x1, x2)] for f in frames]

    for i, m in enumerate(mels):
        idx = 0 if static else i % len(frames)
        frame_to_save = frames[idx].copy()
        face, coords = face_det_results[idx].copy()

        # Improved preprocessing of the face crop
        if face.shape[0] < img_size or face.shape[1] < img_size:
            # Use Lanczos4 interpolation for better quality when upscaling
            face = cv2.resize(face, (img_size, img_size), interpolation=cv2.INTER_LANCZOS4)
        else:
            # Use area interpolation for better quality when downscaling
            face = cv2.resize(face, (img_size, img_size), interpolation=cv2.INTER_AREA)

        # Improve contrast and sharpness
        lab = cv2.cvtColor(face, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        l = clahe.apply(l)
        lab = cv2.merge((l, a, b))
        face = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

        img_batch.append(face)
        mel_batch.append(m)
        frame_batch.append(frame_to_save)
        coords_batch.append(coords)

        if len(img_batch) >= wav2lip_batch_size:
            img_batch, mel_batch = np.asarray(img_batch), np.asarray(mel_batch)

            # Mask the lower half of the face and stack it with the original crop
            img_masked = img_batch.copy()
            img_masked[:, img_size // 2:] = 0
            img_batch = np.concatenate((img_masked, img_batch), axis=3) / 255.
            mel_batch = np.expand_dims(mel_batch, axis=3)

            yield img_batch, mel_batch, frame_batch, coords_batch
            img_batch, mel_batch, frame_batch, coords_batch = [], [], [], []

    if len(img_batch) > 0:
        img_batch, mel_batch = np.asarray(img_batch), np.asarray(mel_batch)

        img_masked = img_batch.copy()
        img_masked[:, img_size // 2:] = 0
        img_batch = np.concatenate((img_masked, img_batch), axis=3) / 255.
        mel_batch = np.expand_dims(mel_batch, axis=3)

        yield img_batch, mel_batch, frame_batch, coords_batch
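
# Note on the batch shapes produced by datagen() above (with the default img_size=96):
#   img_batch: (N, 96, 96, 6) floats in [0, 1] -- the lower-half-masked crop stacked
#              channel-wise with the original crop
#   mel_batch: (N, 80, 16, 1)
# main() transposes both to NCHW float tensors before calling the Wav2Lip model.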

def load_model(checkpoint_path):
    model = Wav2Lip()
    logger.info(f'Loading model from {checkpoint_path}')

    checkpoint = torch.load(checkpoint_path, map_location=device)
    if "state_dict" in checkpoint:
        checkpoint = checkpoint["state_dict"]

    # Remove the 'module.' prefix if present
    new_state_dict = {}
    for k, v in checkpoint.items():
        name = k.replace('module.', '')
        new_state_dict[name] = v

    # Try to load the weights
    try:
        model.load_state_dict(new_state_dict, strict=False)
        logger.info("Model loaded successfully")
    except Exception as e:
        logger.error(f"Error loading the model: {str(e)}")
        # Analyze the differences
        model_keys = set(model.state_dict().keys())
        checkpoint_keys = set(new_state_dict.keys())
        logger.info("Keys missing from the checkpoint:")
        for k in model_keys - checkpoint_keys:
            logger.info(f"- {k}")
        logger.info("Unexpected keys in the checkpoint:")
        for k in checkpoint_keys - model_keys:
            logger.info(f"- {k}")
        raise

    model.eval()
    return model


def process_face_detection(frame, fa):
    try:
        height, width = frame.shape[:2]
        min_size = 256
        scale = 1.0

        if width < min_size or height < min_size:
            scale = max(min_size / width, min_size / height)
            new_width = int(width * scale)
            new_height = int(height * scale)
            frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)

        # Flatten an alpha channel onto a white background
        if frame.shape[-1] == 4:
            alpha = frame[:, :, 3]
            frame_rgb = frame[:, :, :3]
            white_background = np.ones_like(frame_rgb) * 255
            alpha_3d = np.stack([alpha, alpha, alpha], axis=-1) / 255.0
            frame = (frame_rgb * alpha_3d + white_background * (1 - alpha_3d)).astype(np.uint8)

        bbox = fa.detect_faces(frame)
        if bbox is None or len(bbox) == 0:
            margin_x = int(width * 0.2)
            margin_y = int(height * 0.2)
            bbox = np.array([margin_x, margin_y, width - margin_x, height - margin_y, 0.99], dtype=np.int32)

        bbox = np.array(bbox[:4], dtype=np.int32)
        bbox[0] = np.clip(bbox[0], 0, width - 1)
        bbox[1] = np.clip(bbox[1], 0, height - 1)
        bbox[2] = np.clip(bbox[2], 1, width)
        bbox[3] = np.clip(bbox[3], 1, height)

        face_width = bbox[2] - bbox[0]
        face_height = bbox[3] - bbox[1]

        # Grow boxes that are smaller than the minimum size
        if face_width < min_size or face_height < min_size:
            padding_x = (min_size - face_width) // 2 if face_width < min_size else 0
            padding_y = (min_size - face_height) // 2 if face_height < min_size else 0
            bbox[0] = max(0, bbox[0] - padding_x)
            bbox[2] = min(width, bbox[2] + padding_x)
            bbox[1] = max(0, bbox[1] - padding_y)
            bbox[3] = min(height, bbox[3] + padding_y)

        frame_with_face = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]

        # Map the crop and the box back to the original resolution if the frame was upscaled
        if width < min_size or height < min_size:
            frame_with_face = cv2.resize(frame_with_face,
                                         (int(face_width / scale), int(face_height / scale)),
                                         interpolation=cv2.INTER_LANCZOS4)
            bbox = (bbox / scale).astype(np.int32)

        return frame_with_face, bbox

    except Exception as e:
        logger.error(f"Error in process_face_detection: {str(e)}")
        margin_x = int(width * 0.2)
        margin_y = int(height * 0.2)
        bbox = np.array([margin_x, margin_y, width - margin_x, height - margin_y], dtype=np.int32)
        frame_with_face = frame[bbox[1]:bbox[3], bbox[0]:bbox[2]]
        return frame_with_face, bbox


def update_progress(progress, status=""):
    try:
        progress_file = os.path.join('wav2lip_temp', 'progress.txt')
        os.makedirs(os.path.dirname(progress_file), exist_ok=True)
        with open(progress_file, 'w', encoding='utf-8') as f:
            json.dump({'progress': progress, 'status': status}, f)
    except Exception as e:
        logger.error(f"Error updating progress: {str(e)}")
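
# Sketch of a consumer for the progress file written by update_progress() (an
# assumption: a UI presumably polls this file, but no reader ships with this script).
# Shown only to document the JSON layout.
def _example_read_progress():
    progress_path = os.path.join('wav2lip_temp', 'progress.txt')
    if not os.path.exists(progress_path):
        return None
    with open(progress_path, 'r', encoding='utf-8') as f:
        return json.load(f)  # e.g. {'progress': 50, 'status': 'Detecting faces...'}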

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Inference code to lip-sync videos in the wild using Wav2Lip models')
    parser.add_argument('--checkpoint_path', type=str, help='Full path to the Wav2Lip model checkpoint')
    parser.add_argument('--face', type=str, help='Input video/image file')
    parser.add_argument('--audio', type=str, help='Input audio file')
    parser.add_argument('--outfile', type=str, default='results/result_voice.mp4', help='Output video path')
    parser.add_argument('--static', action='store_true', help='Whether the input is a static image')
    parser.add_argument('--fps', type=float, default=25.0, help='FPS of the output video')
    parser.add_argument('--pads', nargs='+', type=int, default=[0, 10, 0, 0], help='Padding for the detected face')
    parser.add_argument('--face_det_batch_size', type=int, default=16, help='Batch size for face detection')
    parser.add_argument('--wav2lip_batch_size', type=int, default=128, help='Batch size for Wav2Lip')
    parser.add_argument('--resize_factor', default=1, type=int, help='Resize factor for the input video')
    parser.add_argument('--crop', nargs='+', type=int, default=[0, -1, 0, -1], help='Crop the input video')
    parser.add_argument('--box', nargs='+', type=int, default=[-1, -1, -1, -1], help='Bounding box coordinates')
    parser.add_argument('--rotate', action='store_true', help='Rotate 90 degrees')
    parser.add_argument('--nosmooth', action='store_true', help='Disable temporal smoothing')
    parser.add_argument('--cpu', action='store_true', help='Force CPU usage')

    mel_step_size = 16
    args = parser.parse_args()

    if args.pads:
        args.pads = [int(x) for x in args.pads]
    if args.crop:
        args.crop = [int(x) for x in args.crop]
    if args.box:
        args.box = [int(x) for x in args.box]

    if args.cpu:
        device = 'cpu'

    # face_detect() and datagen() read these module-level settings, so propagate
    # the parsed arguments to them before running.
    static = args.static
    pads = args.pads
    nosmooth = args.nosmooth
    box = args.box
    face_det_batch_size = args.face_det_batch_size
    wav2lip_batch_size = args.wav2lip_batch_size

    main(checkpoint_path=args.checkpoint_path,
         face=args.face,
         audio_path=args.audio,
         outfile=args.outfile,
         static=args.static,
         fps=args.fps,
         pads=args.pads,
         face_det_batch_size=args.face_det_batch_size,
         wav2lip_batch_size=args.wav2lip_batch_size,
         resize_factor=args.resize_factor,
         crop=args.crop,
         box=args.box,
         rotate=args.rotate,
         nosmooth=args.nosmooth)
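
# Example invocation (assuming this file is saved as inference.py; all paths are
# placeholders for illustration only):
#   python inference.py --checkpoint_path checkpoints/wav2lip_gan.pth \
#       --face inputs/face.jpg --audio inputs/speech.wav \
#       --outfile results/result_voice.mp4 --static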