#!/usr/bin/env python3
"""
Madverse Music API
AI Music Detection Service
"""
# Configure numba before any other imports to avoid caching issues
import os
os.environ.setdefault('NUMBA_DISABLE_JIT', '1')
os.environ.setdefault('NUMBA_CACHE_DIR', '/app/.cache/numba')

import asyncio
import hmac
import socket
import tempfile
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional, Annotated, List, Union

import numpy as np
import requests
import scipy.signal
import soundfile as sf
import torch
import uvicorn
from fastapi import FastAPI, HTTPException, BackgroundTasks, Header, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, HttpUrl
# Global model variable; stays None until `lifespan` has loaded the classifier.
model = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management.

    Startup: loads the ``awsaf49/sonics-spectttra-alpha-120s`` classifier into
    the module-level ``model`` and switches it to eval mode. Any failure is
    printed with a traceback and re-raised, so the application does not start
    without a model.

    Shutdown: only prints a message.

    Decorated with ``asynccontextmanager`` so it can be passed to
    ``FastAPI(lifespan=...)`` as an async context manager rather than as a
    bare async generator function.
    """
    # Startup
    global model
    try:
        # Imported inside the try so an import failure is reported by the
        # handler below like any other load failure.
        from sonics import HFAudioClassifier

        print("🔄 Loading Madverse Music AI model...")

        # Set cache directory to a writable location
        cache_dir = "/app/.cache" if os.path.exists("/app") else "./cache"
        os.makedirs(cache_dir, exist_ok=True)

        # Load model with explicit cache directory
        model = HFAudioClassifier.from_pretrained(
            "awsaf49/sonics-spectttra-alpha-120s",
            cache_dir=cache_dir
        )
        model.eval()
        print("✅ Model loaded successfully!")
    except Exception as e:
        print(f"❌ Failed to load model: {e}")
        import traceback
        traceback.print_exc()
        raise

    yield

    # Shutdown
    print("🔄 Shutting down...")
# Build the FastAPI application; model loading/unloading is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    title="Madverse Music API",
    version="1.0.0",
    description="AI-powered music detection API to identify AI-generated vs human-created music",
    # Interactive docs are mounted at the site root, ReDoc at /docs.
    docs_url="/",
    redoc_url="/docs",
)
# API Key Configuration
# NOTE(security): when MADVERSE_API_KEY is unset, the hard-coded demo key below
# is accepted. Set the environment variable in any non-demo deployment.
API_KEY = os.getenv("MADVERSE_API_KEY", "madverse-music-api-key-2024")  # Default key for demo


async def verify_api_key(x_api_key: Annotated[Union[str, None], Header()] = None):
    """Verify the API key supplied in the ``X-API-Key`` header.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header, or ``None`` when
            the header is absent.

    Returns:
        The supplied key when it matches ``API_KEY``.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    if x_api_key is None:
        raise HTTPException(
            status_code=401,
            detail="Missing API key. Please provide a valid X-API-Key header."
        )

    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Compared as bytes because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(x_api_key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(
            status_code=401,
            detail="Invalid API key. Please provide a valid X-API-Key header."
        )

    return x_api_key
class MusicAnalysisRequest(BaseModel):
    # Request payload for the analysis endpoint: the audio file URLs to
    # analyse. Each entry is validated as an HTTP(S) URL by pydantic.
    urls: List[HttpUrl]
def check_api_key_first(request: MusicAnalysisRequest, x_api_key: Annotated[Union[str, None], Header()] = None):
    """Dependency that authenticates the caller and passes the request through.

    Args:
        request: Parsed analysis request body.
        x_api_key: Value of the ``X-API-Key`` request header, or ``None`` when
            the header is absent.

    Returns:
        ``request`` unchanged, once the key has been accepted.

    Raises:
        HTTPException: 401 when the header is missing or does not match
            ``API_KEY``.
    """
    # NOTE(review): `request` is a parameter of this dependency, so the body is
    # presumably parsed/validated before this function runs and a malformed
    # payload may be rejected before the key is checked - confirm that this
    # ordering is acceptable.
    if x_api_key is None:
        raise HTTPException(
            status_code=401,
            detail="Missing API key. Please provide a valid X-API-Key header."
        )

    # Constant-time comparison so response timing does not reveal how much of
    # the key matched. Compared as bytes because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(x_api_key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(
            status_code=401,
            detail="Invalid API key. Please provide a valid X-API-Key header."
        )

    return request
