Commit 339c97b · Bug fix
Parent(s): ca2bd00
__pycache__/app.cpython-311.pyc
CHANGED
Binary files a/__pycache__/app.cpython-311.pyc and b/__pycache__/app.cpython-311.pyc differ
src/api/routes.py
CHANGED
@@ -74,45 +74,66 @@ def _execute_mongo_op_in_thread(op_spec: Dict, client: MongoClient, database_nam
     Executes a single MongoDB operation.
     This function is intended to be run in a separate thread.
     """
- [18 deleted lines not rendered in the source view]
+    try:
+        if client is None:
+            logger.warning("MongoDB client is None. Skipping operation.")
+            return
+
+        db = client[database_name]
+        collection = db[op_spec["collection"]]
+        operation_name = op_spec["operation"]
+
+        if operation_name == "update_one":
+            collection.update_one(
+                op_spec["filter"],
+                op_spec["update"],
+                upsert=op_spec.get("upsert", False),
+                array_filters=op_spec.get("array_filters")
+            )
+        elif operation_name == "insert_one":
+            # Ensure 'document' key exists for insert_one
+            collection.insert_one(op_spec["document"])
+        # Add other specific operations as needed (e.g., find_one, delete_one)
+        else:
+            logger.error(f"Unsupported MongoDB operation in batch: {operation_name}")
+            raise ValueError(f"Unsupported MongoDB operation: {operation_name}")
+    except Exception as e:
+        logger.error(f"Error executing MongoDB operation {op_spec.get('operation', 'unknown')}: {e}")
+        # Don't raise the exception to avoid failing the entire batch

 # Batch processing for MongoDB operations
 async def batch_mongodb_operations(operations: List[Dict]) -> None:
     """Execute MongoDB operations in batches."""
     batch_size = 50
     try:
+        # Check if MongoDB is available
+        if mongodb.db is None:
+            logger.warning("MongoDB not available. Skipping batch operations.")
+            return
+
         # Attempt to get DB name from the existing mongodb setup
- [7 deleted lines not rendered in the source view]
+        try:
+            db_name = mongodb.news_collection.database.name
+        except AttributeError:
+            logger.warning(
+                "Could not determine database name from mongodb.news_collection. "
+                "Using fallback 'recommender_db'. Please configure DB name properly."
+            )
+            db_name = "recommender_db" # FIXME: This should be configured via settings or a central DB config
+    except Exception as e:
+        logger.warning(f"Could not access MongoDB: {e}. Skipping batch operations.")
+        return

     for i in range(0, len(operations), batch_size):
         batch = operations[i:i + batch_size]
- [4 deleted lines not rendered in the source view]
+        try:
+            await asyncio.gather(*[
+                asyncio.to_thread(_execute_mongo_op_in_thread, op, mongodb._client, db_name)
+                for op in batch
+            ])
+        except Exception as e:
+            logger.error(f"Error executing batch MongoDB operations: {e}")
+            # Don't raise the exception to avoid failing the entire request

 #Getting recommendation using without user association
 @router.get("/recommendations/", response_model=RecommendationResponse)
@@ -189,21 +210,26 @@ async def get_recommendations_by_id_endpoint(
     # Add msid to the response
     recommendations_data["msid"] = msid

-    # Store session in MongoDB asynchronously using batch processing
- [5 deleted lines not rendered in the source view]
-    await batch_mongodb_operations([
-        {
-            "collection": "sessions",
-            "operation": "update_one",
-            "filter": {"user_id": user_id},
-            "update": {"$set": session_data},
-            "upsert": True
+    # Store session in MongoDB asynchronously using batch processing (optional)
+    try:
+        session_data = {
+            "user_id": user_id,
+            "recommendations": recommendations_data.get("retrieved_documents", []),
+            "timestamp": datetime.now()
         }
- [1 deleted line not rendered in the source view]
+        await batch_mongodb_operations([
+            {
+                "collection": "sessions",
+                "operation": "update_one",
+                "filter": {"user_id": user_id},
+                "update": {"$set": session_data},
+                "upsert": True
+            }
+        ])
+        logger.info(f"Session data saved to MongoDB for user {user_id}")
+    except Exception as mongo_error:
+        logger.warning(f"Could not save session data to MongoDB: {mongo_error}")
+        # Don't fail the request if MongoDB is unavailable

     end_time = time.time()
     logger.info(f"Recommendation generated in {(end_time - start_time)*1000:.2f}ms")
@@ -275,6 +301,11 @@ async def feedback_recommendation_endpoint(

     # Store feedback in MongoDB
     try:
+        # Check if MongoDB is available
+        if mongodb.db is None or mongodb.news_collection is None:
+            logger.warning("MongoDB not available. Skipping feedback storage.")
+            return {"message": "Response processed successfully (feedback storage unavailable)"}
+
         # First, try to find if the user document exists
         user_doc = await asyncio.get_event_loop().run_in_executor(
             thread_pool,
@@ -376,29 +407,33 @@ async def get_recommendations_with_summary_endpoint(
     articles_details_map = {}

     if doc_ids_to_fetch and (include_summary or include_smart_tip):
- [23 deleted lines not rendered in the source view]
+        try:
+            projection = {"_id": 0, "id": 1}
+            if include_summary:
+                projection.update({"story": 1, "syn": 1}) # Add syn field as fallback
+            if include_smart_tip:
+                projection.update({"seolocation": 1, "tn": 1, "hl": 1})
+
+            # Use batch size of 50 for MongoDB queries
+            batch_size = 50
+            for i in range(0, len(doc_ids_to_fetch), batch_size):
+                batch_ids = doc_ids_to_fetch[i:i + batch_size]
+                fetched_articles_list = await asyncio.get_event_loop().run_in_executor(
+                    thread_pool,
+                    lambda: list(mongodb.news_collection.find(
+                        {"id": {"$in": batch_ids}},
+                        projection
+                    ))
+                )
+                for article in fetched_articles_list:
+                    if article.get("id"):
+                        # Use synopsis as fallback if story is not available
+                        if include_summary and not article.get("story") and article.get("syn"):
+                            article["story"] = article["syn"]
+                        articles_details_map[article["id"]] = article
+        except Exception as e:
+            logger.warning(f"Could not fetch article details from MongoDB: {e}")
+            # Continue without article details - recommendations will still work

     # Process documents in parallel using asyncio.gather
     async def process_doc_batch(docs_batch):
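Note: the batching pattern these routes now rely on (chunk the pending operation specs and run each blocking PyMongo call on a worker thread via asyncio.to_thread, gathering one chunk at a time) can be distilled into the small standalone sketch below. The names run_batched and blocking_op are illustrative stand-ins rather than identifiers from this repository, and print stands in for the route's logger.

import asyncio
from typing import Callable, Dict, List

def blocking_op(op_spec: Dict) -> None:
    """Stand-in for _execute_mongo_op_in_thread: a synchronous driver call."""
    print(f"executing {op_spec['operation']} on {op_spec['collection']}")

async def run_batched(operations: List[Dict],
                      worker: Callable[[Dict], None],
                      batch_size: int = 50) -> None:
    """Run the blocking worker over op specs in chunks, one gather per chunk."""
    for i in range(0, len(operations), batch_size):
        batch = operations[i:i + batch_size]
        try:
            # asyncio.to_thread keeps the event loop free while the driver blocks
            await asyncio.gather(*[asyncio.to_thread(worker, op) for op in batch])
        except Exception as exc:
            # Mirror the route's behaviour: log and keep serving the request
            print(f"batch failed: {exc}")

asyncio.run(run_batched(
    [{"collection": "sessions", "operation": "update_one"},
     {"collection": "sessions", "operation": "insert_one"}],
    blocking_op,
))

Logging and continuing instead of re-raising is what keeps a MongoDB outage from failing the recommendation response, which appears to be the point of this bug fix.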
src/core/__pycache__/recommender.cpython-311.pyc
CHANGED
Binary files a/src/core/__pycache__/recommender.cpython-311.pyc and b/src/core/__pycache__/recommender.cpython-311.pyc differ

src/database/__pycache__/mongodb.cpython-311.pyc
CHANGED
Binary files a/src/database/__pycache__/mongodb.cpython-311.pyc and b/src/database/__pycache__/mongodb.cpython-311.pyc differ
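Side note on the dispatcher added to _execute_mongo_op_in_thread: it also accepts "insert_one" specs, although this commit only shows an "update_one" call site. A hypothetical spec of that shape (the collection and field names below are illustrative, not taken from the repository) would be routed to collection.insert_one(op_spec["document"]).

# Hypothetical op spec; the new dispatcher would route it to insert_one.
feedback_op = {
    "collection": "feedback_events",  # illustrative collection name
    "operation": "insert_one",
    "document": {"user_id": "u123", "msid": 42, "liked": True},
}
# Enqueued through the same helper as the session upsert:
# await batch_mongodb_operations([feedback_op])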