import datetime
import os
import subprocess
import tempfile
import time

import gradio as gr
import requests
from loguru import logger

# Load API keys from environment variables
API_URL = os.getenv("API_URL")
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
SIEVE_API_URL = "https://mango.sievedata.com/v2"

# Headers sent with every transcription upload.
# NOTE(review): the Content-Type is fixed to audio/flac even for the MP3 files
# produced by download_youtube_audio -- confirm the endpoint detects the real
# container format itself.
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac"
}

# (connect, read) timeouts in seconds for outbound HTTP calls. Without a
# timeout, requests waits forever on a stalled server and blocks the worker.
SIEVE_REQUEST_TIMEOUT = (10, 60)
AUDIO_DOWNLOAD_TIMEOUT = (10, 120)
TRANSCRIBE_REQUEST_TIMEOUT = (10, 600)

# Polling cadence and overall budget (seconds) for a Sieve download job.
SIEVE_POLL_INTERVAL = 2
SIEVE_MAX_WAIT = 900

# Number of bytes written per iteration while streaming the audio to disk.
DOWNLOAD_CHUNK_SIZE = 64 * 1024


def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert. Negative values are
            clamped to zero because SRT cannot express negative offsets.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours (at least two digits; not wrapped at 24)
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999)

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
        >>> format_time(90000)
        '25:00:00,000'
    """
    td = datetime.timedelta(seconds=max(0.0, float(seconds)))
    # timedelta normalises into days/seconds/microseconds; td.seconds alone
    # would silently wrap at 24 hours, so fold the days back in.
    whole_seconds = td.days * 86400 + td.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"


def generate_srt(chunks):
    """Generate SRT format subtitles from transcription chunks.

    Args:
        chunks (list): List of dictionaries containing transcription chunks.
            Each chunk must have:
            - "timestamp": List of [start_time, end_time] in seconds
            - "text": The transcribed text for that time segment (a missing
              or None value is rendered as an empty cue)

    Returns:
        str: SRT formatted subtitles string with format:
            ```
            1
            HH:MM:SS,mmm --> HH:MM:SS,mmm
            Text content

            2
            HH:MM:SS,mmm --> HH:MM:SS,mmm
            Text content

            ...
            ```

    Example:
        >>> chunks = [
        ...     {"timestamp": [0.0, 1.5], "text": "Hello"},
        ...     {"timestamp": [1.5, 3.0], "text": "World"}
        ... ]
        >>> generate_srt(chunks)
        '1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
    """
    cues = []
    # SRT cue numbers are 1-based.
    for index, chunk in enumerate(chunks, 1):
        start_time = format_time(chunk["timestamp"][0])
        end_time = format_time(chunk["timestamp"][1])
        text = (chunk.get("text") or "").strip()
        cues.append(f"{index}\n{start_time} --> {end_time}\n{text}\n\n")
    return "".join(cues)


def save_srt_to_file(srt_content):
    """Save SRT content to a temporary file.

    Args:
        srt_content (str): The SRT formatted subtitles content to save.

    Returns:
        str or None: Path to the temporary file if content was saved,
            None if srt_content was empty.

    Note:
        The temporary file is created with delete=False to allow it to be
        used after the function returns. The file should be deleted by the
        caller when no longer needed.
    """
    if not srt_content:
        return None

    # The context manager closes the handle even if the write fails.
    with tempfile.NamedTemporaryFile(suffix='.srt', delete=False) as temp_file:
        temp_file.write(srt_content.encode('utf-8'))
    return temp_file.name


def _remove_temp_file(path):
    """Best-effort removal of a temporary file; failures are only logged."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError as e:
        logger.warning(f"Failed to delete temporary file: {str(e)}")


# Check if ffmpeg is installed
def check_ffmpeg():
    """Verify that the ``ffmpeg`` executable can be run.

    Raises:
        gr.Error: If ``ffmpeg -version`` cannot be started or exits with a
            non-zero status.
    """
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
        logger.info("ffmpeg check passed successfully")
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        logger.error(f"ffmpeg check failed: {str(e)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.") from e


# Initialize ffmpeg check
check_ffmpeg()