class FileAnalysisResult(BaseModel):
    # Outcome of analysing a single URL. The classification fields are only
    # filled in on success; `error` is only filled in on failure.

    # URL that was analysed
    url: str
    # Whether download and classification both completed
    success: bool
    # "Real" or "Fake"
    classification: Optional[str] = None
    # 0.0 to 1.0
    confidence: Optional[float] = None
    # Raw sigmoid probability
    probability: Optional[float] = None
    # Raw model output
    raw_score: Optional[float] = None
    # Audio duration in seconds
    duration: Optional[float] = None
    # Human-readable summary of the outcome
    message: str
    # Wall-clock seconds spent on this URL
    processing_time: Optional[float] = None
    # Error text when success is False
    error: Optional[str] = None
class MusicAnalysisResponse(BaseModel):
    # Aggregate response for one analysis request: per-file results plus
    # success/failure counts and the total wall-clock time.

    # True when at least one file was analysed successfully
    success: bool
    # Number of URLs in the request
    total_files: int
    successful_analyses: int
    failed_analyses: int
    # One entry per requested URL, in request order
    results: List[FileAnalysisResult]
    # Seconds spent handling the whole request
    total_processing_time: float
    # Human-readable summary
    message: str
class ErrorResponse(BaseModel):
    # Generic error payload: a success flag, an error identifier/text and a
    # human-readable message.
    success: bool
    error: str
    message: str
def cleanup_file(file_path: str):
    """Background task to cleanup temporary files.

    Best-effort: a file that is already gone is ignored, and any other
    filesystem error is reported on stdout instead of being raised.

    Args:
        file_path: Path of the temporary file to delete.
    """
    try:
        # Unlink directly instead of checking existence first: the file can
        # disappear between the check and the delete.
        os.unlink(file_path)
    except FileNotFoundError:
        # Already removed; nothing to do.
        pass
    except OSError as cleanup_error:
        # Only filesystem errors are swallowed (not KeyboardInterrupt or
        # SystemExit, which the previous bare `except` also hid).
        print(f"⚠️ Failed to cleanup {file_path}: {cleanup_error}")
