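"""Audio utilities: microphone recording, voice-activity detection, interruptible playback, and Whisper transcription."""
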
import pyaudio
import numpy as np
import torch
from torch.nn.functional import pad
import time
from queue import Queue
import sounddevice as sd
from .config import settings

# Audio stream constants; FORMAT is fixed to 32-bit float samples, the remaining values come from settings.
CHUNK = settings.CHUNK
FORMAT = pyaudio.paFloat32
CHANNELS = settings.CHANNELS
RATE = settings.RATE
SILENCE_THRESHOLD = settings.SILENCE_THRESHOLD
SPEECH_CHECK_THRESHOLD = settings.SPEECH_CHECK_THRESHOLD
MAX_SILENCE_DURATION = settings.MAX_SILENCE_DURATION


def init_vad_pipeline(hf_token):
"""Initializes the Voice Activity Detection pipeline.
Args:
hf_token (str): Hugging Face API token.
Returns:
pyannote.audio.pipelines.VoiceActivityDetection: VAD pipeline.
"""
from pyannote.audio import Model
from pyannote.audio.pipelines import VoiceActivityDetection
model = Model.from_pretrained(settings.VAD_MODEL, use_auth_token=hf_token)
pipeline = VoiceActivityDetection(segmentation=model)
HYPER_PARAMETERS = {
"min_duration_on": settings.VAD_MIN_DURATION_ON,
"min_duration_off": settings.VAD_MIN_DURATION_OFF,
}
pipeline.instantiate(HYPER_PARAMETERS)
return pipeline


def detect_speech_segments(pipeline, audio_data, sample_rate=None):
"""Detects speech segments in audio using pyannote VAD.
Args:
pipeline (pyannote.audio.pipelines.VoiceActivityDetection): VAD pipeline.
audio_data (np.ndarray or torch.Tensor): Audio data.
sample_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.
Returns:
torch.Tensor or None: Concatenated speech segments as a torch tensor, or None if no speech is detected.
"""
if sample_rate is None:
sample_rate = settings.RATE
    # Ensure a (channel, time) layout and a torch tensor, as pyannote expects.
    if len(audio_data.shape) == 1:
        audio_data = audio_data.reshape(1, -1)
    if not isinstance(audio_data, torch.Tensor):
        audio_data = torch.from_numpy(audio_data)
    # Pad clips shorter than one second to a minimum length before running VAD.
    if audio_data.shape[1] < sample_rate:
        padding_size = sample_rate - audio_data.shape[1]
        audio_data = pad(audio_data, (0, padding_size))
    vad = pipeline({"waveform": audio_data, "sample_rate": sample_rate})
    # Convert each detected speech region from timestamps to sample indices and
    # collect the corresponding slices of the waveform.
    speech_segments = []
    for speech in vad.get_timeline().support():
        start_sample = int(speech.start * sample_rate)
        end_sample = int(speech.end * sample_rate)
        if start_sample < audio_data.shape[1]:
            end_sample = min(end_sample, audio_data.shape[1])
            segment = audio_data[0, start_sample:end_sample]
            speech_segments.append(segment)
if speech_segments:
return torch.cat(speech_segments)
return None


def record_audio(duration=None):
"""Records audio for a specified duration.
Args:
duration (int, optional): Recording duration in seconds. Defaults to settings.RECORD_DURATION.
Returns:
np.ndarray: Recorded audio data as a numpy array.
"""
if duration is None:
duration = settings.RECORD_DURATION
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    print("\nRecording...")
    frames = []
    # Read fixed-size chunks until the requested duration has elapsed.
    for _ in range(int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK, exception_on_overflow=False)
        frames.append(np.frombuffer(data, dtype=np.float32))
print("Done recording")
stream.stop_stream()
stream.close()
p.terminate()
audio_data = np.concatenate(frames, axis=0)
return audio_data


def record_continuous_audio():
    """Monitors the microphone and records a single speech segment, stopping after sustained silence or Ctrl+C.
Returns:
np.ndarray or None: Recorded audio data as a numpy array, or None if no speech is detected.
"""
p = pyaudio.PyAudio()
stream = p.open(
format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
)
print("\nListening... (Press Ctrl+C to stop)")
    frames = []
    # Rolling pre-roll buffer of roughly 0.5 s so the onset of speech is not clipped.
    buffer_frames = []
    buffer_size = int(RATE * 0.5 / CHUNK)
    # Stop recording once the input stays quiet for MAX_SILENCE_DURATION seconds.
    silence_frames = 0
    max_silence_frames = int(RATE / CHUNK * MAX_SILENCE_DURATION)
    recording = False
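    # Main loop: keep a short rolling buffer while idle, start recording once the
    # level crosses SILENCE_THRESHOLD, and stop after sustained silence.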
try:
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
audio_chunk = np.frombuffer(data, dtype=np.float32)
buffer_frames.append(audio_chunk)
if len(buffer_frames) > buffer_size:
buffer_frames.pop(0)
            # Mean absolute amplitude over the rolling buffer as a simple energy measure.
            audio_level = np.abs(np.concatenate(buffer_frames)).mean()
            if audio_level > SILENCE_THRESHOLD:
                if not recording:
                    print("\nPotential speech detected...")
                    recording = True
                    # Prepend the pre-roll buffer so the first syllables are kept.
                    frames.extend(buffer_frames)
                frames.append(audio_chunk)
                silence_frames = 0
elif recording:
frames.append(audio_chunk)
silence_frames += 1
if silence_frames >= max_silence_frames:
print("Processing speech segment...")
break
time.sleep(0.001)
except KeyboardInterrupt:
pass
finally:
stream.stop_stream()
stream.close()
p.terminate()
if frames:
return np.concatenate(frames)
return None


def check_for_speech(timeout=0.1):
    """Checks whether speech is present during a short sampling window.
Args:
timeout (float, optional): Duration to check for speech in seconds. Defaults to 0.1.
Returns:
        tuple: (is_speech, audio_data) where audio_data is the captured chunks as a numpy array, or (False, None) if no speech is detected.
"""
p = pyaudio.PyAudio()
frames = []
is_speech = False
    # Open the stream before entering the try block so cleanup in finally only
    # runs once the stream actually exists.
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    try:
for _ in range(int(RATE * timeout / CHUNK)):
data = stream.read(CHUNK, exception_on_overflow=False)
audio_chunk = np.frombuffer(data, dtype=np.float32)
frames.append(audio_chunk)
audio_level = np.abs(audio_chunk).mean()
if audio_level > SPEECH_CHECK_THRESHOLD:
is_speech = True
break
finally:
stream.stop_stream()
stream.close()
p.terminate()
if is_speech and frames:
return True, np.concatenate(frames)
return False, None


def play_audio_with_interrupt(audio_data, sample_rate=24000):
"""Plays audio while monitoring for speech interruption.
Args:
audio_data (np.ndarray): Audio data to play.
sample_rate (int, optional): Sample rate for playback. Defaults to 24000.
Returns:
        tuple: (interrupted, None) where interrupted is True if playback was cut short by detected speech and False if it completed normally.
"""
interrupt_queue = Queue()

    def input_callback(indata, frames, time, status):
"""Callback for monitoring input audio."""
if status:
print(f"Input status: {status}")
return
        # Signal an interruption as soon as the microphone level exceeds the threshold.
        audio_level = np.abs(indata[:, 0]).mean()
        if audio_level > settings.INTERRUPTION_THRESHOLD:
            interrupt_queue.put(True)

    def output_callback(outdata, frames, time, status):
"""Callback for output audio."""
if status:
print(f"Output status: {status}")
return
if not interrupt_queue.empty():
raise sd.CallbackStop()
remaining = len(audio_data) - output_callback.position
if remaining == 0:
raise sd.CallbackStop()
valid_frames = min(remaining, frames)
outdata[:valid_frames, 0] = audio_data[
output_callback.position : output_callback.position + valid_frames
]
if valid_frames < frames:
outdata[valid_frames:] = 0
output_callback.position += valid_frames

    # Track the playback position as an attribute on the output callback itself.
    output_callback.position = 0
try:
with sd.InputStream(
channels=1, callback=input_callback, samplerate=settings.RATE
):
with sd.OutputStream(
channels=1, callback=output_callback, samplerate=sample_rate
):
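                # Poll until playback finishes or the input callback flags an interruption.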
while output_callback.position < len(audio_data):
sd.sleep(100)
if not interrupt_queue.empty():
return True, None
return False, None
except sd.CallbackStop:
return True, None
except Exception as e:
print(f"Error during playback: {str(e)}")
return False, None


def transcribe_audio(processor, model, audio_data, sampling_rate=None):
"""Transcribes audio using Whisper.
Args:
processor (transformers.WhisperProcessor): Whisper processor.
model (transformers.WhisperForConditionalGeneration): Whisper model.
audio_data (np.ndarray or torch.Tensor): Audio data to transcribe.
sampling_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.
Returns:
str: Transcribed text.
"""
if sampling_rate is None:
sampling_rate = settings.RATE
if audio_data is None:
return ""
    if isinstance(audio_data, torch.Tensor):
        audio_data = audio_data.cpu().numpy()
    # Convert the raw waveform into the log-Mel input features Whisper expects.
    input_features = processor(
        audio_data, sampling_rate=sampling_rate, return_tensors="pt"
    ).input_features
predicted_ids = model.generate(input_features)
transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
return transcription[0]
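

if __name__ == "__main__":
    # Illustrative end-to-end sketch, not part of the module API: capture one speech
    # segment, run VAD, and transcribe it with Whisper. The HF_TOKEN environment
    # variable and the "openai/whisper-base" checkpoint are assumed placeholders, not
    # values defined by this project. Because of the relative settings import, run
    # this with "python -m <package>.<module>" rather than as a plain script.
    import os

    from transformers import WhisperForConditionalGeneration, WhisperProcessor

    vad_pipeline = init_vad_pipeline(hf_token=os.environ.get("HF_TOKEN"))
    whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")

    captured = record_continuous_audio()
    if captured is None:
        print("No speech captured.")
    else:
        speech = detect_speech_segments(vad_pipeline, captured)
        print(transcribe_audio(whisper_processor, whisper_model, speech))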