def download_youtube_audio(url, poll_interval=SIEVE_POLL_INTERVAL, max_wait=SIEVE_MAX_WAIT):
    """Download audio from YouTube using Sieve API.

    A download job is pushed to Sieve and polled until it reports
    "completed" or "failed"; on completion the audio is streamed into a
    temporary ``.mp3`` file.

    Args:
        url (str): YouTube video URL
        poll_interval (float): Seconds to sleep between job status checks.
        max_wait (float): Maximum number of seconds to wait for the job to
            finish before giving up.

    Returns:
        str: Path to downloaded audio file. The caller owns the file and
            should delete it when done.

    Raises:
        gr.Error: If download fails, times out, or API key is not set
    """
    if not SIEVE_API_KEY:
        raise gr.Error("SIEVE_API_KEY environment variable is not set")

    output_path = None
    try:
        # Reserve a destination path; delete=False keeps it after close().
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
            output_path = temp_file.name

        # Prepare the request to Sieve API
        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",
                "audio_format": "mp3",
                "include_metadata": False,
                "include_subtitles": False
            }
        }

        # Send request to Sieve API
        response = requests.post(
            f"{SIEVE_API_URL}/push",
            headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
            json=payload,
            timeout=SIEVE_REQUEST_TIMEOUT,
        )
        response.raise_for_status()

        job_id = response.json().get("id")
        if not job_id:
            raise gr.Error("Failed to get job ID from Sieve API")

        # Poll for job completion, bounded so a job stuck in any other
        # status cannot block this request forever.
        deadline = time.monotonic() + max_wait
        while True:
            job_response = requests.get(
                f"{SIEVE_API_URL}/jobs/{job_id}",
                headers={"X-API-Key": SIEVE_API_KEY},
                timeout=SIEVE_REQUEST_TIMEOUT,
            )
            job_response.raise_for_status()
            job_data = job_response.json()
            status = job_data.get("status")

            if status == "completed":
                audio_url = job_data.get("output_0", {}).get("url")
                if not audio_url:
                    raise gr.Error("No audio URL in job response")

                # Stream to disk instead of buffering the whole file in memory.
                with requests.get(audio_url, stream=True, timeout=AUDIO_DOWNLOAD_TIMEOUT) as audio_response:
                    audio_response.raise_for_status()
                    with open(output_path, "wb") as f:
                        for block in audio_response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                            if block:
                                f.write(block)
                return output_path

            if status == "failed":
                raise gr.Error(f"Job failed: {job_data.get('error', 'Unknown error')}")

            if time.monotonic() >= deadline:
                raise gr.Error(f"Timed out after {max_wait} seconds waiting for the YouTube download job")

            # Wait before polling again
            time.sleep(poll_interval)

    except gr.Error as e:
        # Already a user-facing error: clean up and pass it through unchanged
        # rather than stacking another "Failed to ..." prefix on top.
        logger.error(f"Error downloading YouTube audio: {str(e)}")
        _remove_temp_file(output_path)
        raise
    except Exception as e:
        logger.exception(f"Error downloading YouTube audio: {str(e)}")
        _remove_temp_file(output_path)
        raise gr.Error(f"Failed to download YouTube audio: {str(e)}") from e


def transcribe_youtube(url, return_timestamps, generate_subs):
    """Transcribe audio from YouTube video.

    Args:
        url (str): YouTube video URL
        return_timestamps (bool): Whether to include timestamps in output
        generate_subs (bool): Whether to generate SRT subtitles

    Returns:
        tuple: (formatted_result, srt_file, correction_text)

    Raises:
        gr.Error: If no URL is given or the download/transcription fails.
    """
    if not url or not url.strip():
        logger.warning("No YouTube URL submitted")
        raise gr.Error("No YouTube URL submitted! Please paste a YouTube URL before submitting your request.")

    try:
        # Download audio from YouTube
        audio_path = download_youtube_audio(url.strip())
        try:
            # Transcribe the downloaded audio
            return transcribe(audio_path, return_timestamps, generate_subs)
        finally:
            # Clean up the temporary file whether or not transcription worked
            _remove_temp_file(audio_path)
    except gr.Error:
        # Raised by the helpers with a complete message already.
        raise
    except Exception as e:
        logger.exception(f"Error in YouTube transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") from e


def _extract_timed_chunks(raw_chunks):
    """Return the chunks that carry both a start and an end timestamp.

    Malformed entries are logged and skipped so that one bad chunk does not
    discard the whole transcription.
    """
    timed = []
    for i, chunk in enumerate(raw_chunks):
        logger.debug(f"Processing chunk {i}: {chunk}")
        try:
            timestamp = chunk.get("timestamp", [None, None])
            start_time, end_time = timestamp[0], timestamp[1]
            text = chunk.get("text", "").strip()
        except (AttributeError, IndexError, KeyError, TypeError) as chunk_error:
            logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
            continue

        if start_time is None or end_time is None:
            logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
            continue

        timed.append({"text": text, "timestamp": [start_time, end_time]})
    return timed


