Spaces:
Sleeping
Sleeping
import logging | |
import uvicorn | |
from fastapi import FastAPI, HTTPException | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel, Field | |
from typing import List, Optional | |
from datetime import datetime | |
from src.core.recommender import recommender | |
from src.database.mongodb import mongodb | |
from src.config.settings import API_TITLE, API_DESCRIPTION, API_VERSION | |
logger = logging.getLogger(__name__) | |
# Configure basic logging if not already set up elsewhere | |
if not logger.hasHandlers(): | |
logging.basicConfig(level=logging.INFO) | |
# Create a new FastAPI app instance instead of importing from src.main | |
app = FastAPI( | |
title=API_TITLE, | |
description=API_DESCRIPTION, | |
version=API_VERSION | |
) | |
# Add CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Initialize recommender on startup | |
async def startup_event(): | |
"""Initialize recommender system on startup.""" | |
try: | |
logger.info("Initializing recommender system...") | |
recommender.load_components() | |
app.state.recommender = recommender | |
logger.info("Recommender system initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize recommender system: {e}", exc_info=True) | |
# Don't raise here to allow the app to start even if recommender fails | |
async def shutdown_event(): | |
"""Cleanup on shutdown.""" | |
try: | |
mongodb.close() | |
logger.info("MongoDB connection closed") | |
except Exception as e: | |
logger.error(f"Error during shutdown: {e}", exc_info=True) | |
# Pydantic models for request/response bodies | |
class FeedbackPayload(BaseModel): | |
user_id: str | |
msid: str | |
clicked_msid: str # Comma-separated string of MSIDs | |
k: int = Field(default=5, ge=1, le=10) | |
class FeedbackResponse(BaseModel): | |
message: str | |
# API Endpoints | |
async def health_check(): | |
""" | |
Health check endpoint to diagnose system status. | |
""" | |
health_status = { | |
"status": "healthy", | |
"timestamp": datetime.now().isoformat(), | |
"components": {} | |
} | |
# Check recommender system | |
if hasattr(app.state, "recommender") and app.state.recommender is not None: | |
health_status["components"]["recommender"] = { | |
"status": "available", | |
"models_loaded": { | |
"embed_model": app.state.recommender.embed_model is not None, | |
"reranker": app.state.recommender.reranker is not None, | |
"generator": app.state.recommender.generator is not None | |
}, | |
"data_available": app.state.recommender.df is not None and not app.state.recommender.df.empty, | |
"faiss_index_available": app.state.recommender.index is not None, | |
"faiss_vectors": app.state.recommender.index.ntotal if app.state.recommender.index else 0 | |
} | |
else: | |
health_status["components"]["recommender"] = {"status": "not_available"} | |
health_status["status"] = "degraded" | |
# Check MongoDB connection | |
try: | |
if mongodb.db is not None: | |
# Try a simple operation to test connection | |
mongodb.db.command("ping") | |
health_status["components"]["mongodb"] = {"status": "connected"} | |
else: | |
health_status["components"]["mongodb"] = {"status": "not_connected"} | |
health_status["status"] = "degraded" | |
except Exception as e: | |
health_status["components"]["mongodb"] = { | |
"status": "error", | |
"error": str(e) | |
} | |
health_status["status"] = "degraded" | |
return health_status | |
async def get_recommendations_api(query: str, k: int = 5): | |
""" | |
Get recommendations based on a textual query. | |
""" | |
try: | |
if not hasattr(app.state, "recommender") or app.state.recommender is None: | |
logger.error("Recommender is not available.") | |
raise HTTPException(status_code=503, detail="Recommender service not available") | |
response = app.state.recommender.get_recommendations(query, k) | |
return response | |
except Exception as e: | |
logger.error(f"API Error in get_recommendations_api for query '{query}': {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |
async def get_recommendations_by_id_api(msid: str, k: int = 5): | |
""" | |
Get recommendations based on a given MSID. | |
""" | |
try: | |
# Validate input parameters | |
if not msid or not isinstance(msid, str): | |
raise HTTPException(status_code=400, detail="Invalid MSID provided") | |
if not isinstance(k, int) or k < 1 or k > 10: | |
raise HTTPException(status_code=400, detail="k must be an integer between 1 and 10") | |
# Check if recommender service is available | |
if not hasattr(app.state, "recommender") or app.state.recommender is None: | |
logger.error("Recommender is not available.") | |
raise HTTPException(status_code=503, detail="Recommender service not available") | |
# Check if recommender has the necessary data | |
if app.state.recommender.df is None or app.state.recommender.df.empty: | |
logger.error("Recommender data not available (MongoDB connection issue).") | |
raise HTTPException( | |
status_code=503, | |
detail="Recommender data not available. The service is currently unable to access the required data." | |
) | |
# Get recommendations with error handling | |
try: | |
response = app.state.recommender.get_recommendations_by_id(msid, k) | |
if not response: | |
raise HTTPException(status_code=404, detail=f"No recommendations found for MSID: {msid}") | |
return response | |
except ValueError as ve: | |
logger.error(f"Value error in get_recommendations_by_id for msid '{msid}': {ve}") | |
raise HTTPException(status_code=400, detail=str(ve)) | |
except Exception as e: | |
logger.error(f"Error getting recommendations for msid '{msid}': {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail="Internal server error while getting recommendations") | |
except HTTPException as he: | |
raise he | |
except Exception as e: | |
logger.error(f"Unexpected error in get_recommendations_by_id_api for msid '{msid}': {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail="An unexpected error occurred") | |
async def submit_user_feedback_api(payload: FeedbackPayload): | |
""" | |
Submit user feedback (e.g., clicked articles) and save it. | |
Optionally, this endpoint can also trigger re-computation of recommendations based on feedback, | |
though the primary response here is the status of feedback submission. | |
""" | |
try: | |
if not hasattr(app.state, "recommender") or app.state.recommender is None: | |
logger.error("Recommender is not available.") | |
raise HTTPException(status_code=503, detail="Recommender service not available") | |
# (Optional) Compute recommendations based on feedback, similar to Gradio function. | |
# The result of this call is not the primary output of this API endpoint. | |
try: | |
_ = app.state.recommender.get_recommendations_user_feedback( | |
payload.user_id, payload.msid, payload.clicked_msid, payload.k | |
) | |
logger.info(f"API: (Computed recommendations for user '{payload.user_id}' based on click, not part of this response)") | |
except Exception as e: | |
logger.warning(f"Could not compute recommendations based on feedback: {e}") | |
# Save feedback to MongoDB (optional - only if MongoDB is available) | |
try: | |
actual_clicked_msids = [s.strip() for s in payload.clicked_msid.split(',') if s.strip()] | |
if not actual_clicked_msids: | |
logger.warning(f"API: Invalid clicked_msid: '{payload.clicked_msid}' for user '{payload.user_id}'") | |
raise HTTPException(status_code=400, detail="clicked_msid parameter is invalid or does not contain valid MSIDs.") | |
logger.info( | |
f"API: Saving feedback for user '{payload.user_id}', context msid: '{payload.msid}', clicked msids: {actual_clicked_msids}" | |
) | |
feedback_collection_name = "user_feedback_tracking" | |
# Check if MongoDB is available | |
if mongodb.db is None: | |
logger.warning("MongoDB database connection is not available. Skipping feedback storage.") | |
return FeedbackResponse(message="Response processed successfully (feedback storage unavailable)") | |
feedback_collection = mongodb.db[feedback_collection_name] | |
user_doc = feedback_collection.find_one({"user_id": payload.user_id}) | |
if user_doc: | |
feedback_collection.update_one( | |
{"user_id": payload.user_id}, | |
{"$addToSet": {"Articles": {"msid": payload.msid, "Read": actual_clicked_msids}}} | |
) | |
else: | |
feedback_collection.insert_one({ | |
"user_id": payload.user_id, | |
"Articles": [{"msid": payload.msid, "Read": actual_clicked_msids}] | |
}) | |
logger.info(f"API: Successfully saved feedback for user '{payload.user_id}'") | |
return FeedbackResponse(message="Response saved successfully") | |
except Exception as e: | |
logger.error(f"Error saving feedback to MongoDB: {e}", exc_info=True) | |
# Don't fail the entire request if MongoDB is unavailable | |
return FeedbackResponse(message="Response processed successfully (feedback storage failed)") | |
except HTTPException: | |
raise # Re-raise HTTPException directly | |
except Exception as e: | |
logger.error(f"API Error in submit_user_feedback_api for user '{payload.user_id}': {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |
async def get_recommendations_summary_api(msid: str, k: int = 5, summary: bool = True, smart_tip: bool = True): | |
""" | |
Get recommendations with optional summary and smart tip for a given MSID. | |
""" | |
try: | |
if not hasattr(app.state, "recommender") or app.state.recommender is None: | |
logger.error("Recommender is not available.") | |
raise HTTPException(status_code=503, detail="Recommender service not available") | |
try: | |
response = app.state.recommender.get_recommendations_summary(msid, k, summary, smart_tip) | |
except RuntimeError as e: | |
# Catch the meta tensor error and return a fallback | |
if "meta tensor" in str(e): | |
logger.error("Summary model error: %s", e) | |
response = { | |
"msid": msid, | |
"recommendations": [], | |
"summary": [], | |
"smart_tip": [], | |
"error": "Summary model is not available on this server." | |
} | |
else: | |
raise | |
return response | |
except Exception as e: | |
logger.error(f"API Error in get_recommendations_summary_api for msid '{msid}': {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") | |
if __name__ == "__main__": | |
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) | |