# Hugging Face Spaces page header (status: Paused) — extraction artifact, not code.
| import gradio as gr | |
| import subprocess | |
| import datetime | |
| import tempfile | |
| import requests | |
| from loguru import logger | |
# Dedicated Hugging Face Inference Endpoint serving the transcription model.
API_URL = "https://skdpcqcdd929o4k3.us-east-1.aws.endpoints.huggingface.cloud"
# Request headers: the endpoint returns JSON and is told to expect FLAC audio.
# NOTE(review): uploaded files are POSTed as-is, so "audio/flac" assumes the
# user supplies FLAC (or the endpoint ignores the Content-Type) — confirm.
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac"
}
def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert. Durations of a day or
            more roll into the hours field (e.g. 90000 s -> "25:00:00,000").

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours (zero-padded, may exceed 99 for very long audio)
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999)

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
    """
    td = datetime.timedelta(seconds=float(seconds))
    # Use total_seconds() rather than td.seconds: timedelta splits into
    # days + seconds, so td.seconds alone silently drops whole days and
    # any duration >= 24h would format with the wrong hour count.
    total_seconds = int(td.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(chunks):
    """Build an SRT subtitle document from transcription chunks.

    Args:
        chunks (list): Dicts each carrying a "timestamp" pair
            [start_seconds, end_seconds] and a "text" segment.

    Returns:
        str: Concatenated SRT entries, one per chunk, in the form
            "index\\nstart --> end\\ntext\\n\\n".

    Example:
        >>> chunks = [
        ...     {"timestamp": [0.0, 1.5], "text": "Hello"},
        ...     {"timestamp": [1.5, 3.0], "text": "World"}
        ... ]
        >>> generate_srt(chunks)
        '1\\n00:00:00,000 --> 00:00:01,500\\nHello\\n\\n2\\n00:00:01,500 --> 00:00:03,000\\nWorld\\n\\n'
    """
    entries = []
    index = 1
    for chunk in chunks:
        begin, finish = chunk["timestamp"][0], chunk["timestamp"][1]
        caption = chunk.get("text", "").strip()
        entries.append(
            f"{index}\n{format_time(begin)} --> {format_time(finish)}\n{caption}\n\n"
        )
        index += 1
    return "".join(entries)
def save_srt_to_file(srt_content):
    """Write SRT content to a temporary .srt file.

    Args:
        srt_content (str): The SRT formatted subtitles content to save.

    Returns:
        str or None: Path to the temporary file if content was saved,
        None if srt_content was empty.

    Note:
        The file is created with delete=False so it survives after this
        function returns; the caller owns cleanup of the file.
    """
    if not srt_content:
        return None
    # Context manager closes the handle; delete=False keeps the file on disk.
    with tempfile.NamedTemporaryFile(suffix='.srt', delete=False) as handle:
        handle.write(srt_content.encode('utf-8'))
    return handle.name
# Verify the ffmpeg binary is available before the app starts serving.
def check_ffmpeg():
    """Probe `ffmpeg -version`; raise gr.Error if it is missing or broken."""
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as exc:
        logger.error(f"ffmpeg check failed: {str(exc)}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
    logger.info("ffmpeg check passed successfully")


# Fail fast at import time when ffmpeg is unavailable.
check_ffmpeg()
def _parse_chunks(raw_chunks):
    """Validate API chunks and normalize them to {"text", "timestamp"} dicts.

    Chunks with a missing/None start or end timestamp are logged and skipped;
    a chunk that raises while being read is logged and skipped as well.
    """
    chunks = []
    for i, chunk in enumerate(raw_chunks):
        logger.debug(f"Processing chunk {i}: {chunk}")
        try:
            # Single lookup instead of two redundant chunk.get() calls.
            timestamp = chunk.get("timestamp", [None, None])
            start_time, end_time = timestamp[0], timestamp[1]
            text = chunk.get("text", "").strip()
            if start_time is not None and end_time is not None:
                chunks.append({
                    "text": text,
                    "timestamp": [start_time, end_time]
                })
            else:
                logger.warning(f"Invalid timestamp in chunk {i}: {chunk}")
        except Exception as chunk_error:
            logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
            continue
    return chunks


def transcribe(inputs, return_timestamps, generate_subs):
    """Transcribe audio input using Whisper model via Hugging Face Inference API.

    Args:
        inputs (str): Path to audio file to transcribe.
        return_timestamps (bool): Whether to include timestamps in output.
        generate_subs (bool): Whether to generate SRT subtitles.

    Returns:
        tuple: (formatted_result, srt_file, correction_text)
            - formatted_result (dict): Transcription results
            - srt_file (str): Path to SRT file if generated, None otherwise
            - correction_text (str): Empty string for corrections

    Raises:
        gr.Error: If no audio file is provided or transcription fails.
    """
    if inputs is None:
        logger.warning("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    try:
        logger.info(f"Processing audio file: {inputs}")
        # Ship the raw audio bytes straight to the inference endpoint.
        with open(inputs, "rb") as f:
            data = f.read()
        response = requests.post(API_URL, headers=headers, data=data)
        response.raise_for_status()  # Raise an exception for bad status codes
        result = response.json()
        logger.debug(f"API response: {result}")
        formatted_result = {
            "text": result.get("text", "")
        }
        chunks = []
        if return_timestamps and "chunks" in result:
            logger.info(f"Processing {len(result['chunks'])} chunks")
            chunks = _parse_chunks(result["chunks"])
            formatted_result["chunks"] = chunks
            logger.info(f"Successfully processed transcription with {len(chunks)} chunks")
        # Generate subtitles if requested
        srt_file = None
        if generate_subs and chunks:
            logger.info("Generating SRT subtitles")
            srt_file = save_srt_to_file(generate_srt(chunks))
            logger.info("SRT subtitles generated successfully")
        return formatted_result, srt_file, ""  # Return empty string for correction textbox
    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}")
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}")
demo = gr.Blocks(theme=gr.themes.Ocean())

# Define interfaces first
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="microphone", type="filepath"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        # transcribe() returns a 3-tuple; without a third component Gradio
        # raises a "too many output values" error at runtime.
        gr.Textbox(label="Correction", visible=False),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio from microphone or file upload. "
        "Perfect for transcribing Tajik podcasts, interviews, and conversations. "
        "Supports both microphone recording and file uploads."
    )
)
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="upload", type="filepath", label="Audio file"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
    ],
    outputs=[
        gr.JSON(label="Transcription", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        # transcribe() returns a 3-tuple; without a third component Gradio
        # raises a "too many output values" error at runtime.
        gr.Textbox(label="Correction", visible=False),
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio files. "
        "Upload your audio file and get accurate transcription with optional timestamps "
        "and subtitles. Supports various audio formats."
    )
)
# Then set up the demo with the interfaces
with demo:
    # Two tabs sharing the same transcribe() handler: file upload and microphone.
    gr.TabbedInterface([file_transcribe, mf_transcribe], ["Audio file", "Microphone"])

logger.info("Starting Gradio interface")
# queue() enables request queuing; ssr_mode=False disables server-side rendering.
demo.queue().launch(ssr_mode=False)