def download_audio(url: str, max_size_mb: int = 100) -> str:
    """Download audio file from URL with size validation.

    The size limit is checked twice: once against the ``Content-Length``
    reported by a HEAD request (when present) and again while streaming,
    because the header may be missing or inaccurate.

    NOTE(security): ``url`` comes from the API caller and is fetched as-is;
    nothing here prevents requests to internal/private addresses.

    Args:
        url: Location of the audio file.
        max_size_mb: Maximum accepted file size in megabytes.

    Returns:
        Path of the temporary ``.wav``-suffixed file holding the download.
        The caller is responsible for deleting it.

    Raises:
        HTTPException: 413 when the file exceeds ``max_size_mb``, 400 when
            the HTTP request fails, 500 for any other error.
    """
    max_size_bytes = max_size_mb * 1024 * 1024
    tmp_path = None
    completed = False

    try:
        print(f"🔽 Downloading audio from: {url}")

        # Check if URL is accessible
        response = requests.head(str(url), timeout=10)
        print(f"📊 Head response status: {response.status_code}")

        # Check content size
        content_length = response.headers.get('Content-Length')
        if content_length:
            size_mb = int(content_length) / (1024 * 1024)
            print(f"📏 File size: {size_mb:.2f}MB")
            if int(content_length) > max_size_bytes:
                raise HTTPException(
                    status_code=413,
                    detail=f"File too large. Maximum size: {max_size_mb}MB"
                )

        # Download file
        print("🔽 Starting download...")
        # The context manager releases the streamed connection even when the
        # download is aborted part-way through.
        with requests.get(str(url), timeout=30, stream=True) as response:
            response.raise_for_status()
            print(f"✅ Download response status: {response.status_code}")

            # Create temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
                tmp_path = tmp_file.name
                downloaded_size = 0
                for chunk in response.iter_content(chunk_size=8192):
                    downloaded_size += len(chunk)
                    if downloaded_size > max_size_bytes:
                        # The partial file is removed in the `finally` below,
                        # after it has been closed.
                        raise HTTPException(
                            status_code=413,
                            detail=f"File too large. Maximum size: {max_size_mb}MB"
                        )
                    tmp_file.write(chunk)

        print(f"💾 Downloaded {downloaded_size} bytes to {tmp_path}")
        completed = True
        return tmp_path

    except HTTPException:
        # Keep the deliberate 413 instead of letting the generic handler
        # below re-wrap it as a 500.
        raise
    except requests.exceptions.RequestException as e:
        error_msg = f"Failed to download audio: {str(e)}"
        print(f"❌ Download error: {error_msg}")
        raise HTTPException(
            status_code=400,
            detail=error_msg
        ) from e
    except Exception as e:
        error_msg = f"Error downloading file: {str(e)}"
        print(f"❌ Unexpected download error: {error_msg}")
        raise HTTPException(
            status_code=500,
            detail=error_msg
        ) from e
    finally:
        # Do not leave a partial download behind on any failure path.
        if tmp_path is not None and not completed:
            try:
                os.unlink(tmp_path)
            except OSError as cleanup_error:
                print(f"⚠️ Failed to cleanup {tmp_path}: {cleanup_error}")
def classify_audio(file_path: str) -> dict:
    """Classify audio file using the AI model.

    The file is read with soundfile, mixed down to mono, resampled to 16 kHz
    when necessary, and passed to the module-level ``model``. The model output
    is treated as a single logit and mapped to a probability with a sigmoid.

    Args:
        file_path: Path of the audio file to analyse.

    Returns:
        A dict with:
            ``classification``: ``"Real"`` (probability < 0.5) or ``"Fake"``.
            ``confidence``: distance of the probability from the 0.5 decision
                boundary, scaled to 0.0-1.0.
            ``probability``: sigmoid of the model output.
            ``raw_score``: the model output itself.
            ``duration``: audio duration in seconds.

    Raises:
        HTTPException: 500 for any failure, including a missing or empty file.
    """
    try:
        print(f"🎵 Loading audio file: {file_path}")

        # Check if file exists
        if not os.path.exists(file_path):
            raise ValueError(f"Audio file not found: {file_path}")

        # Check file size
        file_size = os.path.getsize(file_path)
        print(f"📏 Audio file size: {file_size} bytes")
        if file_size == 0:
            raise ValueError("Audio file is empty")

        # Load audio with soundfile
        print("🔊 Loading audio with soundfile...")
        audio, sr = sf.read(file_path)
        print(f"🎼 Audio loaded: {len(audio)} samples at {sr}Hz, duration: {len(audio)/sr:.2f}s")
        if len(audio) == 0:
            raise ValueError("Audio file contains no audio data")

        # Convert to mono if stereo
        if audio.ndim > 1:
            print("🔀 Converting stereo to mono...")
            audio = np.mean(audio, axis=1)

        # Resample to 16kHz if needed (model requirement)
        target_sr = 16000
        if sr != target_sr:
            print(f"🔄 Resampling from {sr}Hz to {target_sr}Hz...")
            # Calculate the number of samples after resampling
            num_samples = int(len(audio) * target_sr / sr)
            audio = scipy.signal.resample(audio, num_samples)
            sr = target_sr
            print(f"✅ Resampled: {len(audio)} samples at {sr}Hz")

        # Convert to tensor and add batch dimension
        print("🧮 Converting to tensor...")
        audio_tensor = torch.FloatTensor(audio).unsqueeze(0)
        print(f"📊 Tensor shape: {audio_tensor.shape}")

        # Get prediction
        print("🤖 Running model inference...")
        with torch.no_grad():
            output = model(audio_tensor)
        print(f"📈 Model output: {output}")

        # Convert logit to probability using sigmoid
        prob = torch.sigmoid(output).item()
        print(f"📊 Sigmoid probability: {prob}")

        # Classify: prob < 0.5 = Real, prob >= 0.5 = Fake
        classification = "Real" if prob < 0.5 else "Fake"

        # Confidence is the distance from the 0.5 boundary scaled to 0-1, for
        # both classes. The previous "Real" formula, (1 - prob) * 2, is always
        # above 1 for prob < 0.5, so it was capped to 1.0 every time.
        confidence = abs(prob - 0.5) * 2

        result = {
            "classification": classification,
            "confidence": min(confidence, 1.0),  # Cap at 1.0
            "probability": prob,
            "raw_score": output.item(),
            "duration": len(audio) / sr
        }
        print(f"✅ Classification result: {result}")
        return result

    except Exception as e:
        error_msg = f"Error analyzing audio: {str(e)}"
        print(f"❌ Audio analysis error: {error_msg}")
        import traceback
        print(f"🔍 Traceback: {traceback.format_exc()}")
        raise HTTPException(
            status_code=500,
            detail=error_msg
        )
