chatbot_demo / retrieval.py
deddoggo's picture
update full version
c69c2f9
raw
history blame
36.7 kB
# retrieval.py
import json
import re
import numpy as np
import faiss # Thư viện này cần được cài đặt (faiss-cpu hoặc faiss-gpu)
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional, Callable
# --- 1. CÁC HẰNG SỐ VÀ MAP CHO LOẠI PHƯƠNG TIỆN ---
VEHICLE_TYPE_MAP: Dict[str, List[str]] = {
"xe máy": ["xe máy", "xe mô tô", "xe gắn máy", "xe máy điện", "mô tô hai bánh", "mô tô ba bánh"],
"ô tô": ["xe ô tô", "ô tô con", "ô tô tải", "ô tô khách", "xe con", "xe tải", "xe khách", "ô tô điện"],
"xe cơ giới": ["xe cơ giới"], # Loại chung hơn
"xe thô sơ": ["xe thô sơ", "xe đạp", "xích lô", "xe đạp điện"], # Thêm xe đạp điện vào xe thô sơ
"người đi bộ": ["người đi bộ"],
# Thêm các loại khác nếu cần, ví dụ "xe chuyên dùng" nếu muốn tách riêng
}
# --- 2. HÀM TIỆN ÍCH ---
def get_standardized_vehicle_type(text_input: Optional[str]) -> Optional[str]:
"""
Suy luận và chuẩn hóa loại phương tiện từ một chuỗi text.
Ưu tiên các loại cụ thể trước, sau đó đến các loại chung hơn.
"""
if not text_input or not isinstance(text_input, str):
return None
text_lower = text_input.lower()
# Ưu tiên kiểm tra "xe máy" và các biến thể trước "xe cơ giới"
# Cẩn thận với "xe máy chuyên dùng"
is_moto = any(re.search(r'\b' + re.escape(kw) + r'\b', text_lower) for kw in VEHICLE_TYPE_MAP["xe máy"])
if is_moto:
# Tránh trường hợp "xe máy chuyên dùng" bị tính là "xe máy"
# nếu "xe máy chuyên dùng" là một category riêng hoặc cần xử lý đặc biệt.
# Hiện tại, nếu có "chuyên dùng" và "xe máy", nó vẫn sẽ là "xe máy" nếu không có category "xe máy chuyên dùng".
if "chuyên dùng" in text_lower and "xe máy chuyên dùng" not in text_lower: # Logic này có thể cần review tùy theo định nghĩa
# Nếu bạn có category "xe máy chuyên dùng" trong VEHICLE_TYPE_MAP, nó sẽ được xử lý ở vòng lặp sau.
# Nếu không, "xe máy chuyên dùng" vẫn có thể bị coi là "xe máy".
pass # Để nó rơi xuống các kiểm tra khác nếu cần
else:
return "xe máy"
# Kiểm tra "ô tô" và các biến thể
is_car = any(re.search(r'\b' + re.escape(kw) + r'\b', text_lower) for kw in VEHICLE_TYPE_MAP["ô tô"])
if is_car:
return "ô tô"
# Kiểm tra các loại chung hơn hoặc khác
# Thứ tự này quan trọng nếu có sự chồng chéo (ví dụ: "xe cơ giới" bao gồm "ô tô", "xe máy")
# Đã xử lý ô tô, xe máy ở trên nên thứ tự ở đây ít quan trọng hơn giữa các loại còn lại.
for standard_type, keywords in VEHICLE_TYPE_MAP.items():
if standard_type in ["xe máy", "ô tô"]: # Đã xử lý ở trên
continue
if any(re.search(r'\b' + re.escape(kw) + r'\b', text_lower) for kw in keywords):
return standard_type
return None # Trả về None nếu không khớp rõ ràng
def tokenize_vi_for_bm25_setup(text: str) -> List[str]:
"""
Tokenize tiếng Việt đơn giản cho BM25: lowercase, loại bỏ dấu câu, split theo khoảng trắng.
"""
text = text.lower()
text = re.sub(r'[^\w\s]', '', text) # Loại bỏ các ký tự không phải chữ, số, hoặc khoảng trắng
return text.split()
# --- 3. HÀM XỬ LÝ DỮ LIỆU LUẬT TỪ JSON SANG CHUNKS ---
def process_law_data_to_chunks(structured_data_input: Any) -> List[Dict[str, Any]]:
"""
Xử lý dữ liệu luật có cấu trúc (từ JSON) thành một danh sách phẳng các "chunks".
Mỗi chunk bao gồm "text" và "metadata".
"""
flat_list: List[Dict[str, Any]] = []
if isinstance(structured_data_input, dict) and "article" in structured_data_input:
articles_list: List[Dict[str, Any]] = [structured_data_input]
elif isinstance(structured_data_input, list):
articles_list = structured_data_input
else:
print("Lỗi: Dữ liệu đầu vào không phải là danh sách các Điều luật hoặc một đối tượng Điều luật.")
return flat_list
for article_data in articles_list:
if not isinstance(article_data, dict):
# print(f"Cảnh báo: Bỏ qua mục không phải dict trong articles_list: {article_data}")
continue
raw_article_title = article_data.get("article_title", "")
article_metadata_base = {
"source_document": article_data.get("source_document"),
"article": article_data.get("article"), # Số điều, ví dụ "Điều 5"
"article_title": raw_article_title # Tiêu đề của Điều, ví dụ "Xử phạt người điều khiển..."
}
article_level_vehicle_type = get_standardized_vehicle_type(raw_article_title) or "không xác định"
clauses = article_data.get("clauses", [])
if not isinstance(clauses, list):
# print(f"Cảnh báo: 'clauses' trong Điều {article_data.get('article')} không phải list.")
continue
for clause_data in clauses:
if not isinstance(clause_data, dict):
# print(f"Cảnh báo: Bỏ qua mục không phải dict trong 'clauses' của Điều {article_data.get('article')}")
continue
clause_metadata_base = article_metadata_base.copy()
clause_number = clause_data.get("clause_number") # Số khoản, ví dụ "1" hoặc "1a"
clause_metadata_base.update({"clause_number": clause_number})
clause_summary_data = clause_data.get("clause_metadata_summary")
if isinstance(clause_summary_data, dict):
clause_metadata_base["overall_fine_note_for_clause_text"] = clause_summary_data.get("overall_fine_note_for_clause")
clause_metadata_base["overall_points_deduction_note_for_clause_text"] = clause_summary_data.get("overall_points_deduction_note_for_clause")
points_in_clause = clause_data.get("points_in_clause", [])
if not isinstance(points_in_clause, list):
# print(f"Cảnh báo: 'points_in_clause' trong Khoản {clause_number} của Điều {article_data.get('article')} không phải list.")
continue
if points_in_clause: # Nếu có các Điểm trong Khoản
for point_data in points_in_clause:
if not isinstance(point_data, dict):
# print(f"Cảnh báo: Bỏ qua mục không phải dict trong 'points_in_clause'.")
continue
chunk_text = point_data.get("point_text_original")
if not chunk_text: chunk_text = point_data.get("violation_description_summary") # Fallback
if not chunk_text: continue # Bỏ qua nếu không có text
current_chunk_metadata = clause_metadata_base.copy()
current_chunk_metadata["point_id"] = point_data.get("point_id") # ID của điểm, ví dụ "a"
current_chunk_metadata["violation_description_summary"] = point_data.get("violation_description_summary")
# 1. Làm phẳng 'penalties_detail'
current_chunk_metadata.update({
'has_fine': False, 'has_points_deduction': False,
'has_license_suspension': False, 'has_confiscation': False
})
penalty_types_for_this_point: List[str] = []
points_values: List[Any] = []
s_min_months: List[float] = [] # Sửa thành float để chứa giá trị tháng lẻ
s_max_months: List[float] = []
confiscation_items_list: List[str] = []
penalties = point_data.get("penalties_detail", [])
if isinstance(penalties, list):
for p_item in penalties:
if not isinstance(p_item, dict): continue
p_type_original, p_details = p_item.get("penalty_type"), p_item.get("details", {})
if p_type_original: penalty_types_for_this_point.append(str(p_type_original))
if not isinstance(p_details, dict): continue
p_type_lower = str(p_type_original).lower()
if "phạt tiền" in p_type_lower:
current_chunk_metadata['has_fine'] = True
if p_details.get("individual_fine_min") is not None: current_chunk_metadata['individual_fine_min'] = p_details.get("individual_fine_min")
if p_details.get("individual_fine_max") is not None: current_chunk_metadata['individual_fine_max'] = p_details.get("individual_fine_max")
if "trừ điểm" in p_type_lower or "điểm giấy phép lái xe" in p_type_lower : # Mở rộng kiểm tra
current_chunk_metadata['has_points_deduction'] = True
if p_details.get("points_deducted") is not None: points_values.append(p_details.get("points_deducted"))
if "tước quyền sử dụng giấy phép lái xe" in p_type_lower or "tước bằng lái" in p_type_lower:
current_chunk_metadata['has_license_suspension'] = True
if p_details.get("suspension_duration_min_months") is not None: s_min_months.append(float(p_details.get("suspension_duration_min_months")))
if p_details.get("suspension_duration_max_months") is not None: s_max_months.append(float(p_details.get("suspension_duration_max_months")))
if "tịch thu" in p_type_lower:
current_chunk_metadata['has_confiscation'] = True
if p_details.get("confiscation_item"): confiscation_items_list.append(str(p_details.get("confiscation_item")))
if penalty_types_for_this_point: current_chunk_metadata['penalty_types_str'] = ", ".join(sorted(list(set(penalty_types_for_this_point))))
if points_values: current_chunk_metadata['points_deducted_values_str'] = ", ".join(map(str, sorted(list(set(points_values)))))
if s_min_months: current_chunk_metadata['suspension_min_months'] = min(s_min_months)
if s_max_months: current_chunk_metadata['suspension_max_months'] = max(s_max_months)
if confiscation_items_list: current_chunk_metadata['confiscation_items_str'] = ", ".join(sorted(list(set(confiscation_items_list))))
# 2. Thông tin tốc độ
if point_data.get("speed_limit") is not None: current_chunk_metadata['speed_limit_value'] = point_data.get("speed_limit")
if point_data.get("speed_limit_min") is not None: current_chunk_metadata['speed_limit_min_value'] = point_data.get("speed_limit_min")
if point_data.get("speed_type"): current_chunk_metadata['speed_category'] = point_data.get("speed_type")
# 3. Thông tin loại xe/đường cụ thể từ point_data
speed_limits_extra = point_data.get("speed_limits_by_vehicle_type_and_road_type", [])
point_specific_vehicle_types_raw: List[str] = []
point_specific_road_types: List[str] = []
if isinstance(speed_limits_extra, list):
for sl_item in speed_limits_extra:
if isinstance(sl_item, dict):
if sl_item.get("vehicle_type"): point_specific_vehicle_types_raw.append(str(sl_item.get("vehicle_type")).lower())
if sl_item.get("road_type"): point_specific_road_types.append(str(sl_item.get("road_type")).lower())
if point_specific_vehicle_types_raw: current_chunk_metadata['point_specific_vehicle_types_str'] = ", ".join(sorted(list(set(point_specific_vehicle_types_raw))))
if point_specific_road_types: current_chunk_metadata['point_specific_road_types_str'] = ", ".join(sorted(list(set(point_specific_road_types))))
# 4. Gán 'applicable_vehicle_type' chính
derived_vehicle_type_from_point = "không xác định"
if point_specific_vehicle_types_raw:
normalized_types_from_point_data = set()
for vt_raw in set(point_specific_vehicle_types_raw):
standard_type = get_standardized_vehicle_type(vt_raw)
if standard_type: normalized_types_from_point_data.add(standard_type)
if len(normalized_types_from_point_data) == 1:
derived_vehicle_type_from_point = list(normalized_types_from_point_data)[0]
elif len(normalized_types_from_point_data) > 1:
# Xử lý trường hợp có nhiều loại xe đã chuẩn hóa
if "ô tô" in normalized_types_from_point_data and "xe máy" in normalized_types_from_point_data:
derived_vehicle_type_from_point = "ô tô và xe máy"
# Có thể thêm các logic ưu tiên khác nếu cần (ví dụ: nếu có "xe cơ giới" và "ô tô" -> "ô tô")
elif "ô tô" in normalized_types_from_point_data: derived_vehicle_type_from_point = "ô tô"
elif "xe máy" in normalized_types_from_point_data: derived_vehicle_type_from_point = "xe máy"
else: # Nếu không có cặp ưu tiên rõ ràng
derived_vehicle_type_from_point = "nhiều loại cụ thể" # Hoặc join các loại: ", ".join(sorted(list(normalized_types_from_point_data)))
# Ưu tiên thông tin từ point, nếu không rõ ràng thì mới dùng từ article
# Các loại được coi là rõ ràng: "ô tô", "xe máy", "xe cơ giới", "xe thô sơ", "người đi bộ", "ô tô và xe máy"
clear_types = ["ô tô", "xe máy", "xe cơ giới", "xe thô sơ", "người đi bộ", "ô tô và xe máy"]
if derived_vehicle_type_from_point not in clear_types and derived_vehicle_type_from_point != "không xác định":
# Nếu là "nhiều loại cụ thể" mà không phải "ô tô và xe máy" hoặc các loại không rõ ràng khác
current_chunk_metadata['applicable_vehicle_type'] = article_level_vehicle_type
elif derived_vehicle_type_from_point == "không xác định":
current_chunk_metadata['applicable_vehicle_type'] = article_level_vehicle_type
else: # Các trường hợp còn lại (rõ ràng từ point)
current_chunk_metadata['applicable_vehicle_type'] = derived_vehicle_type_from_point
# 5. Các trường khác
if point_data.get("applies_to"): current_chunk_metadata['applies_to_context'] = point_data.get("applies_to")
if point_data.get("location"): current_chunk_metadata['specific_location_info'] = point_data.get("location")
final_metadata_cleaned = {k: v for k, v in current_chunk_metadata.items() if v is not None}
flat_list.append({ "text": chunk_text, "metadata": final_metadata_cleaned })
else: # Nếu Khoản không có Điểm nào, thì text của Khoản là một chunk
chunk_text = clause_data.get("clause_text_original")
if chunk_text: # Chỉ thêm nếu có text
current_clause_level_metadata = clause_metadata_base.copy()
# Với chunk cấp độ Khoản, loại xe sẽ lấy từ article_level_vehicle_type
current_clause_level_metadata['applicable_vehicle_type'] = article_level_vehicle_type
# Kiểm tra xem có thông tin phạt tiền tổng thể ở Khoản không
if current_clause_level_metadata.get("overall_fine_note_for_clause_text"):
current_clause_level_metadata['has_fine_clause_level'] = True # Dùng để boost nếu cần
final_metadata_cleaned = {k:v for k,v in current_clause_level_metadata.items() if v is not None}
flat_list.append({ "text": chunk_text, "metadata": final_metadata_cleaned })
return flat_list
# --- 4. HÀM PHÂN TÍCH QUERY ---
def analyze_query(query_text: str) -> Dict[str, Any]:
"""
Phân tích query để xác định ý định của người dùng (ví dụ: hỏi về phạt tiền, điểm, loại xe...).
"""
query_lower = query_text.lower()
analysis: Dict[str, Any] = {
"mentions_fine": bool(re.search(r'tiền|phạt|bao nhiêu đồng|bao nhiêu tiền|mức phạt|xử phạt hành chính|nộp phạt', query_lower)),
"mentions_points": bool(re.search(r'điểm|trừ điểm|mấy điểm|trừ bao nhiêu điểm|bằng lái|gplx|giấy phép lái xe', query_lower)),
"mentions_suspension": bool(re.search(r'tước bằng|tước giấy phép lái xe|giam bằng|treo bằng|thu bằng lái|tước quyền sử dụng', query_lower)),
"mentions_confiscation": bool(re.search(r'tịch thu|thu xe|thu phương tiện', query_lower)),
"mentions_max_speed": bool(re.search(r'tốc độ tối đa|giới hạn tốc độ|chạy quá tốc độ|vượt tốc độ', query_lower)),
"mentions_min_speed": bool(re.search(r'tốc độ tối thiểu|chạy chậm hơn', query_lower)),
"mentions_safe_distance": bool(re.search(r'khoảng cách an toàn|cự ly an toàn|cự ly tối thiểu|giữ khoảng cách', query_lower)),
"mentions_remedial_measures": bool(re.search(r'biện pháp khắc phục|khắc phục hậu quả', query_lower)),
"vehicle_type_query": None, # Sẽ được điền bằng loại xe chuẩn hóa nếu có
}
# Sử dụng lại VEHICLE_TYPE_MAP để chuẩn hóa loại xe trong query
# Ưu tiên loại cụ thể trước
queried_vehicle_standardized = get_standardized_vehicle_type(query_lower)
if queried_vehicle_standardized:
analysis["vehicle_type_query"] = queried_vehicle_standardized
return analysis
# --- 5. HÀM TÌM KIẾM KẾT HỢP (HYBRID SEARCH) ---
def search_relevant_laws(
query_text: str,
embedding_model, # Kiểu dữ liệu: SentenceTransformer model
faiss_index, # Kiểu dữ liệu: faiss.Index
chunks_data: List[Dict[str, Any]],
bm25_model, # Kiểu dữ liệu: BM25Okapi model
k: int = 5,
initial_k_multiplier: int = 10,
rrf_k_constant: int = 60,
# Trọng số cho các loại boost (có thể điều chỉnh)
boost_fine: float = 0.15,
boost_points: float = 0.15,
boost_both_fine_points: float = 0.10, # Boost thêm nếu khớp cả hai
boost_vehicle_type: float = 0.20,
boost_suspension: float = 0.18,
boost_confiscation: float = 0.18,
boost_max_speed: float = 0.15,
boost_min_speed: float = 0.15,
boost_safe_distance: float = 0.12,
boost_remedial_measures: float = 0.10
) -> List[Dict[str, Any]]:
"""
Thực hiện tìm kiếm kết hợp (semantic + keyword) với RRF và metadata re-ranking.
"""
if k <= 0:
print("Lỗi: k (số lượng kết quả) phải là số dương.")
return []
if not chunks_data:
print("Lỗi: chunks_data rỗng, không thể tìm kiếm.")
return []
print(f"\n🔎 Đang tìm kiếm (Hybrid) cho truy vấn: '{query_text}'")
# === 1. Phân tích Query ===
query_analysis = analyze_query(query_text)
# print(f" Phân tích query: {json.dumps(query_analysis, ensure_ascii=False, indent=2)}")
num_vectors_in_index = faiss_index.ntotal
if num_vectors_in_index == 0:
print("Lỗi: FAISS index rỗng.")
return []
# Số lượng ứng viên ban đầu từ mỗi retriever
num_candidates_each_retriever = max(min(k * initial_k_multiplier, num_vectors_in_index), min(k, num_vectors_in_index))
if num_candidates_each_retriever == 0:
print(f" Không thể lấy đủ số lượng ứng viên ban đầu (num_candidates = 0).")
return []
# === 2. Semantic Search (FAISS) ===
semantic_indices_raw: np.ndarray = np.array([[]], dtype=int) # Khởi tạo rỗng
try:
query_embedding_tensor = embedding_model.encode(
[query_text], convert_to_tensor=True, device=embedding_model.device
)
query_embedding_np = query_embedding_tensor.cpu().numpy().astype('float32')
faiss.normalize_L2(query_embedding_np) # Chuẩn hóa vector query
# print(f" Đã tạo và chuẩn hóa vector truy vấn shape: {query_embedding_np.shape}")
# print(f" Tìm kiếm {num_candidates_each_retriever} kết quả ngữ nghĩa (FAISS)...")
_, semantic_indices_raw = faiss_index.search(query_embedding_np, num_candidates_each_retriever)
# print(f" ✅ Tìm kiếm ngữ nghĩa (FAISS) hoàn tất.")
except Exception as e:
print(f"Lỗi khi tìm kiếm ngữ nghĩa (FAISS): {e}")
# semantic_indices_raw đã được khởi tạo là rỗng
# === 3. Keyword Search (BM25) ===
# print(f" Tìm kiếm {num_candidates_each_retriever} kết quả từ khóa (BM25)...")
tokenized_query_bm25 = tokenize_vi_for_bm25_setup(query_text)
top_bm25_results: List[Dict[str, Any]] = []
try:
if bm25_model and tokenized_query_bm25:
all_bm25_scores = bm25_model.get_scores(tokenized_query_bm25)
# Lấy chỉ số và score cho các document có score > 0
bm25_results_with_indices = [
{'index': i, 'score': score} for i, score in enumerate(all_bm25_scores) if score > 0
]
# Sắp xếp theo score giảm dần
bm25_results_with_indices.sort(key=lambda x: x['score'], reverse=True)
top_bm25_results = bm25_results_with_indices[:num_candidates_each_retriever]
# print(f" ✅ Tìm kiếm từ khóa (BM25) hoàn tất, tìm thấy {len(top_bm25_results)} ứng viên.")
else:
# print(" Cảnh báo: BM25 model hoặc tokenized query không hợp lệ, bỏ qua BM25.")
pass
except Exception as e:
print(f"Lỗi khi tìm kiếm từ khóa (BM25): {e}")
# === 4. Result Fusion (Reciprocal Rank Fusion - RRF) ===
# print(f" Kết hợp kết quả từ FAISS và BM25 bằng RRF (k_const={rrf_k_constant})...")
rrf_scores: Dict[int, float] = defaultdict(float)
all_retrieved_indices_set: set[int] = set()
if semantic_indices_raw.size > 0:
for rank, doc_idx_int in enumerate(semantic_indices_raw[0]):
doc_idx = int(doc_idx_int) # Đảm bảo là int
if 0 <= doc_idx < len(chunks_data): # Kiểm tra doc_idx hợp lệ với chunks_data
rrf_scores[doc_idx] += 1.0 / (rrf_k_constant + rank)
all_retrieved_indices_set.add(doc_idx)
for rank, item in enumerate(top_bm25_results):
doc_idx = item['index']
if 0 <= doc_idx < len(chunks_data):
rrf_scores[doc_idx] += 1.0 / (rrf_k_constant + rank)
all_retrieved_indices_set.add(doc_idx)
fused_initial_results: List[Dict[str, Any]] = []
for doc_idx in all_retrieved_indices_set:
fused_initial_results.append({
'index': doc_idx,
'fused_score': rrf_scores[doc_idx]
})
fused_initial_results.sort(key=lambda x: x['fused_score'], reverse=True)
# print(f" ✅ Kết hợp RRF hoàn tất, có {len(fused_initial_results)} ứng viên duy nhất.")
# === 5. Xử lý Metadata, Lọc và Tái xếp hạng cuối cùng ===
final_processed_results: List[Dict[str, Any]] = []
# Xử lý metadata cho số lượng ứng viên lớn hơn để có đủ lựa chọn sau khi lọc
num_to_process_metadata = min(len(fused_initial_results), num_candidates_each_retriever * 2 if num_candidates_each_retriever > 0 else k * 3)
# print(f" Xử lý metadata và tính điểm cuối cùng cho top {num_to_process_metadata} ứng viên lai ghép...")
for rank_idx, res_item in enumerate(fused_initial_results[:num_to_process_metadata]):
result_index = res_item['index']
base_score_from_fusion = res_item['fused_score']
metadata_boost_components: Dict[str, float] = defaultdict(float)
passes_all_strict_filters = True
try:
original_chunk = chunks_data[result_index]
chunk_metadata = original_chunk.get('metadata', {})
chunk_text_lower = original_chunk.get('text', '').lower()
# 5.1 & 5.2: Tiền phạt và Điểm phạt
has_fine_info_in_chunk = chunk_metadata.get("has_fine", False) or chunk_metadata.get("has_fine_clause_level", False)
has_points_info_in_chunk = chunk_metadata.get("has_points_deduction", False)
if query_analysis["mentions_fine"]:
if has_fine_info_in_chunk:
metadata_boost_components["fine"] += boost_fine
elif not query_analysis["mentions_points"]: # Chỉ hỏi tiền mà chunk không có -> lọc
passes_all_strict_filters = False
if query_analysis["mentions_points"]:
if has_points_info_in_chunk:
metadata_boost_components["points"] += boost_points
elif not query_analysis["mentions_fine"]: # Chỉ hỏi điểm mà chunk không có -> lọc
passes_all_strict_filters = False
if query_analysis["mentions_fine"] and query_analysis["mentions_points"]:
if not has_fine_info_in_chunk and not has_points_info_in_chunk: # Query hỏi cả hai mà chunk ko có cả hai
passes_all_strict_filters = False
elif has_fine_info_in_chunk and has_points_info_in_chunk:
metadata_boost_components["both_fine_points"] += boost_both_fine_points
# 5.3. Loại xe
queried_vehicle = query_analysis["vehicle_type_query"]
if queried_vehicle:
applicable_vehicle_meta = chunk_metadata.get("applicable_vehicle_type", "").lower()
point_specific_vehicles_meta = chunk_metadata.get("point_specific_vehicle_types_str", "").lower()
article_title_lower = chunk_metadata.get("article_title", "").lower()
match_vehicle = False
if queried_vehicle in applicable_vehicle_meta: match_vehicle = True
elif queried_vehicle in point_specific_vehicles_meta: match_vehicle = True
# Kiểm tra xem queried_vehicle (đã chuẩn hóa) có trong article_title không
elif queried_vehicle in article_title_lower: match_vehicle = True # Đơn giản hóa, có thể dùng regex nếu cần chính xác hơn
# Kiểm tra xem queried_vehicle (đã chuẩn hóa) có trong chunk_text không
elif queried_vehicle in chunk_text_lower: match_vehicle = True
if match_vehicle:
metadata_boost_components["vehicle_type"] += boost_vehicle_type
else:
# Logic lọc: nếu query có loại xe cụ thể, mà applicable_vehicle_type của chunk là một loại khác
# và không phải là "không xác định", "nhiều loại cụ thể", hoặc loại cha chung ("xe cơ giới" cho "ô tô")
if applicable_vehicle_meta and \
applicable_vehicle_meta not in ["không xác định", "nhiều loại cụ thể", "ô tô và xe máy"] and \
not (applicable_vehicle_meta == "xe cơ giới" and queried_vehicle in ["ô tô", "xe máy"]):
passes_all_strict_filters = False
# 5.4. Tước quyền sử dụng GPLX
if query_analysis["mentions_suspension"] and chunk_metadata.get("has_license_suspension"):
metadata_boost_components["suspension"] += boost_suspension
# 5.5. Tịch thu
if query_analysis["mentions_confiscation"] and chunk_metadata.get("has_confiscation"):
metadata_boost_components["confiscation"] += boost_confiscation
# 5.6. Tốc độ tối đa
if query_analysis["mentions_max_speed"]:
if chunk_metadata.get("speed_limit_value") is not None or \
"tốc độ tối đa" in chunk_metadata.get("speed_category","").lower() or \
any(kw in chunk_text_lower for kw in ["quá tốc độ", "tốc độ tối đa", "vượt tốc độ quy định"]):
metadata_boost_components["max_speed"] += boost_max_speed
# 5.7. Tốc độ tối thiểu
if query_analysis["mentions_min_speed"]:
if chunk_metadata.get("speed_limit_min_value") is not None or \
"tốc độ tối thiểu" in chunk_metadata.get("speed_category","").lower() or \
"tốc độ tối thiểu" in chunk_text_lower:
metadata_boost_components["min_speed"] += boost_min_speed
# 5.8. Khoảng cách an toàn
if query_analysis["mentions_safe_distance"]:
if any(kw in chunk_text_lower for kw in ["khoảng cách an toàn", "cự ly an toàn", "cự ly tối thiểu", "giữ khoảng cách"]):
metadata_boost_components["safe_distance"] += boost_safe_distance
# 5.9. Biện pháp khắc phục
if query_analysis["mentions_remedial_measures"]:
if any(kw in chunk_text_lower for kw in ["biện pháp khắc phục", "khắc phục hậu quả"]):
metadata_boost_components["remedial_measures"] += boost_remedial_measures
if not passes_all_strict_filters:
continue
total_metadata_boost = sum(metadata_boost_components.values())
final_score_calculated = base_score_from_fusion + total_metadata_boost
final_processed_results.append({
"rank_after_fusion": rank_idx + 1,
"index": int(result_index),
"base_score_rrf": float(base_score_from_fusion),
"metadata_boost_components": dict(metadata_boost_components), # Lưu lại để debug
"metadata_boost_total": float(total_metadata_boost),
"final_score": final_score_calculated,
"text": original_chunk.get('text', '*Không có text*'),
"metadata": chunk_metadata, # Giữ nguyên metadata gốc
"query_analysis_for_boost": query_analysis # (Tùy chọn) Lưu lại query analysis dùng cho boosting
})
except IndexError:
print(f"Lỗi Index: Chỉ số {result_index} nằm ngoài chunks_data (size: {len(chunks_data)}). Bỏ qua chunk này.")
except Exception as e:
print(f"Lỗi khi xử lý ứng viên lai ghép tại chỉ số {result_index}: {e}. Bỏ qua chunk này.")
final_processed_results.sort(key=lambda x: x["final_score"], reverse=True)
final_results_top_k = final_processed_results[:k]
print(f" ✅ Xử lý, kết hợp metadata boost và tái xếp hạng hoàn tất. Trả về {len(final_results_top_k)} kết quả.")
return final_results_top_k
# --- (Tùy chọn) Hàm main để test nhanh file này ---
if __name__ == '__main__':
print("Chạy test cho retrieval.py...")
# --- Test get_standardized_vehicle_type ---
print("\n--- Test get_standardized_vehicle_type ---")
test_vehicles = [
"người điều khiển xe ô tô con", "xe gắn máy", "xe cơ giới", "xe máy chuyên dùng",
"xe đạp điện", "người đi bộ", "ô tô tải và xe mô tô", None, ""
]
for tv in test_vehicles:
print(f"'{tv}' -> '{get_standardized_vehicle_type(tv)}'")
# --- Test analyze_query ---
print("\n--- Test analyze_query ---")
test_queries = [
"xe máy không gương phạt bao nhiêu tiền?",
"ô tô chạy quá tốc độ 20km bị trừ mấy điểm gplx",
"đi bộ ở đâu thì đúng luật",
"biện pháp khắc phục khi gây tai nạn là gì"
]
for tq in test_queries:
print(f"Query: '{tq}'\nAnalysis: {json.dumps(analyze_query(tq), indent=2, ensure_ascii=False)}")
# --- Test process_law_data_to_chunks (cần file JSON mẫu) ---
# Giả sử bạn có file JSON mẫu 'sample_law_data.json' cùng cấp
# sample_data = {
# "article": "Điều 5",
# "article_title": "Xử phạt người điều khiển xe ô tô và các loại xe tương tự xe ô tô vi phạm quy tắc giao thông đường bộ",
# "source_document": "Nghị định 100/2019/NĐ-CP",
# "clauses": [
# {
# "clause_number": "1",
# "clause_text_original": "Phạt tiền từ 200.000 đồng đến 400.000 đồng đối với người điều khiển xe thực hiện một trong các hành vi vi phạm sau đây:",
# "points_in_clause": [
# {
# "point_id": "a",
# "point_text_original": "Không chấp hành hiệu lệnh, chỉ dẫn của biển báo hiệu, vạch kẻ đường, trừ các hành vi vi phạm quy định tại điểm a khoản 2, điểm c khoản 3, điểm đ khoản 4, điểm g khoản 5, điểm b khoản 6, điểm b khoản 7, điểm d khoản 8 Điều này;",
# "penalties_detail": [
# {"penalty_type": "Phạt tiền", "details": {"individual_fine_min": 200000, "individual_fine_max": 400000}}
# ],
# "speed_limits_by_vehicle_type_and_road_type": [{"vehicle_type": "xe ô tô con"}]
# }
# ]
# },
# {
# "clause_number": "12", # Khoản không có điểm
# "clause_text_original": "Ngoài việc bị phạt tiền, người điều khiển xe thực hiện hành vi vi phạm còn bị áp dụng các hình thức xử phạt bổ sung sau đây: ...",
# "clause_metadata_summary": {"overall_fine_note_for_clause": "Áp dụng hình phạt bổ sung"}
# }
# ]
# }
#
# print("\n--- Test process_law_data_to_chunks ---")
# chunks = process_law_data_to_chunks(sample_data)
# print(f"Số chunks được tạo: {len(chunks)}")
# if chunks:
# print("Chunk đầu tiên:")
# print(json.dumps(chunks[0], indent=2, ensure_ascii=False))
# print("Chunk cuối cùng (nếu có nhiều hơn 1):")
# if len(chunks) > 1:
# print(json.dumps(chunks[-1], indent=2, ensure_ascii=False))
# Để test search_relevant_laws, bạn cần mock embedding_model, faiss_index, bm25_model
# và có chunks_data đã được xử lý.
print("\n--- Các test khác (ví dụ: search_relevant_laws) cần mock hoặc dữ liệu đầy đủ. ---")