from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Request
from fastapi.responses import JSONResponse
import torch
import os
from pydub import AudioSegment
import tempfile
from transformers import pipeline as transform_pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor
from openai import OpenAI
from pydantic import BaseModel
import logging
import subprocess
from dotenv import load_dotenv
import re
import requests
from typing import Optional, Dict
import time
import numpy as np
import io

app = FastAPI()
load_dotenv()

UPLOAD_FOLDER = tempfile.gettempdir()
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
os.environ['TRANSFORMERS_CACHE'] = '/asif/cache/'

logging.basicConfig(level=logging.INFO)
# Load the fluency-classification pipeline once at startup
fluency_pipe = transform_pipeline("audio-classification", model="megathil/fluency_prediction")

# Define device and torch_dtype
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Define the Hugging Face model and processor
distil_model_id = "distil-whisper/distil-small.en"
distil_model = AutoModelForSpeechSeq2Seq.from_pretrained(
    distil_model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
distil_model.to(device)
processor = AutoProcessor.from_pretrained(distil_model_id)

@app.get("/")  # health-check endpoint; route path assumed
async def hello_world():
    return {"message": "Hello World"}

client = OpenAI(
    api_key=os.environ['AUTH_TOKEN_AI'],
    base_url="https://api.deepinfra.com/v1/openai",
)

class EnhanceTextRequest(BaseModel):
    text: str

class AnalyzeTextRequest(BaseModel):
    text: str
    question: str

class AnalyzeTextResponse(BaseModel):
    relevance_rating: int
    vocabulary_rating: int

class User(BaseModel):
    firstName: str
    lastName: Optional[str] = None
    city: Optional[str] = None
    college: Optional[str] = None
    department: Optional[str] = None
    course: Optional[str] = None

class ModuleModel(BaseModel):
    questions: Optional[Dict[str, str]] = None
    unitId: Optional[str] = None
    moduleId: Optional[str] = None

class MyModel(BaseModel):
    userProgressStatusId: str

# Define the enhance_text function
def enhance_text(text, question):
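    """Rewrite an interview answer in polished standard English via the
    DeepInfra-hosted Llama 3 chat model and return the enhanced text."""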
    response = client.chat.completions.create(
        model="meta-llama/Meta-Llama-3-8B-Instruct",
        messages=[
            {"role": "system", "content": "You will be provided with a job interview question and answer. Your task is to rewrite the answer in standard English, making it more comprehensive and suitable for a fresher in an interview. Ensure the response is at least 80 words and includes any relevant standard points related to the question. Be concise and omit disclaimers."},
            {"role": "user", "content": f"question: {question} \n Answer: {text}"}
        ],
        temperature=0.7,
        max_tokens=400,
        top_p=1
    )
    enhanced_text = response.choices[0].message.content
    return enhanced_text

def extract_rating_and_reason(text, rating_pattern, reason_pattern):
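    """Pull a numeric rating and a free-text reason out of a model response
    using the supplied regex patterns; falls back to a default result when
    either pattern fails to match."""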
    rating_match = re.search(rating_pattern, text, re.IGNORECASE)
    reason_match = re.search(reason_pattern, text, re.IGNORECASE | re.DOTALL)
    if rating_match and reason_match:
        rating = int(rating_match.group(1))
        reason = reason_match.group(1).strip()
        return {"rating": rating, "reason": reason}
    else:
        # Default to 0 so the rating stays numeric when parsing fails
        return {"rating": 0, "reason": "Unable to retrieve rating and reason due to insufficient information."}

def vocabulary_text(text, question):
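    """Ask the chat model to rate the answer's vocabulary (1-100) and parse
    the rating and reason out of its free-text reply."""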
    response = client.chat.completions.create(
        model="meta-llama/Meta-Llama-3-8B-Instruct",
        messages=[
            {"role": "system", "content": "You will be provided with a question and an answer. Your task is to rate the vocabulary used in the answer on a scale of 1-100 and give your reasoning (answer in the format vocabulary_rating: <rating>, reason: <reason>, in 100 words, without any introductory or concluding text)."},
            {"role": "user", "content": f"question: {question} \n Answer: {text}"}
        ],
        temperature=0.7,
        max_tokens=250,
        top_p=1
    )
    vocabulary_rating = response.choices[0].message.content
    logging.info(f"vocabulary_rating --> {vocabulary_rating}")

    # Define patterns to match rating and reason
    vocab_rating_pattern = r"(?:vocabulary\s*rating|vocab\.\s*rating|vocab\s*rating|rating)\s*:\s*(\d+)"
    reason_pattern = r"reason\s*:\s*(.*)"

    # Extract and format the rating and reason
    result = extract_rating_and_reason(vocabulary_rating, vocab_rating_pattern, reason_pattern)
    formatted_vocabulary_rating = {"Vocabulary_rating": result["rating"], "reason": result["reason"]}
    return formatted_vocabulary_rating

def relevance_check(text, question):
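    """Ask the chat model how relevant the answer is to the question (1-100)
    and parse the rating and reason out of its free-text reply."""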
    response = client.chat.completions.create(
        model="meta-llama/Meta-Llama-3-8B-Instruct",
        messages=[
            {"role": "system", "content": "You will be provided with a question and an answer from a job interview. Your task is to rate how relevant the answer is to the question on a scale of 1-100 and give your reasoning (answer in the format relevance_rating: <rating>, reason: <reason>, in 100 words, without any introductory or concluding text)."},
            {"role": "user", "content": f"question: {question} \n Answer: {text}"}
        ],
        temperature=0.7,
        max_tokens=250,
        top_p=1
    )
    relevance_rating = response.choices[0].message.content
    logging.info(f"relevance_rating --> {relevance_rating}")

    # Define patterns to match rating and reason
    rel_rating_pattern = r"(?:relevance\s*rating|rel\.\s*rating|rel\s*rating|rating)\s*:\s*(\d+)"
    reason_pattern = r"reason\s*:\s*(.*)"

    # Extract and format the rating and reason
    result = extract_rating_and_reason(relevance_rating, rel_rating_pattern, reason_pattern)
    formatted_relevance_rating = {"Relevance_rating": result["rating"], "reason": result["reason"]}
    return formatted_relevance_rating

def transcribe_audio(file_path: str) -> dict:
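    """Transcribe an audio file with the distil-whisper pipeline; returns the
    pipeline's result dict (the transcript is under the 'text' key)."""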
    whisper = transform_pipeline(
        "automatic-speech-recognition",
        model=distil_model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        max_new_tokens=128,
        torch_dtype=torch_dtype,
        device=device,
    )
    transcription = whisper(file_path, chunk_length_s=30, stride_length_s=5, batch_size=8)
    return transcription

def calculate_speech_rate(text, audio_duration):
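    """Convert a transcription result and the audio duration (seconds) into a
    speech-rate percentage, normalising 80-200 words per minute to 0-100%."""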
    # Calculate the word count from the text
    full_transcription = text['text']
    word_count = len(full_transcription.split())
    logging.info(f"Total word count: {word_count}")

    # Calculate the speech rate in words per minute
    speech_rate_wpm = word_count / audio_duration * 60

    # Define minimum and maximum speech rates for normalization
    min_speech_rate = 80   # words per minute (0%)
    max_speech_rate = 200  # words per minute (100%)

    # Normalize the speech rate to a percentage
    if speech_rate_wpm < min_speech_rate:
        speech_rate_percentage = 0
    elif speech_rate_wpm > max_speech_rate:
        speech_rate_percentage = 100
    else:
        speech_rate_percentage = ((speech_rate_wpm - min_speech_rate) /
                                  (max_speech_rate - min_speech_rate)) * 100
    return speech_rate_percentage
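
# Example: 150 words over 60 s of audio is 150 WPM, which normalises to
# ((150 - 80) / (200 - 80)) * 100 ≈ 58.3%.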

@app.post("/processAudio")  # route path assumed
async def process_audio(
    request: Request,
    audio_file: UploadFile = File(...),
    question: Optional[str] = Form("Tell me about yourself?"),
    unitId: Optional[str] = Form(""),
    moduleId: Optional[str] = Form(""),
    questionId: Optional[str] = Form(""),
    userProgressStatusId: Optional[str] = Form(""),
    firstName: Optional[str] = Form(""),
    lastName: Optional[str] = Form(""),
    city: Optional[str] = Form(""),
    college: Optional[str] = Form(""),
    department: Optional[str] = Form(""),
    course: Optional[str] = Form(""),
    userId: Optional[str] = Form("")
):
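    """Analyse an uploaded interview answer end to end: transcribe the audio,
    rate relevance and vocabulary, classify fluency and volume, compute the
    speech rate, push the result to the Node backend, and return it."""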
    # Log the raw request data
    logging.info(f"Request headers: {request.headers}")
    logging.info(f"Request query params: {request.query_params}")
    logging.info(f"Request path params: {request.path_params}")
    logging.info(f"Request cookies: {request.cookies}")

    # Log form data and file details
    logging.info(f"Received audio file: {audio_file.filename}")
    logging.info(f"Received question: {question}")

    user = {
        "firstName": firstName,
        "lastName": lastName,
        "city": city,
        "college": college,
        "department": department,
        "course": course,
    }
    logging.info(user)

    start_time = time.time()
    try:
        # Keep only the base name so a crafted filename cannot escape the upload folder
        filename = os.path.basename(audio_file.filename)
        file_path = os.path.join(UPLOAD_FOLDER, filename)
        with open(file_path, "wb") as f:
            f.write(await audio_file.read())

        file_size_bytes = os.path.getsize(file_path)
        file_size_mb = file_size_bytes / (1024 * 1024)
        logging.info(f"File Size: {file_size_mb:.2f} MB")

        if filename.endswith(('.mp4', '.avi', '.mkv', '.mov')):
            # Extract the audio track from the uploaded video
            try:
                output_audio_path = os.path.join(UPLOAD_FOLDER, 'output_audio.mp3')
                command = ['ffmpeg', '-i', file_path, '-vn', '-acodec', 'libmp3lame', '-y', output_audio_path]
                subprocess.run(command, check=True)
                audio_file_path = output_audio_path
            except Exception as e:
                return JSONResponse(content={'error': f'Error converting video to audio: {str(e)}'}, status_code=500)
        elif filename.endswith('.mp3'):
            # Convert MP3 to 16-bit stereo WAV at 44.1 kHz
            try:
                output_audio_path = os.path.join(UPLOAD_FOLDER, filename.rsplit('.', 1)[0] + '.wav')
                command = ['ffmpeg', '-i', file_path, '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', '-y', output_audio_path]
                subprocess.run(command, check=True)
                audio_file_path = output_audio_path
            except Exception as e:
                return JSONResponse(content={'error': f'Error converting MP3 to WAV: {str(e)}'}, status_code=500)
        else:
            audio_file_path = file_path

        audio = AudioSegment.from_file(audio_file_path)
        audio_duration = audio.duration_seconds
        logging.info(f"Audio Duration: {audio_duration}")

        if audio_duration < 15:
            data = {
                "code": 400,
                "messageCode": "1020",
                "message": "Recording duration too short."
            }
            return JSONResponse(content=data)
        transcribe_results = transcribe_audio(audio_file_path)
        if not transcribe_results['text'].strip():
            data = {
                "code": 400,
                "messageCode": "1020",
                "message": "No content in audio"
            }
            return JSONResponse(content=data)
        elif len(transcribe_results['text'].split()) < 20:
            data = {
                "code": 400,
                "messageCode": "1020",
                "message": "Audio lacks content"
            }
            return JSONResponse(content=data)
        else:
            logging.info(f"transcribed content from the audio: {transcribe_results}")
            transcribed_content = transcribe_results['text']
            logging.info(f"Audio Duration: {audio_duration}")

            # Trim audio to the first 10 seconds for emotion and fluency analysis
            trimmed_audio = audio[:10000]
            trimmed_audio_path = os.path.join(UPLOAD_FOLDER, 'trimmed_audio.wav')
            trimmed_audio.export(trimmed_audio_path, format="wav")

            # Get the volume level of the audio
            volume_db = get_audio_volume_level(audio)
            volume_classification, score = classify_volume_level(volume_db)
            emotionRecognition = {
                "label": volume_classification, "score": score
            }

            fluency_results = fluency_pipe(trimmed_audio_path)
            max_result = max(fluency_results, key=lambda x: x['score'])
            fluencyLevel = {
                "label": max_result['label'],
                "score": round(max_result['score'], 2)
            }

            enhance_text_result = enhance_text(transcribe_results['text'], question)

            relevance_rating = relevance_check(transcribe_results['text'], question)
            contentRelevanceRating = relevance_rating['Relevance_rating']
            contentRelevanceRatingReason = relevance_rating['reason']

            vocabulary_level = vocabulary_text(transcribe_results['text'], question)
            vocabularyRating = vocabulary_level['Vocabulary_rating']
            vocabularyRatingReason = vocabulary_level['reason']

            speech_rate = calculate_speech_rate(transcribe_results, audio_duration)

            # Clean up temporary files
            os.remove(file_path)
            os.remove(trimmed_audio_path)
            if audio_file_path != file_path:
                os.remove(audio_file_path)
            node_data = {
                "aiToken": "25b0b507a8c467b4bd0d011df44fe72b6a4aa38ee60a868f3705745c45254aa6",  # TODO: load from an environment variable instead of hard-coding
                "userId": userId,
                "userProgressStatusId": userProgressStatusId,
                "unitId": unitId,
                "moduleId": moduleId,
                "questionId": questionId,
                "aiResponseData": {
                    'contentRelevanceRating': contentRelevanceRating,
                    'contentRelevanceRatingReason': contentRelevanceRatingReason,
                    'vocabularyRating': vocabularyRating,
                    'vocabularyRatingReason': vocabularyRatingReason,
                    "transcription": transcribe_results,
                    "emotionRecognition": emotionRecognition,
                    "fluencyLevel": fluencyLevel,
                    "enhancedText": enhance_text_result,
                    "speechRate": speech_rate,
                }
            }
            data = {
                "code": 200,
                "messageCode": "1004",
                "data": {
                    'contentRelevanceRating': contentRelevanceRating,
                    'contentRelevanceRatingReason': contentRelevanceRatingReason,
                    'vocabularyRating': vocabularyRating,
                    'vocabularyRatingReason': vocabularyRatingReason,
                    "transcription": transcribe_results,
                    "emotionRecognition": emotionRecognition,
                    "fluencyLevel": fluencyLevel,
                    "enhancedText": enhance_text_result,
                    "speechRate": speech_rate,
                },
                "message": "Record(s) Found."
            }

            # Persist the result to the Node backend
            node_server_url = "https://backendapi.megathil.com/processAudio/saveUserResult"
            requests.post(node_server_url, json=node_data, timeout=30)

            end_time = time.time()  # Record end time
            elapsed_time = end_time - start_time  # Calculate elapsed time
            logging.info(f"Total processing time: {elapsed_time:.2f} seconds")
            return JSONResponse(content=data)
    except Exception as e:
        logging.error(f"Error processing request: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)

def get_audio_volume_level(audio: AudioSegment) -> float:
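    """Return the RMS level of the audio in decibels (-inf for silence)."""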
    # Convert the audio data to a numpy array
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
    # Calculate the root mean square (RMS) of the samples
    rms = np.sqrt(np.mean(samples**2))
    # Convert RMS to decibels (dB)
    if rms == 0:
        rms_db = -np.inf  # Use -infinity dB for silence
    else:
        rms_db = 20 * np.log10(rms)
    return rms_db

def classify_volume_level(volume_db: float) -> tuple:
    """Classify the audio volume level into 'High', 'Medium', or 'Low' with specific scores."""
    if volume_db == -np.inf or volume_db < 50.0:
        return "Low", 40.00
    elif 50.0 <= volume_db < 70.0:
        return "Medium", 60.00
    else:  # volume_db >= 70.0
        return "High", 100.00