async def process_single_url(url: str) -> FileAnalysisResult:
    """Process a single URL and return result.

    Downloads the audio, classifies it and wraps the outcome in a
    ``FileAnalysisResult``. Failures never propagate: any exception is turned
    into a result with ``success=False``. The temporary download is removed
    before returning in both cases.

    Args:
        url: Location of the audio file.

    Returns:
        The analysis result for ``url``, successful or not.
    """
    start_time = time.time()
    temp_file = None

    try:
        print(f"🚀 Processing URL: {url}")

        # download_audio and classify_audio are blocking (network I/O and
        # model inference). Running them in worker threads keeps the event
        # loop responsive and lets several URLs actually be in flight at once.

        # Download audio file
        temp_file = await asyncio.to_thread(download_audio, url)
        print(f"✅ Download completed: {temp_file}")

        # Classify audio
        result = await asyncio.to_thread(classify_audio, temp_file)
        print(f"✅ Classification completed: {result}")

        # Calculate processing time
        processing_time = time.time() - start_time

        # Prepare response
        emoji = "🎤" if result["classification"] == "Real" else "🤖"
        message = f'{emoji} Detected as {result["classification"].lower()} music'

        return FileAnalysisResult(
            url=str(url),
            success=True,
            classification=result["classification"],
            confidence=result["confidence"],
            probability=result["probability"],
            raw_score=result["raw_score"],
            duration=result["duration"],
            message=message,
            processing_time=processing_time
        )

    except Exception as e:
        processing_time = time.time() - start_time
        error_msg = str(e)
        print(f"❌ Processing failed for {url}: {error_msg}")
        import traceback
        print(f"🔍 Full traceback: {traceback.format_exc()}")

        return FileAnalysisResult(
            url=str(url),
            success=False,
            message=f"❌ Failed to process: {error_msg}",
            processing_time=processing_time,
            error=error_msg
        )

    finally:
        # Remove the temporary download (done inline, before returning)
        if temp_file:
            try:
                print(f"🧹 Cleaning up temporary file: {temp_file}")
                os.unlink(temp_file)
            except Exception as cleanup_error:
                print(f"⚠️ Failed to cleanup {temp_file}: {cleanup_error}")
