entidi2608 commited on
Commit
97c3215
·
1 Parent(s): 25e6e74

update: retriever

Browse files
Files changed (1) hide show
  1. services/document_service.py +142 -76
services/document_service.py CHANGED
@@ -2,25 +2,25 @@ from pathlib import Path
2
  from llama_parse import LlamaParse
3
  import docx
4
  import pypandoc
5
- import shutil
 
 
6
  import os
7
  import logging
8
  from langchain_core.documents import Document
9
  import config
10
  from db.weaviateDB import connect_to_weaviate
11
- import utils.utils as utils
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
15
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
16
 
17
  class ApiKeyManager:
18
- """
19
- Quản lý một danh sách các API key.
20
- Cung cấp key tiếp theo trong danh sách và cho phép xoay vòng.
21
- """
22
  def __init__(self, api_key_string: str):
23
- # Tách chuỗi thành danh sách các key, loại bỏ khoảng trắng thừa
24
  self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
25
  if not self.keys:
26
  raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
@@ -39,105 +39,171 @@ class ApiKeyManager:
39
  logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
40
  return self.get_key()
41
 
42
- # Khởi tạo một instance của Key Manager với chuỗi key từ config
43
- # Đặt ở cấp module để duy trì trạng thái qua các lần gọi hàm
44
- llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
 
45
 
 
46
 
47
- def convert_to_text_content(source_path: str) -> str:
48
- source_file = Path(source_path)
49
- file_extension = source_file.suffix.lower()
50
- logger.info(f"Đang trích xuất nội dung từ: {source_file.name}")
 
51
  content = ""
52
 
53
- if file_extension == ".pdf":
54
- # Bắt đầu vòng lặp để thử các API key
55
- while (current_key := llama_key_manager.get_key()) is not None:
56
- try:
57
- logger.info(f"Đang thử chuyển đổi PDF bằng key index: {llama_key_manager.current_key_index}...")
58
- parser = LlamaParse(
59
- api_key=current_key,
60
- result_type="text",
61
- verbose=True,
62
- language="vi"
63
- )
64
- # parser.load_data có thể ném ra lỗi nếu key không hợp lệ/hết hạn
65
- documents = parser.load_data([str(source_file)])
66
-
67
- if documents and documents[0].text.strip():
68
- content = documents[0].text
69
- logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
70
- # Nếu thành công, thoát khỏi vòng lặp
71
- break
72
- else:
73
- # Trường hợp hiếm: không có lỗi nhưng nội dung rỗng
74
- raise ValueError("LlamaParse trả về nội dung rỗng.")
75
-
76
- except Exception as e:
77
- logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index}: {e}")
78
- # Nếu có lỗi, thử key tiếp theo trong lần lặp tới
79
- if llama_key_manager.get_next_key() is None:
80
- # Nếu đã hết key để thử
81
- logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại.")
82
- raise Exception("Không thể chuyển đổi file PDF sau khi đã thử tất cả các API key.") from e
83
- # Nếu còn key, vòng lặp while sẽ tiếp tục với key mới
84
-
85
- # Sau vòng lặp, nếu không có content, nghĩa là đã có lỗi nghiêm trọng
86
- if not content:
87
- raise ValueError("Không thể trích xuất nội dung PDF sau khi thử các key.")
88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  elif file_extension == ".docx":
91
- doc = docx.Document(source_path)
 
92
  content = '\n'.join([para.text for para in doc.paragraphs])
 
93
  elif file_extension == ".doc":
94
- content = pypandoc.convert_file(source_path, 'plain', format='doc')
 
 
 
 
 
 
 
 
 
 
 
95
  else:
96
  raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
97
 
98
  if not content.strip():
99
- # Lỗi này chỉ nên xảy ra với docx/doc hoặc nếu LlamaParse thất bại một cách thầm lặng
100
- raise ValueError("Nội dung trích xuất bị rỗng.")
101
 
102
- logger.info(f"✅ Trích xuất nội dung thành công từ {source_file.name}.")
103
  return content
104
 
