# main.py
import asyncio
import io
import json
import os
import wave
from typing import AsyncGenerator

import webrtcvad
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from langfuse import Langfuse
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool

from utils import (
    from_en_translation,
    stt,  # assumed to live in utils alongside the other speech helpers
    to_en_translation,
    tts,
    tts_to_bytesio,
)

# Initialize Langfuse (demo credentials; load these from the environment in production)
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-04d2302a-aa5c-4870-9703-58ab64c3bcae"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-d34ea200-feec-428e-a621-784fce93a5af"
os.environ["LANGFUSE_HOST"] = "https://chris4k-langfuse-template-space.hf.space"  # self-hosted Langfuse on Hugging Face Spaces

try:
    langfuse = Langfuse()
except Exception as e:
    print(f"Langfuse offline: {e}")

app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

# Initialize tools and agent
model = HfApiModel()
search_tool = DuckDuckGoSearchTool()
visit_webpage_tool = VisitWebpageTool()

agent = CodeAgent(
    tools=[search_tool, visit_webpage_tool],
    model=model,
    additional_authorized_imports=["requests", "bs4", "pandas", "concurrent.futures", "csv", "json"],
)

# Constants
SAMPLE_RATE = 16000
CHANNELS = 1
FRAME_MS = 30  # webrtcvad accepts only 10, 20, or 30 ms frames
CHUNK_SIZE = SAMPLE_RATE * FRAME_MS // 1000  # 480 samples per 30 ms frame
CHUNK_BYTES = CHUNK_SIZE * 2  # 16-bit mono PCM -> 960 bytes per frame
VAD_MODE = 3  # Aggressiveness mode (3 is most aggressive)

desired_language = "de"
max_answer_length = 100
# response_generator_pipe = TextGenerationPipeline(max_length=max_answer_length)

# Initialize VAD
vad = webrtcvad.Vad(VAD_MODE)


async def detect_wakeword(audio_chunk: bytes) -> bool:
    # TODO: Implement proper wake word detection.
    # For now this is a placeholder that always fires; replace it with a real
    # wake word model (e.g. Porcupine) or keep relying on the transcript check
    # for the word "computer" further below.
    return True
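# --- Optional: real wake word detection (illustrative sketch) -----------------
# The block below sketches how Porcupine (pip install pvporcupine) could back
# detect_wakeword(). It is an assumption, not part of the original app: the
# access key is a placeholder and the built-in "computer" keyword is chosen to
# match the transcript check used later. Porcupine consumes fixed-size int16
# frames (porcupine.frame_length samples at 16 kHz), so audio is re-framed here.
try:
    import struct

    import pvporcupine

    _porcupine = pvporcupine.create(access_key="YOUR_PICOVOICE_ACCESS_KEY", keywords=["computer"])

    async def detect_wakeword_porcupine(audio_chunk: bytes) -> bool:
        # Unpack 16-bit little-endian PCM into int16 samples.
        pcm = struct.unpack_from("<%dh" % (len(audio_chunk) // 2), audio_chunk)
        # process() returns the index of the detected keyword, or -1 if none.
        step = _porcupine.frame_length
        for start in range(0, len(pcm) - step + 1, step):
            if _porcupine.process(pcm[start:start + step]) >= 0:
                return True
        return False
except Exception:
    # pvporcupine not installed or no valid access key; the placeholder is used.
    pass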
async def process_audio_stream(websocket: WebSocket) -> AsyncGenerator[str, None]:
    buffer = []
    is_speaking = False
    silence_frames = 0

    while True:
        try:
            # Add a timeout to prevent indefinite waiting
            try:
                audio_data = await asyncio.wait_for(websocket.receive_bytes(), timeout=5.0)
            except asyncio.TimeoutError:
                print("WebSocket receive timeout")
                continue
            except Exception as receive_error:
                print(f"Error receiving audio data: {receive_error}")
                # Break the loop if there's a persistent receive error
                break

            # Validate audio data: webrtcvad only accepts complete 10/20/30 ms frames
            if not audio_data:
                print("Received empty audio data")
                continue
            if len(audio_data) != CHUNK_BYTES:
                print(f"Unexpected audio chunk size: {len(audio_data)} bytes (expected {CHUNK_BYTES})")
                continue

            try:
                # Classify the frame as speech or silence
                is_speech = vad.is_speech(audio_data, SAMPLE_RATE)
            except Exception as vad_error:
                print(f"VAD processing error: {vad_error}")
                continue

            if is_speech:
                silence_frames = 0
                buffer.append(audio_data)
                is_speaking = True
            elif is_speaking:
                silence_frames += 1
                if silence_frames > 30:  # ~0.9 s of silence -> end of utterance
                    # Process complete utterance
                    try:
                        audio_bytes = b"".join(buffer)

                        # Wrap the raw PCM in a WAV container for speech recognition
                        wav_buffer = io.BytesIO()
                        with wave.open(wav_buffer, "wb") as wav_file:
                            wav_file.setnchannels(CHANNELS)
                            wav_file.setsampwidth(2)  # 16-bit audio
                            wav_file.setframerate(SAMPLE_RATE)
                            wav_file.writeframes(audio_bytes)
                        wav_buffer.seek(0)  # rewind so stt() reads from the start

                        # Reset state
                        buffer = []
                        is_speaking = False
                        silence_frames = 0

                        # Check for wake word (the placeholder always passes; the
                        # transcript check below is the effective gate)
                        if await detect_wakeword(audio_bytes):
                            # Transcribe the utterance
                            user_speech_text = stt(wav_buffer, desired_language)

                            if "computer" in user_speech_text.lower():
                                translated_text = to_en_translation(user_speech_text, desired_language)
                                # CodeAgent.run is synchronous; run it in a worker
                                # thread so the event loop is not blocked
                                response = await asyncio.to_thread(agent.run, translated_text)
                                bot_response_de = from_en_translation(response, desired_language)

                                # Stream the text response
                                yield json.dumps({
                                    "user_text": user_speech_text,
                                    "response_de": bot_response_de,
                                    "response_en": response,
                                })

                                # Generate and stream the audio response; latin-1 maps
                                # each byte to one code point so the bytes survive JSON
                                bot_voice = tts(bot_response_de, desired_language)
                                bot_voice_bytes = tts_to_bytesio(bot_voice)
                                yield json.dumps({
                                    "audio": bot_voice_bytes.getvalue().decode("latin1")
                                })
                    except Exception as processing_error:
                        print(f"Error processing speech utterance: {processing_error}")

        except Exception as e:
            print(f"Unexpected error in audio stream processing: {e}")
            # Add a small delay to prevent rapid reconnection attempts
            await asyncio.sleep(1)
            break


@app.get("/", response_class=HTMLResponse)
async def get_index():
    with open("static/index.html") as f:
        return f.read()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        # Keep the connection open; restart the stream after recoverable errors
        while True:
            try:
                async for response in process_audio_stream(websocket):
                    await websocket.send_text(response)
            except WebSocketDisconnect:
                print("Client disconnected")
                break
            except Exception as stream_error:
                print(f"Audio stream error: {stream_error}")
                await asyncio.sleep(1)
    except Exception as e:
        print(f"WebSocket endpoint error: {e}")
    finally:
        try:
            await websocket.close(code=1000)
        except Exception as close_error:
            print(f"Error closing WebSocket: {close_error}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
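# --- Illustrative test client (separate script, e.g. test_client.py) ---------
# A minimal sketch of a client speaking the protocol this server expects:
# 16 kHz, 16-bit mono PCM sent as 30 ms (960-byte) binary frames to /ws, with
# JSON text messages streamed back. The websockets package (pip install
# websockets) and the input.wav filename are assumptions; the real client is
# the browser page served from static/index.html.
#
#     import asyncio
#     import wave
#     import websockets
#
#     async def send_wav(path="input.wav"):
#         async with websockets.connect("ws://localhost:8000/ws") as ws:
#             with wave.open(path, "rb") as wav:
#                 # File must already be 16 kHz, 16-bit mono to match the server.
#                 while frame := wav.readframes(480):  # 480 samples = 30 ms
#                     if len(frame) == 960:
#                         await ws.send(frame)
#                         await asyncio.sleep(0.03)  # pace roughly in real time
#             try:
#                 while True:  # print the JSON responses the server streams back
#                     print(await asyncio.wait_for(ws.recv(), timeout=10.0))
#             except asyncio.TimeoutError:
#                 pass
#
#     asyncio.run(send_wav())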