Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from typing import Dict, List | |
| app = FastAPI() | |
| # Allow CORS for development | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Store active connections and messages | |
| class ConnectionManager: | |
| def __init__(self): | |
| self.active_connections: Dict[str, WebSocket] = {} | |
| self.messages: List[dict] = [] | |
| self.users: Dict[str, dict] = {} | |
| async def connect(self, websocket: WebSocket, user_id: str, username: str, email: str): | |
| await websocket.accept() | |
| self.active_connections[user_id] = websocket | |
| self.users[user_id] = {"username": username, "email": email} | |
| await self.send_user_list() | |
| def disconnect(self, user_id: str): | |
| if user_id in self.active_connections: | |
| del self.active_connections[user_id] | |
| del self.users[user_id] | |
| async def send_personal_message(self, message: str, websocket: WebSocket): | |
| await websocket.send_text(message) | |
| async def broadcast(self, message: dict): | |
| for connection in self.active_connections.values(): | |
| await connection.send_json(message) | |
| async def send_user_list(self): | |
| user_list = { | |
| "type": "user_list", | |
| "data": list(self.users.values()) | |
| } | |
| await self.broadcast(user_list) | |
| manager = ConnectionManager() | |
| async def websocket_endpoint(websocket: WebSocket, user_id: str, username: str, email: str): | |
| await manager.connect(websocket, user_id, username, email) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| if data["type"] == "message": | |
| message = { | |
| "id": str(uuid.uuid4()), | |
| "sender": { | |
| "id": user_id, | |
| "username": username, | |
| "email": email | |
| }, | |
| "content": data["content"], | |
| "timestamp": data["timestamp"], | |
| "type": data["message_type"], | |
| "fileName": data.get("fileName"), | |
| "fileSize": data.get("fileSize") | |
| } | |
| manager.messages.append(message) | |
| await manager.broadcast({ | |
| "type": "new_message", | |
| "data": message | |
| }) | |
| except WebSocketDisconnect: | |
| manager.disconnect(user_id) | |
| await manager.send_user_list() | |
| async def get_messages(): | |
| return manager.messages | |
| async def get_users(): | |
| return list(manager.users.values()) | |
| # Serve frontend files | |
| app.mount("/", StaticFiles(directory="../frontend", html=True), name="frontend") | |
| if __name__ == "__main__": | |
| import uvicorn | |