# VoiceBot — app.py (Hugging Face Space by Chris4K)
# NOTE: web-page chrome ("raw / history / blame / 7.37 kB") removed from the
# scraped copy; those lines were not part of the Python source.
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from config.config import settings
from services.llama_generator import LlamaGenerator
import os

# Initialize Langfuse observability.
# SECURITY: these credentials are hard-coded and committed to the repository;
# they should be rotated and supplied via the deployment environment instead.
# setdefault() keeps the old fallback behavior but lets a real environment
# variable take precedence over the committed values.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-04d2302a-aa5c-4870-9703-58ab64c3bcae")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-d34ea200-feec-428e-a621-784fce93a5af")
os.environ.setdefault("LANGFUSE_HOST", "https://chris4k-langfuse-template-space.hf.space")  # 🇪🇺 EU region

# Telemetry is best-effort: the app still works without it, so a failure is
# only logged — now including the actual error, which the original discarded.
try:
    langfuse = Langfuse()
except Exception as e:
    print(f"Langfuse Offline: {e}")
# main.py
import asyncio
import io
import json
import wave
from typing import AsyncGenerator

import numpy as np
import webrtcvad
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool

from utils import (
    from_en_translation,
    stt,  # was missing: stt() is called in process_audio_stream
    to_en_translation,
    tts,
    tts_to_bytesio,
)
app = FastAPI()
# Serve the client UI assets (index.html, JS audio capture code) from ./static.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Initialize tools and agent
# HfApiModel() with no arguments presumably uses the library's default hosted
# model — TODO confirm which model that resolves to in this smolagents version.
model = HfApiModel()
search_tool = DuckDuckGoSearchTool()
visit_webpage_tool = VisitWebpageTool()
# CodeAgent generates and executes Python to answer queries; the list below is
# the whitelist of extra modules that generated code is allowed to import.
agent = CodeAgent(
    tools=[search_tool, visit_webpage_tool],
    model=model,
    additional_authorized_imports=['requests', 'bs4', 'pandas', 'concurrent.futures', 'csv', 'json']
)
# Constants
SAMPLE_RATE = 16000  # Hz — must match the rate the browser client records at
CHANNELS = 1  # mono audio
CHUNK_SIZE = 480  # 30ms chunks for VAD
# NOTE(review): 30 ms at 16 kHz is 480 *samples* = 960 *bytes* of 16-bit PCM.
# process_audio_stream compares len(audio_data) (bytes) against this value, so
# the minimum-size check is effectively 15 ms — confirm the intended units.
VAD_MODE = 3  # Aggressiveness mode (3 is most aggressive)
desired_language = "de"  # language the user speaks / TTS responds in
max_answer_length = 100  # cap on generated answer length (currently unused below)
#response_generator_pipe = TextGenerationPipeline(max_length=max_answer_length)
# Initialize VAD
vad = webrtcvad.Vad(VAD_MODE)
async def detect_wakeword(audio_chunk: bytes) -> bool:
    """Placeholder wake-word detector — always reports a detection.

    TODO: replace with a real keyword-spotting model (e.g. Porcupine or a
    custom detector) so the bot only reacts after its wake word is heard.
    """
    return True
