"""Chat server that recommends music from the mood of recent messages.

Clients connect over a WebSocket at ``/chat/{username}``.  Every chat message
is broadcast to all connected clients and kept in a short rolling history.  A
background task periodically classifies the emotion of the recent messages
and pushes a list of matching songs to every client.
"""
import asyncio
import json
import math
import os
import random
import sys
from collections import defaultdict
from collections import deque

import uvicorn
from fastapi import FastAPI, WebSocket
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from transformers import pipeline

sys.path.append(".")

app = FastAPI()
app.mount("/static", StaticFiles(directory="../frontend"), name="static")

# 🔹 Load Emotion Detection Model
emotion_classifier = pipeline(
    "zero-shot-classification",
    model="MarfinF/marfin_emotion",
    framework="pt"
)

# 🔹 Emotion-to-Mood Mapping
# NOTE(review): "relaxed" has no entry in `mood_to_genre`, so "takut" ends up
# on the "pop" fallback in get_recommendations_by_mood() — confirm intended.
emotion_to_mood = {
    "senang": "happy",
    "sedih": "sad",
    "marah": "excited",
    "takut": "relaxed",
    "cinta": "romantic"
}

# 🔹 WebSocket Clients (username -> WebSocket)
clients = {}

# Rolling window of the most recent chat messages used for mood detection.
chat_history = deque(maxlen=4)

# 🔹 Mood-to-Genre Mapping (genre doubles as the folder name under ../music)
mood_to_genre = {
    "happy": "pop",
    "sad": "acoustic",
    "excited": "rock",
    "intense": "cinematic",
    "romantic": "rnb",
    "chill": "chill"
}

# Strong reference to the background recommender task; without it the event
# loop only holds a weak reference and the task may be garbage-collected.
_recommender_task = None


# 🔹 Detect Emotion
def detect_emotion(text):
    """Classify ``text`` and return ``(top_label, top_score)``.

    The label is the first entry of the classifier's ``labels`` list and the
    score the first entry of its ``scores`` list.
    """
    labels = ["takut", "marah", "sedih", "senang", "cinta"]
    result = emotion_classifier(text, candidate_labels=labels)
    top_emotion = result['labels'][0]
    top_scores = result['scores'][0]
    return top_emotion, top_scores


# 🔹 Get Music Recommendations
def get_recommendations_by_mood(mood):
    """Return up to three randomly chosen ``.mp3`` paths for ``mood``.

    The mood is mapped to a genre folder via ``mood_to_genre`` (unknown moods
    fall back to ``"pop"``).  An empty list is returned when the folder is
    missing.
    """
    genre_folder = mood_to_genre.get(mood, "pop")
    folder_path = f"../music/{genre_folder}"
    print("folder path")
    print(folder_path)

    # isdir (not exists): a regular file at this path would make listdir raise.
    if not os.path.isdir(folder_path):
        return []
    print("folder exist")

    # List and shuffle songs
    songs = [
        f"../music/{genre_folder}/{song}"
        for song in os.listdir(folder_path)
        if song.endswith(".mp3")
    ]
    random.shuffle(songs)
    return songs[:3]  # Return top 3 shuffled songs


def softmax(scores):
    """Return the softmax of ``scores`` as a list of floats.

    The maximum is subtracted before exponentiating so large inputs cannot
    overflow ``math.exp``; the result is mathematically unchanged.  An empty
    input yields an empty list.
    """
    scores = list(scores)
    if not scores:
        return []
    peak = max(scores)
    exp_scores = [math.exp(score - peak) for score in scores]
    total = sum(exp_scores)
    return [exp_score / total for exp_score in exp_scores]


async def _send_to_all(message):
    """Send the text ``message`` to every connected client.

    Iterates over a snapshot because ``clients`` can change while this
    coroutine is suspended in ``await``.  A failed send is logged and skipped
    so one broken socket cannot abort the broadcast for everyone else.
    """
    for username, client in list(clients.items()):
        try:
            await client.send_text(message)
        except Exception as e:
            print(f"send to {username} failed: {e}")