def transcribe(inputs, return_timestamps, generate_subs):
    """Transcribe an audio file via the speech-to-text endpoint in API_URL.

    The file is uploaded as the raw request body and the JSON response is
    expected to contain "text" and, optionally, "chunks" with timestamps.

    Args:
        inputs (str): Path to audio file to transcribe.
        return_timestamps (bool): Whether to include timestamped chunks in
            the returned transcription.
        generate_subs (bool): Whether to generate SRT subtitles. Subtitles
            are built from the response's chunks whenever they are present,
            independently of return_timestamps.

    Returns:
        tuple: (formatted_result, srt_file, correction_text)
            - formatted_result (dict): Transcription results
            - srt_file (str): Path to SRT file if generated, None otherwise
            - correction_text (str): Empty string for corrections

    Raises:
        gr.Error: If no audio file is provided, API_URL is not configured,
            or transcription fails.
    """
    if inputs is None:
        logger.warning("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    if not API_URL:
        logger.error("API_URL environment variable is not set")
        raise gr.Error("API_URL environment variable is not set")

    try:
        logger.info(f"Processing audio file: {inputs}")

        # Read the audio file
        with open(inputs, "rb") as f:
            data = f.read()

        # Send request to API
        response = requests.post(
            API_URL,
            headers=headers,
            data=data,
            timeout=TRANSCRIBE_REQUEST_TIMEOUT,
        )
        response.raise_for_status()  # Raise an exception for bad status codes
        result = response.json()

        logger.debug(f"API response: {result}")

        # Format response as JSON
        formatted_result = {
            "text": result.get("text", "")
        }

        # Chunks feed both the timestamped output and the subtitles, so parse
        # them if either was requested.
        chunks = []
        if (return_timestamps or generate_subs) and "chunks" in result:
            logger.info(f"Processing {len(result['chunks'])} chunks")
            chunks = _extract_timed_chunks(result["chunks"])
            logger.info(f"Successfully processed transcription with {len(chunks)} chunks")

        if return_timestamps and "chunks" in result:
            formatted_result["chunks"] = chunks

        # Generate subtitles if requested
        srt_file = None
        if generate_subs:
            if chunks:
                logger.info("Generating SRT subtitles")
                srt_content = generate_srt(chunks)
                srt_file = save_srt_to_file(srt_content)
                logger.info("SRT subtitles generated successfully")
            else:
                logger.warning("Subtitles requested but the response has no usable timestamped chunks")

        return formatted_result, srt_file, ""  # Return empty string for correction textbox

    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}") from e
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}") from e


demo = gr.Blocks(theme=gr.themes.Ocean())

# Define interfaces first
_APP_TITLE = "Tajik Speech Transcription"


def _youtube_tab_fn(url, return_timestamps, generate_subs):
    """Run transcribe_youtube and keep only the values the tab displays.

    transcribe_youtube returns (result, srt_file, correction_text), but the
    tab only has two output components, so the third value is dropped here.
    """
    return transcribe_youtube(url, return_timestamps, generate_subs)[:2]


def _audio_tab_fn(inputs, return_timestamps, generate_subs):
    """Run transcribe and keep only the values the tab displays.

    transcribe returns (result, srt_file, correction_text), but the tabs only
    have two output components, so the third value is dropped here.
    """
    return transcribe(inputs, return_timestamps, generate_subs)[:2]


def _option_checkboxes():
    """Build a fresh pair of option checkboxes (components cannot be shared)."""
    return [
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ]


def _result_components():
    """Build a fresh pair of output components: transcription JSON and SRT file."""
    return [
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
    ]


youtube_transcribe = gr.Interface(
    fn=_youtube_tab_fn,
    inputs=[
        gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
        *_option_checkboxes(),
    ],
    outputs=_result_components(),
    title=_APP_TITLE,
    description=(
        "Transcribe Tajik language audio from YouTube videos. "
        "Paste a YouTube URL and get accurate transcription with optional timestamps "
        "and subtitles."
    )
)

# NOTE(review): the description below mentions file uploads, but this tab only
# enables the microphone source -- confirm which one is intended.
mf_transcribe = gr.Interface(
    fn=_audio_tab_fn,
    inputs=[
        gr.Audio(sources="microphone", type="filepath"),
        *_option_checkboxes(),
    ],
    outputs=_result_components(),
    title=_APP_TITLE,
    description=(
        "Transcribe Tajik language audio from microphone or file upload. "
        "Perfect for transcribing Tajik podcasts, interviews, and conversations. "
        "Supports both microphone recording and file uploads."
    )
)

file_transcribe = gr.Interface(
    fn=_audio_tab_fn,
    inputs=[
        gr.Audio(sources="upload", type="filepath", label="Audio file"),
        *_option_checkboxes(),
    ],
    outputs=_result_components(),
    title=_APP_TITLE,
    description=(
        "Transcribe Tajik language audio files. "
        "Upload your audio file and get accurate transcription with optional timestamps "
        "and subtitles. Supports various audio formats."
    )
)

# Then set up the demo with the interfaces
with demo:
    gr.TabbedInterface(
        [youtube_transcribe, file_transcribe, mf_transcribe],
        ["YouTube", "Audio file", "Microphone"]
    )

logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)