# app.py
import os
import subprocess
import tempfile

import gradio as gr
from dotenv import load_dotenv
import whisper
import pvfalcon

# ───────────────────────────────────────────
# 1.  ENVIRONMENT
# ───────────────────────────────────────────
load_dotenv()
FALCON_ACCESS_KEY = os.getenv("FALCON_ACCESS_KEY")
if not FALCON_ACCESS_KEY:
    raise RuntimeError(
        "Set FALCON_ACCESS_KEY in your environment or .env file "
        "(get one free at https://console.picovoice.ai)."
    )
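
# A minimal .env file next to app.py would look like this
# (the key value below is a placeholder, not a real key):
#   FALCON_ACCESS_KEY=<your-picovoice-access-key>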

# ───────────────────────────────────────────
# 2.  MODELS
# ───────────────────────────────────────────
whisper_model = whisper.load_model("base")          # CPU-friendly
falcon = pvfalcon.create(access_key=FALCON_ACCESS_KEY)
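
# Note: "base" keeps CPU latency reasonable; if accuracy matters more than
# speed, "small" or "medium" are drop-in replacements for the name above.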

# ───────────────────────────────────────────
# 3.  CORE LOGIC
# ───────────────────────────────────────────
def process_video(file, language="Auto"):
    # 3.1  Choose language for Whisper ("Auto" lets Whisper detect it)
    lang_code = None if language == "Auto" else language.lower()

    # 3.2  Extract mono 16-kHz, 16-bit PCM WAV with ffmpeg
    #      (Falcon expects 16 kHz mono; Whisper resamples to 16 kHz anyway)
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav:
        wav_path = wav.name
    subprocess.run(
        # `file` is already a path string because gr.File uses type="filepath"
        ["ffmpeg", "-y", "-i", file,
         "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", wav_path],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )
    if not os.path.getsize(wav_path):
        os.remove(wav_path)
        return "Audio extraction failed.", ""

    # 3.3  Speaker diarization
    segments = falcon.process_file(wav_path)         # list[pvfalcon.Segment]
    diarized_map, label_map, counter = [], {}, 1
    for seg in segments:
        tag = seg.speaker_tag
        if tag not in label_map:
            label_map[tag] = f"Speaker {counter}"
            counter += 1
        diarized_map.append(
            dict(start=seg.start_sec, end=seg.end_sec, speaker=label_map[tag])
        )

    # 3.4  Transcription (Whisper)
    res = whisper_model.transcribe(wav_path, language=lang_code)
    paragraph_transcript = res["text"]                       # plain paragraph

    # 3.5  Merge speakers with transcription
    speaker_lines = []
    for s in res.get("segments", []):
        speaker = next(
            (m["speaker"] for m in diarized_map if m["start"] <= s["start"] <= m["end"]),
            "Unknown"
        )
        speaker_lines.append(f"{speaker}: {s['text']}")
    speaker_transcript = "\n".join(speaker_lines)
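
    # Example speaker_transcript (illustrative, not real output):
    #   Speaker 1: Hello and welcome.
    #   Speaker 2: Thanks for having me.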

    # 3.6  Clean up and return (speaker-wise first, plain paragraph second)
    os.remove(wav_path)
    return speaker_transcript, paragraph_transcript

# ───────────────────────────────────────────
# 4.  GRADIO UI
# ───────────────────────────────────────────
demo = gr.Interface(
    fn=process_video,
    inputs=[
        gr.File(label="Upload Video", type="filepath"),
        gr.Dropdown(["Auto", "English", "Hindi", "Urdu"], value="Auto", label="Language")
    ],
    outputs=[
        gr.Textbox(label="Speaker-wise Transcript", show_copy_button=True),
        gr.Textbox(label=" Transcription", show_copy_button=True)
    ],
    title="Transcription + Speaker Segmentation",
    description="Whisper + Picovoice Falcon running fully on CPU."
)

if __name__ == "__main__":
    demo.launch()
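
# To run locally (assumes ffmpeg is on PATH):
#   pip install gradio openai-whisper pvfalcon python-dotenv
#   python app.py
# Gradio prints a local URL (http://127.0.0.1:7860 by default).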