async def analyze_music(
    request: MusicAnalysisRequest = Depends(check_api_key_first)
):
    """
    Analyze music from URL(s) to detect if it's AI-generated or human-created
    - **urls**: Array of direct URLs to audio files (MP3, WAV, FLAC, M4A, OGG)
    - Returns classification results for each file
    - Processes files concurrently for better performance when multiple URLs provided
    """
    # Raises HTTPException 503 when the model is not loaded, 400 when the
    # request holds no URLs or more than 50, and 500 on unexpected errors
    # while aggregating results.
    start_time = time.time()

    # Identity check rather than truthiness: the loaded model is an arbitrary
    # object, and only "never loaded" (None) should produce a 503.
    if model is None:
        raise HTTPException(
            status_code=503,
            detail="Model not loaded. Please try again later."
        )

    if len(request.urls) > 50:  # Limit processing
        raise HTTPException(
            status_code=400,
            detail="Too many URLs. Maximum 50 files per request."
        )

    if len(request.urls) == 0:
        raise HTTPException(
            status_code=400,
            detail="At least one URL is required."
        )

    try:
        # Process all URLs concurrently with limited concurrency
        semaphore = asyncio.Semaphore(5)  # Limit to 5 concurrent downloads

        async def process_with_semaphore(url):
            async with semaphore:
                return await process_single_url(str(url))

        # Create tasks for all URLs
        tasks = [process_with_semaphore(url) for url in request.urls]

        # Wait for all tasks to complete; exceptions come back as values so
        # one failing URL cannot abort the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results and handle any exceptions
        processed_results = []
        successful_count = 0
        failed_count = 0

        for url, result in zip(request.urls, results):
            if isinstance(result, Exception):
                # Handle exception case
                processed_results.append(FileAnalysisResult(
                    url=str(url),
                    success=False,
                    message=f"❌ Processing failed: {str(result)}",
                    error=str(result)
                ))
                failed_count += 1
            else:
                processed_results.append(result)
                if result.success:
                    successful_count += 1
                else:
                    failed_count += 1

        # Calculate total processing time
        total_processing_time = time.time() - start_time

        # Prepare summary message
        total_files = len(request.urls)
        if total_files == 1:
            # Single file: reuse that file's own message, success or failure
            message = processed_results[0].message
        elif successful_count == total_files:
            message = f"✅ Successfully analyzed all {total_files} files"
        elif successful_count > 0:
            message = f"⚠️ Analyzed {successful_count}/{total_files} files successfully"
        else:
            message = "❌ Failed to analyze any files"

        return MusicAnalysisResponse(
            success=successful_count > 0,
            total_files=total_files,
            successful_analyses=successful_count,
            failed_analyses=failed_count,
            results=processed_results,
            total_processing_time=total_processing_time,
            message=message
        )

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error during processing: {str(e)}"
        )
async def health_check():
    """Report service status and whether the model has been loaded."""
    model_ready = model is not None
    payload = {"status": "healthy"}
    payload["model_loaded"] = model_ready
    payload["service"] = "Madverse Music API"
    return payload
async def get_info():
    """Get API information.

    Returns:
        A static dict describing the service: name, version, model, reported
        accuracy figures, supported formats, limits, authentication scheme
        and a curl usage example.
    """
    return {
        "name": "Madverse Music API",
        "version": "1.0.0",
        "description": "AI-powered music detection to identify AI-generated vs human-created music",
        "model": "SpecTTTra-α (120s)",
        "accuracy": {
            "f1_score": 0.97,
            "sensitivity": 0.96,
            "specificity": 0.99
        },
        "supported_formats": ["MP3", "WAV", "FLAC", "M4A", "OGG"],
        "max_file_size": "100MB",
        "max_duration": "120 seconds",
        "authentication": {
            "required": True,
            "type": "API Key",
            "header": "X-API-Key",
            "example": "X-API-Key: your-api-key-here"
        },
        "usage": {
            # The request body must match MusicAnalysisRequest: a "urls" list,
            # not a single "url" string.
            "curl_example": "curl -X POST 'http://localhost:8000/analyze' -H 'X-API-Key: your-api-key' -H 'Content-Type: application/json' -d '{\"urls\":[\"https://example.com/song.mp3\"]}'"
        }
    }
