christopher committed on
Commit
c708265
·
1 Parent(s): e113735

removed async

Browse files
Files changed (1) hide show
  1. database/query_processor.py +14 -37
database/query_processor.py CHANGED
@@ -4,8 +4,6 @@ import numpy as np
4
  from models.LexRank import degree_centrality_scores
5
  import logging
6
  from datetime import datetime as dt
7
- import asyncio
8
- from concurrent.futures import ThreadPoolExecutor
9
 
10
  logger = logging.getLogger(__name__)
11
 
@@ -15,7 +13,6 @@ class QueryProcessor:
15
  self.summarization_model = summarization_model
16
  self.nlp_model = nlp_model
17
  self.db_service = db_service
18
- self.executor = ThreadPoolExecutor(max_workers=4) # For CPU-bound tasks
19
  logger.info("QueryProcessor initialized")
20
 
21
  async def process(
@@ -26,33 +23,33 @@ class QueryProcessor:
26
  end_date: Optional[str] = None
27
  ) -> Dict[str, Any]:
28
  try:
29
- # Date handling (sync but fast)
30
  start_dt = self._parse_date(start_date) if start_date else None
31
  end_dt = self._parse_date(end_date) if end_date else None
32
 
33
- # Async query processing
34
- query_embedding = await self._async_encode(query)
35
  logger.debug(f"Generated embedding for query: {query[:50]}...")
36
 
37
- # Entity extraction (sync but fast)
38
- entities = await asyncio.to_thread(self.nlp_model.extract_entities, query)
39
  logger.debug(f"Extracted entities: {entities}")
40
 
41
- # Async database search
42
  articles = await self._execute_semantic_search(
43
  query_embedding,
44
  start_dt,
45
  end_dt,
46
  topic,
47
- [ent[0] for ent in entities]
48
  )
49
 
50
  if not articles:
51
  logger.info("No articles found matching criteria")
52
  return {"message": "No articles found", "articles": []}
53
 
54
- # Async summary generation
55
- summary_data = await self._async_generate_summary(articles)
56
 
57
  return {
58
  "summary": summary_data["summary"],
@@ -94,35 +91,14 @@ class QueryProcessor:
94
  logger.error(f"Semantic search failed: {str(e)}")
95
  raise
96
 
97
- async def _async_encode(self, text: str) -> List[float]:
98
- """Run embedding in thread pool"""
99
- loop = asyncio.get_running_loop()
100
- return await loop.run_in_executor(
101
- self.executor,
102
- lambda: self.embedding_model.encode(text).tolist()
103
- )
104
-
105
- async def _async_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
106
- """Run summary generation in thread pool"""
107
- loop = asyncio.get_running_loop()
108
- return await loop.run_in_executor(
109
- self.executor,
110
- lambda: self._sync_generate_summary(articles)
111
- )
112
-
113
- def _sync_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
114
- """Synchronous version for thread pool execution"""
115
  try:
116
  # Extract and process content
117
  sentences = []
118
  for article in articles:
119
  if content := article.get("content"):
120
- sentences.extend(
121
- asyncio.run_coroutine_threadsafe(
122
- asyncio.to_thread(self.nlp_model.tokenize_sentences, content),
123
- loop=asyncio.get_event_loop()
124
- ).result()
125
- )
126
 
127
  if not sentences:
128
  logger.warning("No sentences available for summarization")
@@ -131,11 +107,12 @@ class QueryProcessor:
131
  "key_sentences": []
132
  }
133
 
134
- # CPU-intensive operations
135
  embeddings = self.embedding_model.encode(sentences)
136
  similarity_matrix = np.inner(embeddings, embeddings)
137
  centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
138
 
 
139
  top_indices = np.argsort(-centrality_scores)[:10]
140
  key_sentences = [sentences[idx].strip() for idx in top_indices]
141
 
 
4
  from models.LexRank import degree_centrality_scores
5
  import logging
6
  from datetime import datetime as dt
 
 
7
 
8
  logger = logging.getLogger(__name__)
9
 
 
13
  self.summarization_model = summarization_model
14
  self.nlp_model = nlp_model
15
  self.db_service = db_service
 
16
  logger.info("QueryProcessor initialized")
17
 
18
  async def process(
 
23
  end_date: Optional[str] = None
24
  ) -> Dict[str, Any]:
25
  try:
26
+ # Date handling
27
  start_dt = self._parse_date(start_date) if start_date else None
28
  end_dt = self._parse_date(end_date) if end_date else None
29
 
30
+ # Query processing
31
+ query_embedding = self.embedding_model.encode(query).tolist()
32
  logger.debug(f"Generated embedding for query: {query[:50]}...")
33
 
34
+ # Entity extraction
35
+ entities = self.nlp_model.extract_entities(query)
36
  logger.debug(f"Extracted entities: {entities}")
37
 
38
+ # Database search
39
  articles = await self._execute_semantic_search(
40
  query_embedding,
41
  start_dt,
42
  end_dt,
43
  topic,
44
+ [ent[0] for ent in entities] # Just the entity texts
45
  )
46
 
47
  if not articles:
48
  logger.info("No articles found matching criteria")
49
  return {"message": "No articles found", "articles": []}
50
 
51
+ # Summary generation
52
+ summary_data = self._generate_summary(articles)
53
 
54
  return {
55
  "summary": summary_data["summary"],
 
91
  logger.error(f"Semantic search failed: {str(e)}")
92
  raise
93
 
94
+ def _generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
95
+ """Generate summary from articles with fallback handling"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  try:
97
  # Extract and process content
98
  sentences = []
99
  for article in articles:
100
  if content := article.get("content"):
101
+ sentences.extend(self.nlp_model.tokenize_sentences(content))
 
 
 
 
 
102
 
103
  if not sentences:
104
  logger.warning("No sentences available for summarization")
 
107
  "key_sentences": []
108
  }
109
 
110
+ # Generate summary
111
  embeddings = self.embedding_model.encode(sentences)
112
  similarity_matrix = np.inner(embeddings, embeddings)
113
  centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
114
 
115
+ # Get top 10 most central sentences
116
  top_indices = np.argsort(-centrality_scores)[:10]
117
  key_sentences = [sentences[idx].strip() for idx in top_indices]
118