import os
import tempfile
import gradio as gr
from loguru import logger
from typing import Optional, Tuple, List
import requests
import json
import time
import base64
from io import BytesIO
import numpy as np
import wave

# Try to import torch and torchaudio (optional)
try:
    import torch
    import torchaudio
    TORCH_AVAILABLE = True
    logger.info("✅ Torch/torchaudio available")
except ImportError:
    TORCH_AVAILABLE = False
    logger.info("⚠️ Torch/torchaudio not available, falling back to a pure numpy implementation")

def call_huggingface_inference_api(video_file_path: str, text_prompt: str = "") -> Tuple[Optional[str], str]:
    """Call the Hugging Face Inference API directly."""
    # Hugging Face API endpoint
    API_URL = "https://api-inference.huggingface.co/models/tencent/HunyuanVideo-Foley"

    # Try several environment variables for the HF token
    hf_token = (
        os.environ.get('HF_TOKEN') or
        os.environ.get('HUGGING_FACE_HUB_TOKEN') or
        os.environ.get('HUGGINGFACE_TOKEN')
    )

    # Without a token, try unauthenticated access (some public models allow it)
    if not hf_token:
        logger.info("No HF token found, trying unauthenticated access...")

    # Build the request headers
    headers = {"Content-Type": "application/json"}
    if hf_token:
        headers["Authorization"] = f"Bearer {hf_token}"

    try:
        logger.info(f"Calling HF API: {API_URL}")
        logger.info(f"Video file: {video_file_path}")
        logger.info(f"Text prompt: {text_prompt}")

        # Read the video file and encode it as base64
        with open(video_file_path, "rb") as video_file:
            video_data = video_file.read()
        video_b64 = base64.b64encode(video_data).decode()

        # Build the request payload
        payload = {
            "inputs": {
                "video": video_b64,
                "text": text_prompt or "generate audio for this video"
            },
            "parameters": {
                "guidance_scale": 4.5,
                "num_inference_steps": 50
            }
        }

        logger.info("Sending API request...")
        response = requests.post(API_URL, headers=headers, json=payload, timeout=300)

        if response.status_code == 200:
            # Handle the audio response
            result = response.json()
            if "audio" in result:
                # Decode the audio data
                audio_b64 = result["audio"]
                audio_data = base64.b64decode(audio_b64)

                # Save it to a temporary file
                temp_dir = tempfile.mkdtemp()
                audio_path = os.path.join(temp_dir, "generated_audio.wav")
                with open(audio_path, "wb") as f:
                    f.write(audio_data)

                return audio_path, "✅ Successfully generated audio via the HunyuanVideo-Foley API!"
            else:
                return None, f"❌ Unexpected API response format: {result}"
        elif response.status_code == 503:
            return None, "⏳ The model is loading, please retry shortly (this usually takes 1-2 minutes)"
        elif response.status_code == 429:
            return None, "🚫 API rate limit reached, please retry later"
        else:
            error_msg = response.text
            return None, f"❌ API call failed ({response.status_code}): {error_msg}"

    except requests.exceptions.Timeout:
        return None, "⏰ API request timed out; the model may need more time to load"
    except Exception as e:
        logger.error(f"API call error: {str(e)}")
        return None, f"❌ API call error: {str(e)}"

def call_gradio_client_api(video_file_path: str, text_prompt: str = "") -> Tuple[Optional[str], str]:
    """Call the official Space via the Gradio Client."""
    try:
        from gradio_client import Client

        logger.info("Connecting to the official Space via Gradio Client...")
        client = Client("tencent/HunyuanVideo-Foley", timeout=300)

        # Call the prediction endpoint
        result = client.predict(
            video_file_path,   # video input
            text_prompt,       # text prompt
            4.5,               # guidance_scale
            50,                # inference_steps
            1,                 # sample_nums
            api_name="/predict"
        )

        if result and len(result) > 0:
            # Assume the first element is the generated audio file
            audio_file = result[0]
            if audio_file and os.path.exists(audio_file):
                return audio_file, "✅ Successfully generated audio via the Gradio Client!"
            else:
                return None, f"❌ Gradio Client returned an invalid file: {result}"
        else:
            return None, f"❌ Gradio Client returned an empty result: {result}"

    except ImportError:
        return None, "❌ gradio-client is required: pip install gradio-client"
    except Exception as e:
        logger.error(f"Gradio Client call failed: {str(e)}")
        return None, f"❌ Gradio Client call failed: {str(e)}"

def create_fallback_audio(video_file_path: str, text_prompt: str) -> Optional[str]:
    """Create fallback demo audio (when the APIs are unavailable) - compatible with all environments."""
    sample_rate = 44100
    duration = 4.0  # shortened to 4 seconds for faster loading
    duration_samples = int(duration * sample_rate)

    try:
        logger.info(f"🎵 Generating audio: '{text_prompt}'")

        # Generate audio with pure numpy (maximum compatibility)
        t = np.linspace(0, duration, duration_samples, dtype=np.float32)

        # Produce a different kind of audio depending on the text prompt
        if "footsteps" in text_prompt.lower() or "步" in text_prompt:
            # Footsteps: rhythmic low-frequency thumps
            beat_freq = 2.0
            audio = 0.5 * np.sin(2 * np.pi * beat_freq * t) * np.exp(-4 * (t % (1.0 / beat_freq)))
            logger.info("🚶 Generating footsteps effect")
        elif "rain" in text_prompt.lower() or "雨" in text_prompt:
            # Rain: filtered white noise
            np.random.seed(42)  # keep the output reproducible
            noise = np.random.randn(duration_samples)
            # Simple low-pass-like attenuation
            audio = 0.25 * noise
            logger.info("🌧️ Generating rain effect")
        elif "wind" in text_prompt.lower() or "风" in text_prompt:
            # Wind: low-frequency swell plus noise
            np.random.seed(42)
            base_wind = 0.3 * np.sin(2 * np.pi * 0.3 * t) * np.sin(2 * np.pi * 1.1 * t)
            wind_noise = 0.15 * np.random.randn(duration_samples)
            audio = base_wind + wind_noise
            logger.info("💨 Generating wind effect")
        elif "car" in text_prompt.lower() or "车" in text_prompt:
            # Vehicle: mix of engine frequencies
            engine_base = 0.3 * np.sin(2 * np.pi * 45 * t)        # fundamental engine frequency
            engine_harmonic = 0.2 * np.sin(2 * np.pi * 90 * t)    # second harmonic
            engine_variation = 0.1 * np.sin(2 * np.pi * 0.7 * t)  # RPM variation
            audio = (engine_base + engine_harmonic) * (1 + engine_variation)
            logger.info("🚗 Generating car engine effect")
        else:
            # Default: a clean musical tone
            base_freq = 220 + (len(text_prompt) % 10) * 20  # frequency derived from the prompt length
            # Build a simple chord
            note1 = 0.3 * np.sin(2 * np.pi * base_freq * t)
            note2 = 0.2 * np.sin(2 * np.pi * base_freq * 1.25 * t)  # major third
            note3 = 0.1 * np.sin(2 * np.pi * base_freq * 1.5 * t)   # perfect fifth
            audio = note1 + note2 + note3
            logger.info(f"🎵 Generating musical tone effect ({base_freq:.1f}Hz)")

        # Apply an envelope (fade in/out)
        envelope = np.ones_like(audio, dtype=np.float32)
        fade_samples = int(0.05 * sample_rate)  # 50 ms fade in/out
        if fade_samples > 0:
            envelope[:fade_samples] = np.linspace(0, 1, fade_samples, dtype=np.float32)
            envelope[-fade_samples:] = np.linspace(1, 0, fade_samples, dtype=np.float32)
        audio = audio * envelope
        # Create the output file path
        temp_dir = tempfile.mkdtemp()
        audio_path = os.path.join(temp_dir, f"generated_audio_{int(time.time())}.wav")

        # Normalize and convert to 16-bit integers
        audio_normalized = np.clip(audio, -0.95, 0.95)  # avoid clipping
        audio_int16 = (audio_normalized * 32767).astype(np.int16)

        # Save with the standard wave module (maximum compatibility)
        with wave.open(audio_path, 'wb') as wav_file:
            wav_file.setnchannels(1)   # mono
            wav_file.setsampwidth(2)   # 16-bit
            wav_file.setframerate(sample_rate)
            wav_file.writeframes(audio_int16.tobytes())

        # Verify the file
        file_size = os.path.getsize(audio_path)
        logger.info(f"✅ Audio file generated: {os.path.basename(audio_path)} ({file_size} bytes)")
        return audio_path

    except Exception as e:
        logger.error(f"❌ Audio generation failed: {str(e)}")

        # Emergency fallback: create a plain tone
        try:
            temp_dir = tempfile.mkdtemp()
            audio_path = os.path.join(temp_dir, "emergency_tone.wav")

            # Create a simple 440 Hz tone
            emergency_samples = sample_rate * 2  # 2 seconds
            t_emergency = np.linspace(0, 2.0, emergency_samples, dtype=np.float32)
            emergency_audio = 0.3 * np.sin(2 * np.pi * 440 * t_emergency)

            # Add an envelope
            fade = int(0.1 * sample_rate)
            emergency_audio[:fade] *= np.linspace(0, 1, fade)
            emergency_audio[-fade:] *= np.linspace(1, 0, fade)

            # Save the emergency audio
            emergency_int16 = (emergency_audio * 32767).astype(np.int16)
            with wave.open(audio_path, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(emergency_int16.tobytes())

            logger.info("🚨 Using the emergency fallback tone")
            return audio_path

        except Exception as e2:
            logger.error(f"❌ Emergency fallback also failed: {str(e2)}")
            # Return None and let the caller handle it
            return None
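
# Minimal debugging helper (not part of the original app): read back a WAV written by
# create_fallback_audio using only the standard-library wave module and report its
# basic properties. The name _inspect_wav and its use here are illustrative only.
def _inspect_wav(path: str) -> dict:
    """Return channel count, sample width, sample rate, and duration of a PCM WAV file."""
    with wave.open(path, 'rb') as wav_file:
        frame_rate = wav_file.getframerate()
        return {
            "channels": wav_file.getnchannels(),
            "sample_width_bytes": wav_file.getsampwidth(),
            "sample_rate": frame_rate,
            "duration_seconds": wav_file.getnframes() / frame_rate,
        }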

def process_video_with_apis(video_file, text_prompt: str, guidance_scale: float, inference_steps: int, sample_nums: int) -> Tuple[List[str], str]:
    """Process the video through the available API methods."""
    if video_file is None:
        return [], "❌ Please upload a video file!"

    if text_prompt is None or text_prompt.strip() == "":
        text_prompt = "generate audio sound effects for this video"

    video_file_path = video_file if isinstance(video_file, str) else video_file.name
    logger.info(f"Processing video file: {video_file_path}")
    logger.info(f"Text prompt: {text_prompt}")

    api_results = []
    status_messages = []

    # Method 1: try the Hugging Face Inference API
    logger.info("🔄 Trying method 1: Hugging Face Inference API")
    hf_audio, hf_msg = call_huggingface_inference_api(video_file_path, text_prompt)
    if hf_audio:
        api_results.append(hf_audio)
        status_messages.append("✅ HF Inference API: success")
    else:
        status_messages.append(f"❌ HF Inference API: {hf_msg}")

    # Method 2: try the Gradio Client (if the first method failed)
    if not hf_audio:
        logger.info("🔄 Trying method 2: Gradio Client API")
        gc_audio, gc_msg = call_gradio_client_api(video_file_path, text_prompt)
        if gc_audio:
            api_results.append(gc_audio)
            status_messages.append("✅ Gradio Client: success")
        else:
            status_messages.append(f"❌ Gradio Client: {gc_msg}")

    # Method 3: demo fallback (if all APIs failed)
    if not api_results:
        logger.info("🔄 Using the fallback demo audio")
        fallback_audio = create_fallback_audio(video_file_path, text_prompt)
        if fallback_audio:
            api_results.append(fallback_audio)
            status_messages.append("🎯 Demo fallback: audio generated (used when the APIs are unavailable)")
        else:
            status_messages.append("❌ Demo fallback: audio generation failed")

    # Build the detailed status message
    final_status = f"""🎵 HunyuanVideo-Foley processing complete!
📹 **Video**: {os.path.basename(video_file_path)}
📝 **Prompt**: "{text_prompt}"
⚙️ **Parameters**: CFG={guidance_scale}, Steps={inference_steps}, Samples={sample_nums}
🔗 **API call results**:
{chr(10).join(f"• {msg}" for msg in status_messages)}
🎵 **Output**: {len(api_results)} audio file(s)
💡 **Notes**:
• The official Hugging Face model API is tried first
• Automatic fallback to the backup method is supported
• The original user experience is preserved
🚀 **Model page**: https://huggingface.co/tencent/HunyuanVideo-Foley"""

    return api_results, final_status
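
# Hypothetical usage sketch: process_video_with_apis returns a list of audio file
# paths (possibly empty) plus a human-readable status string, e.g.
#
#   audio_files, status = process_video_with_apis("clip.mp4", "rain on leaves", 4.5, 50, 1)
#   if audio_files:
#       print("first result:", audio_files[0])
#   print(status)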

def create_api_interface():
    """Create the API-calling Gradio interface."""
    css = """
    .api-header {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        padding: 2rem;
        border-radius: 20px;
        text-align: center;
        color: white;
        margin-bottom: 2rem;
    }
    .api-notice {
        background: linear-gradient(135deg, #e8f4fd 0%, #f0f8ff 100%);
        border: 2px solid #1890ff;
        border-radius: 12px;
        padding: 1.5rem;
        margin: 1rem 0;
        color: #0050b3;
    }
    .method-info {
        background: #f6ffed;
        border: 1px solid #52c41a;
        border-radius: 8px;
        padding: 1rem;
        margin: 1rem 0;
        color: #389e0d;
    }
    """

    with gr.Blocks(css=css, title="HunyuanVideo-Foley API") as app:
        # Header
        gr.HTML("""
        <div class="api-header">
            <h1>🎵 HunyuanVideo-Foley</h1>
            <p>Calls the official Hugging Face model API directly</p>
        </div>
        """)

        # API Notice
        gr.HTML("""
        <div class="api-notice">
            <strong>🔗 Smart API calling mode:</strong>
            <br>• Method 1: Hugging Face Inference API (automatically tries the official inference service)
            <br>• Method 2: Gradio Client (connects to the official Space)
            <br>• Method 3: Smart fallback (guarantees a result)
            <br><br>
            <strong>✅ Zero-configuration usage:</strong>
            <br>• No environment variables need to be set manually
            <br>• The system automatically picks the best available API
            <br>• The first model load may take 1-2 minutes
        </div>
        """)

        with gr.Row():
            # Input section
            with gr.Column(scale=1):
                gr.Markdown("### 📹 Video Input")

                video_input = gr.Video(
                    label="Upload a video file",
                    height=300
                )

                text_input = gr.Textbox(
                    label="🎯 Audio description (English recommended)",
                    placeholder="footsteps on wooden floor, rain on leaves, car engine sound...",
                    lines=3,
                    value="footsteps on the ground"
                )

                with gr.Row():
                    guidance_scale = gr.Slider(
                        minimum=1.0,
                        maximum=10.0,
                        value=4.5,
                        step=0.1,
                        label="🎚️ CFG Scale"
                    )
                    inference_steps = gr.Slider(
                        minimum=10,
                        maximum=100,
                        value=50,
                        step=5,
                        label="⚡ Inference Steps"
                    )

                sample_nums = gr.Slider(
                    minimum=1,
                    maximum=1,  # limited to a single sample for API calls
                    value=1,
                    step=1,
                    label="🎲 Sample Numbers"
                )

                generate_btn = gr.Button(
                    "🎵 Call the API to generate audio",
                    variant="primary"
                )

            # Output section
            with gr.Column(scale=1):
                gr.Markdown("### 🎵 API Results")

                audio_output = gr.Audio(label="Generated audio", visible=True)

                status_output = gr.Textbox(
                    label="API call status",
                    interactive=False,
                    lines=15,
                    placeholder="Waiting for an API call..."
                )

        # Method info
        gr.HTML("""
        <div class="method-info">
            <h3>🔧 How the smart API calling works</h3>
            <p><strong>Method 1 - HF Inference API:</strong> automatically tries the official tencent/HunyuanVideo-Foley model</p>
            <p><strong>Method 2 - Gradio Client:</strong> connects to the official Gradio Space for inference</p>
            <p><strong>Method 3 - Smart fallback:</strong> ensures an audio result is always generated</p>
            <br>
            <p><strong>🚀 Automation:</strong> the system tries every method in turn, no manual configuration needed</p>
        </div>
        """)

        # Event handlers
        def process_api_call(video_file, text_prompt, guidance_scale, inference_steps, sample_nums):
            audio_files, status_msg = process_video_with_apis(
                video_file, text_prompt, guidance_scale, inference_steps, int(sample_nums)
            )
            # Return the first audio file (API calls usually return a single result)
            audio_result = audio_files[0] if audio_files else None
            return audio_result, status_msg

        generate_btn.click(
            fn=process_api_call,
            inputs=[video_input, text_input, guidance_scale, inference_steps, sample_nums],
            outputs=[audio_output, status_output]
        )

        # Footer
        gr.HTML("""
        <div style="text-align: center; padding: 2rem; color: #666; border-top: 1px solid #eee; margin-top: 2rem;">
            <p><strong>🤖 Smart API calling edition</strong> - automatically calls the official HunyuanVideo-Foley model</p>
            <p>✅ Zero configuration, automatically picks the best API, keeps the feature always usable</p>
            <p>📂 Model repository: <a href="https://huggingface.co/tencent/HunyuanVideo-Foley" target="_blank">tencent/HunyuanVideo-Foley</a></p>
        </div>
        """)

    return app

if __name__ == "__main__":
    # Set up logging
    logger.remove()
    logger.add(lambda msg: print(msg, end=''), level="INFO")

    logger.info("Starting the HunyuanVideo-Foley API calling edition...")

    # Check for an HF token (not required)
    hf_token = (
        os.environ.get('HF_TOKEN') or
        os.environ.get('HUGGING_FACE_HUB_TOKEN') or
        os.environ.get('HUGGINGFACE_TOKEN')
    )
    if hf_token:
        logger.info("✅ HF token detected, authenticated API calls are available")
    else:
        logger.info("ℹ️ No HF token detected, will try the public API and fallback methods")

    # Create and launch the app
    app = create_api_interface()
    logger.info("API calling edition is ready!")

    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False,
        show_error=True
    )