Spaces:
Sleeping
Sleeping
import asyncio | |
from fastapi import FastAPI, WebSocket | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import FileResponse | |
import uvicorn | |
import json | |
from transformers import pipeline | |
from collections import deque | |
from collections import defaultdict | |
import math | |
import sys | |
import random | |
import os | |
sys.path.append(".") | |
app = FastAPI() | |
app.mount("/static", StaticFiles(directory="../frontend"), name="static") | |
# ๐น Load Emotion Detection Model | |
emotion_classifier = pipeline( | |
"zero-shot-classification", | |
model="MarfinF/marfin_emotion", | |
framework="pt" | |
) | |
# ๐น Emotion-to-Mood Mapping | |
emotion_to_mood = { | |
"senang": "happy", | |
"sedih": "sad", | |
"marah": "excited", | |
"takut": "relaxed", | |
"cinta": "romantic" | |
} | |
# ๐น WebSocket Clients | |
clients = {} | |
chat_history = deque(maxlen=4) | |
mood_to_genre = { | |
"happy": "pop", | |
"sad": "acoustic", | |
"excited": "rock", | |
"intense": "cinematic", | |
"romantic": "rnb", | |
"chill": "chill" | |
} | |
# ๐น Detect Emotion | |
def detect_emotion(text): | |
labels = ["takut", "marah", "sedih", "senang", "cinta"] | |
result = emotion_classifier(text, candidate_labels=labels) | |
top_emotion = result['labels'][0] | |
top_scores = result['scores'][0] | |
return top_emotion, top_scores | |
# ๐น Get Music Recommendations | |
def get_recommendations_by_mood(mood): | |
genre_folder = mood_to_genre.get(mood, "pop") | |
folder_path = f"../music/{genre_folder}" | |
print("folder path") | |
print(folder_path) | |
# Check if folder exists | |
if not os.path.exists(folder_path): | |
return [] | |
print("folder exist") | |
# List and shuffle songs | |
songs = [f"../music/{genre_folder}/{song}" for song in os.listdir(folder_path) if song.endswith(".mp3")] | |
random.shuffle(songs) | |
return songs[:3] # Return top 3 shuffled songs | |
def softmax(scores): | |
exp_scores = [math.exp(score) for score in scores] | |
total = sum(exp_scores) | |
return [exp_score / total for exp_score in exp_scores] | |
# ๐น Broadcast User List | |
async def broadcast_user_list(): | |
user_list = list(clients.keys()) | |
message = json.dumps({ | |
"type": "user_list", | |
"users": user_list | |
}) | |
for client in clients.values(): | |
await client.send_text(message) | |
# ๐น Periodic Music Recommender every 30 seconds | |
async def periodic_recommendation(): | |
while True: | |
user_list = list(clients.keys()) | |
if len(user_list) >= 2: | |
await asyncio.sleep(10) | |
if clients: # Only run if someone is connected | |
if len(chat_history) > 0: | |
# 1. Detect emotion dan ambil (label, score) | |
print("chat history") | |
print(chat_history) | |
emotions = [detect_emotion(msg) for msg in chat_history] | |
print("Detected Emotions:", emotions) | |
# 2. Group by emotion + sum score | |
emotion_score_sum = defaultdict(float) | |
for label, score in emotions: | |
emotion_score_sum[label] += score | |
# 3. Softmax | |
labels = list(emotion_score_sum.keys()) | |
scores = list(emotion_score_sum.values()) | |
softmax_scores = softmax(scores) | |
# 4. Pair label + softmax_score | |
softmax_result = list(zip(labels, softmax_scores)) | |
print("Softmax Result:", softmax_result) | |
# 5. Dominant emotion | |
most_common_emotion = max(softmax_result, key=lambda x: x[1])[0] | |
print("Dominant Emotion:", most_common_emotion) | |
mood = emotion_to_mood.get(most_common_emotion, "chill") | |
music_recommendations = get_recommendations_by_mood(mood) | |
else: | |
music_recommendations = ["chill"] # default if no chat | |
recommendation_response = { | |
"recommendations": music_recommendations, | |
"genre": mood_to_genre.get(mood, "pop") | |
} | |
for client in clients.values(): | |
await client.send_text(json.dumps(recommendation_response)) | |
else: | |
await asyncio.sleep(2) | |
await broadcast_user_list() | |
# ๐น Start periodic task | |
async def start_recommender(): | |
asyncio.create_task(periodic_recommendation()) | |
# ๐น WebSocket Endpoint | |
async def chat_endpoint(websocket: WebSocket, username: str): | |
await websocket.accept() | |
clients[username] = websocket | |
print(f"{username} joined") | |
await broadcast_user_list() | |
try: | |
while True: | |
data = await websocket.receive_text() | |
message_data = json.loads(data) | |
chat_history.append(message_data["message"]) | |
response = { | |
"username": message_data["username"], | |
"message": message_data["message"] | |
} | |
# Broadcast message to all clients | |
for client in clients.values(): | |
await client.send_text(json.dumps(response)) | |
except Exception as e: | |
print(f"{username} disconnected: {e}") | |
del clients[username] | |
await broadcast_user_list() | |
def read_root(): | |
return FileResponse("../frontend/index.html") | |
if __name__ == "__main__": | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |