from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Request from fastapi.responses import JSONResponse import torch import os from pydub import AudioSegment import tempfile from transformers import pipeline as transorm_pipline,AutoModelForSpeechSeq2Seq, AutoProcessor from openai import OpenAI from pydantic import BaseModel import logging import subprocess from dotenv import load_dotenv import re import requests from typing import Optional, Dict import time import numpy as np import io app = FastAPI() load_dotenv() UPLOAD_FOLDER = tempfile.gettempdir() os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" os.environ['TRANSFORMERS_CACHE'] = '/asif/cache/' logging.basicConfig(level=logging.INFO) fluency_pipe = transorm_pipline("audio-classification", model="megathil/fluency_prediction") # Define device and torch_dtype device = "cuda:0" if torch.cuda.is_available() else "cpu" torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 # Define the Hugging Face model and processor distil_model_id = "distil-whisper/distil-small.en" distil_model = AutoModelForSpeechSeq2Seq.from_pretrained( distil_model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True ) distil_model.to(device) processor = AutoProcessor.from_pretrained(distil_model_id) async def hello_world(): return { "Hello World" } client = OpenAI( api_key=os.environ['AUTH_TOKEN_AI'], base_url="https://api.deepinfra.com/v1/openai", ) class EnhanceTextRequest(BaseModel): text: str class AnalyzeTextRequest(BaseModel): text: str question: str class AnalyzeTextResponse(BaseModel): relevance_rating: int vocabulary_rating: int class User(BaseModel): firstName: str lastName: Optional[str] city: Optional[str] = None college: Optional[str] = None department: Optional[str] = None course: Optional[str] = None class ModuleModel(BaseModel): questions: Optional[Dict[str, str]] = None unitId: Optional[str] = None moduleId: Optional[str] = None class MyModel(BaseModel): userProgressStatusId: str # Define the enhance_text function def enhance_text(text, question): response = client.chat.completions.create( model="meta-llama/Meta-Llama-3-8B-Instruct", messages=[ {"role": "system", "content": "You will be provided with a job interview question and answer.Your task is to rewrite the answer in standard English, making it more comprehensive and suitable for an fresher in interview. Ensure the response is at least 80 words and includes any relevant standard points related to the question.Be concise and omit disclaimers."}, {"role": "user", "content": f"question:{question} \n Answer:{text}"} ], temperature=0.7, max_tokens=400, top_p=1 ) enhanced_text = response.choices[0].message.content return enhanced_text def extract_rating_and_reason(text, rating_pattern, reason_pattern): rating_match = re.search(rating_pattern, text, re.IGNORECASE) reason_match = re.search(reason_pattern, text, re.IGNORECASE | re.DOTALL) if rating_match and reason_match: rating = int(rating_match.group(1)) reason = reason_match.group(1).strip() return {"rating": rating, "reason": reason} else: return {"rating": "", "reason": "Unable to retrieve rating and reason due to insufficient information."} def vocabulary_text(text, question): response = client.chat.completions.create( model="meta-llama/Meta-Llama-3-8B-Instruct", messages=[ {"role": "system", "content": "You will be provided with question and answer, and your task is to check the rate the Vocabulary in the answer in the range of 1-100 and reasoning (Provide answer in this format vocablary_rating : rating, reason: , in 100 words and Respond without any introductory or conclusion text)"}, {"role": "user", "content": f"question:{question} \n Answer:{text}"} ], temperature=0.7, max_tokens=250, top_p=1 ) vocabulary_rating = response.choices[0].message.content print("vocabulary_rating-->",vocabulary_rating) # Define patterns to match rating and reason vocab_rating_pattern = r"(?:vocabulary\s*rating|vocab\.\s*rating|vocab\s*rating|rating)\s*:\s*(\d+)" reason_pattern = r"reason\s*:\s*(.*)" # Extract and format the rating and reason result = extract_rating_and_reason(vocabulary_rating, vocab_rating_pattern, reason_pattern) formatted_vocabulary_rating = {"Vocabulary_rating": result["rating"], "reason": result["reason"]} return formatted_vocabulary_rating def relevance_check(text, question): response = client.chat.completions.create( model="meta-llama/Meta-Llama-3-8B-Instruct", messages=[ {"role": "system", "content": "You will be provided with question and answer from an job interview and your task is to check the Answer is relevant to the question in the range of 1-100 and reasoning (Provide answer in this format relevance_rating : rating , reason: , in 100 words and Respond without any introductory or conclusion text)"}, {"role": "user", "content": f"question:{question} \n Answer:{text}"} ], temperature=0.7, max_tokens=250, top_p=1 ) relevance_rating = response.choices[0].message.content print("relevance_rating-->",relevance_rating) # Define patterns to match rating and reason rel_rating_pattern = r"(?:relevance\s*rating|rel\.\s*rating|rel\s*rating|rating)\s*:\s*(\d+)" reason_pattern = r"reason\s*:\s*(.*)" # Extract and format the rating and reason result = extract_rating_and_reason(relevance_rating, rel_rating_pattern, reason_pattern) formatted_relevance_rating = {"Relevance_rating": result["rating"], "reason": result["reason"]} return formatted_relevance_rating def transcribe_audio(file_path: str) -> str: whisper = transorm_pipline( "automatic-speech-recognition", model=distil_model, tokenizer=processor.tokenizer, feature_extractor=processor.feature_extractor, max_new_tokens=128, torch_dtype=torch_dtype, device=device, ) transcription = whisper(file_path, chunk_length_s=30, stride_length_s=5, batch_size=8) return transcription def calculate_speech_rate(text, audio_duration): # Calculate the word count from the text full_transcription = text['text'] word_count = len(full_transcription.split()) logging.info(f"Total word count: {word_count}") # Calculate the speech rate in words per minute speech_rate_wpm = word_count / audio_duration * 60 # Define minimum and maximum speech rates for normalization min_speech_rate = 80 # words per minute (0%) max_speech_rate = 200 # words per minute (100%) # Normalize the speech rate to a percentage if speech_rate_wpm < min_speech_rate: speech_rate_percentage = 0 elif speech_rate_wpm > max_speech_rate: speech_rate_percentage = 100 else: speech_rate_percentage = ((speech_rate_wpm - min_speech_rate) / (max_speech_rate - min_speech_rate)) * 100 return speech_rate_percentage @app.post("/process_audio_master") async def process_audio( request: Request, audio_file: UploadFile = File(...), question: Optional[str] = Form("Tell me about yourself?"), unitId: Optional[str] = Form(""), moduleId: Optional[str] = Form(""), questionId: Optional[str] = Form(""), userProgressStatusId: Optional[str] = Form(""), firstName: Optional[str] = Form(""), lastName: Optional[str] = Form(""), city: Optional[str] = Form(""), college: Optional[str] = Form(""), department: Optional[str] = Form(""), course: Optional[str] = Form(""), userId: Optional[str] = Form("") ): # Logging the raw request data logging.info(f"Request headers: {request.headers}") logging.info(f"Request query params: {request.query_params}") logging.info(f"Request path params: {request.path_params}") logging.info(f"Request cookies: {request.cookies}") # Logging form data and file details logging.info(f"Received audio file: {audio_file.filename}") logging.info(f"Received question: {question}") user = { "firstName": firstName, "lastName": lastName, "city": city, "college": college, "department": department, "course": course, } logging.info(user) start_time = time.time() try: file_path = os.path.join(UPLOAD_FOLDER, audio_file.filename) with open(file_path, "wb") as f: f.write(await audio_file.read()) filename = audio_file.filename file_size_bytes = os.path.getsize(file_path) file_size_mb = file_size_bytes / (1024 * 1024) logging.info(f"File Size: {file_size_mb:.2f} MB") if filename.endswith(('.mp4', '.avi', '.mkv', '.mov')): try: output_audio_path = os.path.join(UPLOAD_FOLDER, 'output_audio.mp3') command = ['ffmpeg', '-i', file_path, '-vn', '-acodec', 'libmp3lame', '-y', output_audio_path] subprocess.run(command, check=True) audio_file_path = output_audio_path except Exception as e: return JSONResponse(content={'error': f'Error converting video to audio: {str(e)}'}, status_code=500) elif filename.endswith('.mp3'): try: output_audio_path = os.path.join(UPLOAD_FOLDER, filename.rsplit('.', 1)[0] + '.wav') command = ['ffmpeg', '-i', file_path, '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', '-y', output_audio_path] subprocess.run(command, check=True) audio_file_path = output_audio_path except Exception as e: return JSONResponse(content={'error': f'Error converting MP3 to WAV: {str(e)}'}, status_code=500) else: audio_file_path = file_path audio = AudioSegment.from_file(audio_file_path) audio_duration = audio.duration_seconds logging.info(f"Audio Duration: {audio_duration}") if audio_duration < 15: data = { "code":400, "messageCode":"1020", "message": "Recording Duration too short." } return JSONResponse(content=data) transcribe_results = transcribe_audio(audio_file_path) if not transcribe_results['text'].strip(): data = { "code":400, "messageCode":"1020", "message": "No content in Audio" } elif len(transcribe_results['text'].split()) < 20: data = { "code":400, "messageCode":"1020", "message": "Audio lacks content" } return JSONResponse(content=data) else: logging.info(f"transcribed content from the audio: {transcribe_results}") transcribed_content = transcribe_results['text'] # logging.info(f"transcribed content from the audio: {transcribed_content}") logging.info(f"Audio Duration: {audio_duration}") # Trim audio to 10 seconds for emotion and fluency analysis trimmed_audio = audio[:10000] # First 10 seconds trimmed_audio_path = os.path.join(UPLOAD_FOLDER, 'trimmed_audio.wav') trimmed_audio.export(trimmed_audio_path, format="wav") # Get the volume level of the audio volume_db = get_audio_volume_level(audio) volume_classification, score = classify_volume_level(volume_db) emotionRecognition={ "label": volume_classification, "score": score } fluency_results = fluency_pipe(trimmed_audio_path) max_result = max(fluency_results, key=lambda x: x['score']) fluencyLevel = { "label": max_result['label'], "score": round(max_result['score'], 2) } # fluencyLevel = { # "label": "Fluent", # "score": 0.99 # } enhance_text_result = enhance_text(transcribe_results['text'], question) relevance_rating = relevance_check(transcribe_results['text'], question) contentRelevanceRating = relevance_rating['Relevance_rating'] contentRelevanceRatingReason = relevance_rating['reason'] voacablary_level = vocabulary_text(transcribe_results['text'], question) vocabularyRating = voacablary_level['Vocabulary_rating'] vocabularyRatingReason = voacablary_level['reason'] speech_rate = calculate_speech_rate(transcribe_results, audio_duration) os.remove(file_path) os.remove(trimmed_audio_path) if audio_file_path != file_path: os.remove(audio_file_path) node_data = { "aiToken": "25b0b507a8c467b4bd0d011df44fe72b6a4aa38ee60a868f3705745c45254aa6", "userId": userId, "userProgressStatusId": userProgressStatusId, "unitId": unitId, "moduleId": moduleId, "questionId": questionId, "aiResponseData": { 'contentRelevanceRating':contentRelevanceRating, 'contentRelevanceRatingReason':contentRelevanceRatingReason, 'vocabularyRating':vocabularyRating, 'vocabularyRatingReason':vocabularyRatingReason, "transcription": transcribe_results, "emotionRecognition": emotionRecognition, "fluencyLevel": fluencyLevel, "enhancedText": enhance_text_result, "speechRate": speech_rate, } } data = { "code": 200, "messageCode":"1004", "data":{ 'contentRelevanceRating':contentRelevanceRating, 'contentRelevanceRatingReason':contentRelevanceRatingReason, 'vocabularyRating':vocabularyRating, 'vocabularyRatingReason':vocabularyRatingReason, "transcription": transcribe_results, "emotionRecognition": emotionRecognition, "fluencyLevel": fluencyLevel, "enhancedText": enhance_text_result, "speechRate": speech_rate, }, "message": "Record(s) Found." } node_server_url = "https://backendapi.megathil.com/processAudio/saveUserResult" requests.post(node_server_url, json=node_data) end_time = time.time() # Record end time elapsed_time = end_time - start_time # Calculate elapsed time logging.info(f"Total processing time: {elapsed_time:.2f} seconds") return JSONResponse(content=data) except Exception as e: logging.error(f"Error processing request: {e}") return JSONResponse(content={"error": str(e)}, status_code=500) def get_audio_volume_level(audio: AudioSegment) -> float: # Convert the audio data to a numpy array samples = np.array(audio.get_array_of_samples(), dtype=np.float32) # Calculate the root mean square (RMS) of the samples rms = np.sqrt(np.mean(samples**2)) # Convert RMS to decibels (dB) if rms == 0: rms_db = -np.inf # Use -infinity dB for silence else: rms_db = 20 * np.log10(rms) return rms_db def classify_volume_level(volume_db: float) -> str: """Classify the audio volume level into 'High', 'Medium', or 'Low' with specific scores.""" if volume_db == -np.inf or volume_db < 50.0: return "Low", 40.00 elif 50.0 <= volume_db < 70.0: return "Medium", 60.00 elif volume_db >= 70.0: return "High", 100.00 else: return "High", 100.00