105
- def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_model):
106
- filename = os.path.basename(filepath)
107
- logger.info(f"BACKGROUND TASK: Starting full pipeline for: {filename} (Hash: {file_hash[:10]}...)")
 
 
 
 
 
108
  weaviate_client = None
109
  try:
110
- raw_content = convert_to_text_content(filepath)
111
-
112
- doc_metadata = extract_document_metadata(raw_content, filename)
 
 
113
  doc_metadata["source"] = filename
114
- cleaned_content = clean_document_text(raw_content)
115
- doc_metadata["field"] = infer_field(cleaned_content, doc_metadata.get("ten_van_ban"))
116
- doc_metadata["entity_type"] = infer_entity_type(cleaned_content, doc_metadata.get("field", ""))
 
117
 
118
  doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
119
- chunks_from_file = hierarchical_split_law_document(doc_to_split)
120
 
121
  if not chunks_from_file:
122
  raise ValueError("File did not yield any chunks after processing.")
123
 
124
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
125
 
126
- weaviate_client = connect_to_weaviate()
127
- embeddings_model = embedding_model
128
- collection_name = config.WEAVIATE_COLLECTION_NAME
129
- create_weaviate_schema_if_not_exists(weaviate_client, collection_name)
 
 
 
 
 
130
 
131
- ingest_chunks_with_native_batching(weaviate_client, collection_name, processed_chunks, embeddings_model)
 
 
132
 
133
- utils.log_processed_hash(file_hash)
134
- logger.info(f"✅ Successfully ingested '{filename}'.")
135
- shutil.move(filepath, os.path.join(config.PROCESSED_FILES_FOLDER, filename))
136
- logger.info(f"Moved '{filename}' to processed folder.")
137
  except Exception as e:
138
  logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
139
- shutil.move(filepath, os.path.join(config.FAILED_FILES_FOLDER, filename))
140
- logger.info(f"Moved '{filename}' to failed folder.")
141
  finally:
142
  if weaviate_client and weaviate_client.is_connected():
143
- weaviate_client.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  from llama_parse import LlamaParse
3
  import docx
4
  import pypandoc
5
+ from datetime import datetime, timezone
6
+ from rag_components import get_huggingface_embeddings
7
+ from io import BytesIO
8
  import os
9
  import logging
10
  from langchain_core.documents import Document
11
  import config
12
  from db.weaviateDB import connect_to_weaviate
13
+ from db.mongoDB import mongo_db
14
+
15
+ from fastapi.concurrency import run_in_threadpool
16
  logger = logging.getLogger(__name__)
17
 
18
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
19
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
20
 
21
  class ApiKeyManager:
22
+ """Quản lý một danh sách các API key."""
 
 
 
23
  def __init__(self, api_key_string: str):
 
24
  self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
25
  if not self.keys:
26
  raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
 
39
  logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
40
  return self.get_key()
41
 
42
+ def reset(self):
43
+ """Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới."""
44
+ self.current_key_index = 0
45
+ logger.info("Key Manager đã được reset.")
46
 
47
+ llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
48
 
49
+ # --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
50
+ def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
51
+ """Trích xuất nội dung text từ một stream trong bộ nhớ."""
52
+ file_extension = Path(original_filename).suffix.lower()
53
+ logger.info(f"Extracting content from: {original_filename}")
54
  content = ""
55
 
56
+ source_stream.seek(0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
+ if file_extension == ".pdf":
59
+ # Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN
60
+ # và tái sử dụng đường dẫn này trong vòng lặp thử key.
61
+
62
+ # Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song
63
+
64
+ temp_pdf_path = f"/tmp/{original_filename}"
65
+
66
+ try:
67
+ with open(temp_pdf_path, "wb") as f:
68
+ f.write(source_stream.getvalue())
69
+
70
+ # Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên
71
+ llama_key_manager.reset()
72
+
73
+ # Bắt đầu vòng lặp để thử các API key
74
+ while (current_key := llama_key_manager.get_key()) is not None:
75
+ try:
76
+ logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...")
77
+ parser = LlamaParse(
78
+ api_key=current_key,
79
+ result_type="text",
80
+ verbose=True, # Giữ để debug
81
+ language="vi"
82
+ )
83
+ # Sử dụng đường dẫn file tạm đã tạo
84
+ documents = parser.load_data([temp_pdf_path])
85
+
86
+ if documents and documents[0].text.strip():
87
+ content = documents[0].text
88
+ logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
89
+ break # Thành công, thoát khỏi vòng lặp
90
+ else:
91
+ raise ValueError("LlamaParse trả về nội dung rỗng.")
92
+
93
+ except Exception as e:
94
+ logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}")
95
+ if llama_key_manager.get_next_key() is None:
96
+ logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.")
97
+ raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e
98
+
99
+ if not content:
100
+ raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.")
101
+
102
+ finally:
103
+ # Luôn dọn dẹp file tạm, dù thành công hay thất bại
104
+ if os.path.exists(temp_pdf_path):
105
+ os.remove(temp_pdf_path)
106
+ logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}")
107
 