# 🔹 Broadcast User List
async def broadcast_user_list():
    """Send the current list of usernames to every connected client."""
    user_list = list(clients.keys())
    message = json.dumps({
        "type": "user_list",
        "users": user_list
    })
    await _send_to_all(message)


def _dominant_emotion(emotions):
    """Return the label with the highest summed score in ``emotions``.

    ``emotions`` is a non-empty iterable of ``(label, score)`` pairs.
    """
    # Group by emotion + sum score
    emotion_score_sum = defaultdict(float)
    for label, score in emotions:
        emotion_score_sum[label] += score

    # Softmax over the summed scores, paired back with their labels
    labels = list(emotion_score_sum.keys())
    softmax_scores = softmax(list(emotion_score_sum.values()))
    softmax_result = list(zip(labels, softmax_scores))
    print("Softmax Result:", softmax_result)

    return max(softmax_result, key=lambda x: x[1])[0]


# 🔹 Periodic Music Recommender (runs every 10 seconds while >= 2 users)
async def periodic_recommendation():
    """Background loop that pushes music recommendations to all clients.

    While fewer than two users are connected it re-broadcasts the user list
    every 2 seconds.  Otherwise it waits 10 seconds, derives the dominant
    emotion from ``chat_history`` and sends the matching songs and genre.
    With no chat history the ``"chill"`` mood is used.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Every path below sleeps before doing work, so a failing cycle
        # cannot turn this into a busy loop.
        try:
            if len(clients) < 2:
                await asyncio.sleep(2)
                await broadcast_user_list()
                continue

            await asyncio.sleep(10)
            if not clients:  # Everyone may have left during the sleep
                continue

            # Snapshot: the deque can be appended to while inference runs.
            messages = list(chat_history)
            if messages:
                # 1. Detect emotion and take (label, score)
                print("chat history")
                print(chat_history)
                emotions = []
                for msg in messages:
                    # Inference is synchronous; run it in a worker thread so
                    # chat traffic keeps flowing.
                    emotions.append(
                        await loop.run_in_executor(None, detect_emotion, msg)
                    )
                print("Detected Emotions:", emotions)

                # 2. Dominant emotion
                most_common_emotion = _dominant_emotion(emotions)
                print("Dominant Emotion:", most_common_emotion)

                mood = emotion_to_mood.get(most_common_emotion, "chill")
                music_recommendations = get_recommendations_by_mood(mood)
            else:
                # Default if no chat; `mood` must be bound for the genre
                # lookup below.
                mood = "chill"
                music_recommendations = ["chill"]

            recommendation_response = {
                "recommendations": music_recommendations,
                "genre": mood_to_genre.get(mood, "pop")
            }
            await _send_to_all(json.dumps(recommendation_response))
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # Keep the recommender alive; one bad cycle should not end it.
            print(f"recommendation cycle failed: {e}")


# 🔹 Start periodic task
@app.on_event("startup")
async def start_recommender():
    """Launch the periodic recommender when the application starts."""
    global _recommender_task
    _recommender_task = asyncio.create_task(periodic_recommendation())


# 🔹 WebSocket Endpoint
@app.websocket("/chat/{username}")
async def chat_endpoint(websocket: WebSocket, username: str):
    """Handle one chat connection.

    Registers the socket under ``username``, then relays every received JSON
    message (``username`` and ``message`` fields) to all clients and records
    the message text in ``chat_history``.  On any error the client is
    unregistered and the user list is re-broadcast.
    """
    await websocket.accept()
    clients[username] = websocket
    print(f"{username} joined")
    await broadcast_user_list()

    try:
        while True:
            data = await websocket.receive_text()
            message_data = json.loads(data)

            chat_history.append(message_data["message"])

            response = {
                "username": message_data["username"],
                "message": message_data["message"]
            }

            # Broadcast message to all clients
            await _send_to_all(json.dumps(response))
    except Exception as e:
        print(f"{username} disconnected: {e}")
        # Only unregister our own socket: a newer connection may have reused
        # this username, and the entry may already be gone.
        if clients.get(username) is websocket:
            del clients[username]
        await broadcast_user_list()


@app.get("/")
def read_root():
    """Serve the frontend entry page."""
    return FileResponse("../frontend/index.html")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)