async def process_audio_stream(websocket: WebSocket) -> AsyncGenerator[str, None]:
    """Consume raw PCM chunks from the websocket and yield JSON result strings.

    Uses WebRTC VAD to segment speech; when an utterance ends (30 silent
    chunks), it is transcribed, optionally routed through the agent (only if
    the transcript contains "computer"), and the answer is yielded twice:
    once as a text payload, once as a TTS audio payload.

    The generator returns (rather than raising) on persistent receive errors,
    which callers should treat as "this connection is done".
    """
    buffer = []          # accumulated audio chunks of the utterance in progress
    is_speaking = False  # True while VAD is reporting speech
    silence_frames = 0   # consecutive non-speech chunks since last speech
    while True:
        try:
            # Add a timeout to prevent indefinite waiting
            try:
                audio_data = await asyncio.wait_for(websocket.receive_bytes(), timeout=5.0)
            except asyncio.TimeoutError:
                print("WebSocket receive timeout")
                continue
            except Exception as receive_error:
                print(f"Error receiving audio data: {receive_error}")
                # Break the loop if there's a persistent receive error
                break
            # Validate audio data (empty bytes are falsy — one check suffices)
            if not audio_data:
                print("Received empty audio data")
                continue
            # Ensure audio data meets minimum size for VAD processing.
            # NOTE(review): CHUNK_SIZE is documented as 480 samples (30 ms),
            # but len(audio_data) is in BYTES (960 for 30 ms of 16-bit PCM),
            # and webrtcvad requires exact 10/20/30 ms frames — confirm the
            # client sends 960-byte frames.
            if len(audio_data) < CHUNK_SIZE:
                print(f"Audio chunk too small: {len(audio_data)} bytes")
                continue
            try:
                # Classify this chunk as speech / non-speech
                is_speech = vad.is_speech(audio_data, SAMPLE_RATE)
            except Exception as vad_error:
                print(f"VAD processing error: {vad_error}")
                continue
            if is_speech:
                silence_frames = 0
                buffer.append(audio_data)
                is_speaking = True
            elif is_speaking:
                silence_frames += 1
                if silence_frames > 30:  # End of utterance detection
                    # Process complete utterance
                    try:
                        audio_bytes = b''.join(buffer)
                        # Convert to wave file for speech recognition
                        wav_buffer = io.BytesIO()
                        with wave.open(wav_buffer, 'wb') as wav_file:
                            wav_file.setnchannels(CHANNELS)
                            wav_file.setsampwidth(2)  # 16-bit audio
                            wav_file.setframerate(SAMPLE_RATE)
                            wav_file.writeframes(audio_bytes)
                        # BUG FIX: rewind before handing the buffer to stt();
                        # after writeframes the stream position is at EOF, so
                        # a reader would see no data.
                        wav_buffer.seek(0)
                        # Reset state for the next utterance
                        buffer = []
                        is_speaking = False
                        silence_frames = 0
                        # Check for wake word
                        if await detect_wakeword(audio_bytes):
                            # Transcribe, then gate on the "computer" keyword
                            user_speech_text = stt(wav_buffer, desired_language)
                            if "computer" in user_speech_text.lower():
                                translated_text = to_en_translation(user_speech_text, desired_language)
                                response = await agent.arun(translated_text)  # Assuming agent.run is made async
                                bot_response_de = from_en_translation(response, desired_language)
                                # Stream the text response
                                yield json.dumps({
                                    "user_text": user_speech_text,
                                    "response_de": bot_response_de,
                                    "response_en": response
                                })
                                # Generate and stream audio response
                                bot_voice = tts(bot_response_de, desired_language)
                                bot_voice_bytes = tts_to_bytesio(bot_voice)
                                # NOTE(review): if tts_to_bytesio returns a
                                # BytesIO (as its name suggests), .decode will
                                # raise AttributeError; this likely needs
                                # bot_voice_bytes.getvalue().decode('latin1').
                                yield json.dumps({
                                    "audio": bot_voice_bytes.decode('latin1')
                                })
                    except Exception as processing_error:
                        print(f"Error processing speech utterance: {processing_error}")
        except Exception as e:
            print(f"Unexpected error in audio stream processing: {e}")
            # Add a small delay to prevent rapid reconnection attempts
            await asyncio.sleep(1)
            break
@app.get("/", response_class=HTMLResponse)
async def get_index():
    """Serve the single-page client UI from static/index.html."""
    # Explicit encoding avoids decoding with the platform-dependent locale
    # default, which breaks non-ASCII content on some deployments.
    with open("static/index.html", encoding="utf-8") as f:
        return f.read()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a client connection and relay audio-processing results to it.

    Transient stream errors are logged and the stream is restarted after a
    short pause; the connection is closed cleanly when the client goes away.
    """
    await websocket.accept()
    try:
        # Keep the WebSocket connection open with a persistent loop
        while True:
            try:
                async for response in process_audio_stream(websocket):
                    await websocket.send_text(response)
                # BUG FIX: process_audio_stream only returns after it gave up
                # on receiving (persistent error / dead socket). The original
                # code restarted it here forever, spinning on a dead
                # connection — stop retrying instead.
                break
            except WebSocketDisconnect:
                # Client went away mid-send; nothing left to serve.
                break
            except Exception as stream_error:
                print(f"Audio stream error: {stream_error}")
                # Attempt to restart the stream
                await asyncio.sleep(1)
    except Exception as e:
        print(f"WebSocket endpoint error: {e}")
    finally:
        try:
            await websocket.close(code=1000)
        except Exception as close_error:
            print(f"Error closing WebSocket: {close_error}")
if __name__ == "__main__":
    import uvicorn
    # Bind on all interfaces — intended for container / HF Space deployment.
    uvicorn.run(app, host="0.0.0.0", port=8000)