def find_available_port(start_port: int = 8000, max_attempts: int = 10) -> int:
    """Find an available port starting from start_port.

    Candidate ports ``start_port`` .. ``start_port + max_attempts - 1`` are
    probed in order by binding a TCP socket on all interfaces. The probe
    socket is closed before returning, so another process can still take the
    port before the caller binds it.

    Args:
        start_port: First port to try.
        max_attempts: Number of consecutive ports to try.

    Returns:
        The first port that could be bound.

    Raises:
        RuntimeError: When no port in the range could be bound.
    """
    import random
    import time

    # Add some randomization to avoid race conditions
    time.sleep(random.uniform(0.1, 0.5))

    for port in range(start_port, start_port + max_attempts):
        # bind() raises OverflowError (not OSError) for numbers outside
        # 0-65535, which would escape the handler below; skip those so an
        # exhausted range always ends in the RuntimeError at the bottom.
        if not 0 <= port <= 65535:
            print(f"⚠️ Skipping port {port}: outside the valid range 0-65535")
            continue

        try:
            # Try to bind to the port with proper error handling
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(('0.0.0.0', port))
                s.listen(1)
                print(f"✅ Port {port} is available")
                return port
        except OSError as e:
            print(f"❌ Port {port} is busy: {e}")
            continue

    # If no port found, raise an exception
    raise RuntimeError(f"No available port found in range {start_port}-{start_port + max_attempts - 1}")
if __name__ == "__main__":
    # Script entry point: choose a port (Hugging Face Spaces vs. local
    # development) and start uvicorn, printing guidance on failure.
    try:
        # Check if we're in a Hugging Face environment
        is_hf_space = os.getenv('SPACE_ID') is not None
        hf_port = os.getenv('PORT')  # HF Spaces sets this

        if not is_hf_space:
            # Local development: probe for a free port
            port = find_available_port(8000, 10)
            print(f"🚀 Starting server on port {port}")
        elif hf_port:
            # HF Spaces assigned a port explicitly
            port = int(hf_port)
            print(f"🤗 Running in Hugging Face Spaces on port {port}")
        else:
            print("🤗 Running in Hugging Face Spaces environment")
            # Standard HF Spaces port
            port = 7860

        if is_hf_space:
            # No retry logic on HF Spaces, as it might cause issues
            uvicorn.run(app, host="0.0.0.0", port=port)
        else:
            # Local development: retry on a different port if binding fails.
            # NOTE(review): uvicorn may terminate via SystemExit on a bind
            # failure instead of raising OSError - confirm this retry path is
            # actually reachable.
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    uvicorn.run(app, host="0.0.0.0", port=port)
                    break  # Server ran and returned normally
                except OSError as bind_error:
                    is_last_attempt = attempt >= max_retries - 1
                    if is_last_attempt or "Address already in use" not in str(bind_error):
                        raise
                    print(f"⚠️ Port {port} became busy, trying next port...")
                    port = find_available_port(port + 1, 10)
                    print(f"🔄 Retrying on port {port}")

    except RuntimeError as e:
        print(f"❌ {e}")
        for hint in (
            "💡 Suggestions:",
            " 1. Wait a moment and try again (another instance might be shutting down)",
            " 2. Manually specify a different port:",
            " uvicorn app:app --host 0.0.0.0 --port 8001",
            " 3. Check for running processes: ps aux | grep python",
        ):
            print(hint)
    except KeyboardInterrupt:
        print("\n🛑 Server stopped by user")
    except Exception as e:
        print(f"❌ Failed to start server: {e}")
        import traceback
        traceback.print_exc()