108
  elif file_extension == ".docx":
109
+ # docx có thể đọc trực tiếp từ stream
110
+ doc = docx.Document(source_stream)
111
  content = '\n'.join([para.text for para in doc.paragraphs])
112
+
113
  elif file_extension == ".doc":
114
+ # pypandoc cần file trên đĩa
115
+
116
+ temp_doc_path = f"/tmp/{original_filename}"
117
+ try:
118
+ with open(temp_doc_path, "wb") as f:
119
+ f.write(source_stream.getvalue())
120
+ content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc')
121
+ finally:
122
+ if os.path.exists(temp_doc_path):
123
+ os.remove(temp_doc_path)
124
+ logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}")
125
+
126
  else:
127
  raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
128
 
129
  if not content.strip():
130
+ raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.")
 
131
 
132
+ logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.")
133
  return content
134
 
135
+
136
+
137
+
138
+ async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str):
139
+ """
140
+ Pipeline xử lý nền đã được tối ưu hoàn toàn.
141
+ """
142
+ logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)")
143
  weaviate_client = None
144
  try:
145
+ embeddings_model = get_huggingface_embeddings(
146
+ config.EMBEDDING_MODEL_NAME
147
+ )
148
+ # Giai đoạn 1: Xử lý NLP (CPU-bound)
149
+ doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename)
150
  doc_metadata["source"] = filename
151
+
152
+ cleaned_content = await run_in_threadpool(clean_document_text, raw_content)
153
+ doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban"))
154
+ doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", ""))
155
 
156
  doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
157
+ chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split)
158
 
159
  if not chunks_from_file:
160
  raise ValueError("File did not yield any chunks after processing.")
161
 
162
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
163
 
164
+ # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
165
+ weaviate_client = connect_to_weaviate(run_diagnostics=False)
166
+
167
+
168
+ await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
169
+ await run_in_threadpool(
170
+ ingest_chunks_with_native_batching,
171
+ weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model
172
+ )
173
 
174
+ # Giai đoạn 3: Ghi log thành công
175
+ await log_processed_hash(file_hash, filename)
176
+ logger.info(f"✅✅✅ All tasks completed for '{filename}'.")
177
 
 
 
 
 
178
  except Exception as e:
179
  logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
180
+ await log_failed_process(file_hash, filename, str(e))
 
181
  finally:
182
  if weaviate_client and weaviate_client.is_connected():
183
+ weaviate_client.close()
184
+
185
+
186
+ async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None):
187
+ """Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB."""
188
+ try:
189
+ record = {
190
+ "file_hash": file_hash,
191
+ "original_filename": filename,
192
+ "processed_at": datetime.now(timezone.utc),
193
+ "status": status,
194
+ }
195
+ if error_message:
196
+ record["error_message"] = error_message
197
+
198
+ await mongo_db.processed_documents.insert_one(record)
199
+ except Exception as e:
200
+ logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}")
201
+
202
+ # Wrapper cho việc log lỗi
203
+ async def log_failed_process(file_hash: str, filename: str, error_message: str):
204
+ await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message)
205
+
206
+ # Hàm kiểm tra trùng lặp
207
+ async def check_if_hash_exists(file_hash: str) -> bool:
208
+ count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
209
+ return count > 0