Kevin Hu
commited on
Commit
·
69ced1e
1
Parent(s):
168c8d9
Fix chunk number error after re-parsing. (#4043)
Browse files### What problem does this PR solve?
### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
api/apps/document_app.py
CHANGED
|
@@ -356,12 +356,11 @@ def run():
|
|
| 356 |
try:
|
| 357 |
for id in req["doc_ids"]:
|
| 358 |
info = {"run": str(req["run"]), "progress": 0}
|
| 359 |
-
if str(req["run"]) == TaskStatus.RUNNING.value:
|
| 360 |
info["progress_msg"] = ""
|
| 361 |
info["chunk_num"] = 0
|
| 362 |
info["token_num"] = 0
|
| 363 |
DocumentService.update_by_id(id, info)
|
| 364 |
-
# if str(req["run"]) == TaskStatus.CANCEL.value:
|
| 365 |
tenant_id = DocumentService.get_tenant_id(id)
|
| 366 |
if not tenant_id:
|
| 367 |
return get_data_error_result(message="Tenant not found!")
|
|
|
|
| 356 |
try:
|
| 357 |
for id in req["doc_ids"]:
|
| 358 |
info = {"run": str(req["run"]), "progress": 0}
|
| 359 |
+
if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False):
|
| 360 |
info["progress_msg"] = ""
|
| 361 |
info["chunk_num"] = 0
|
| 362 |
info["token_num"] = 0
|
| 363 |
DocumentService.update_by_id(id, info)
|
|
|
|
| 364 |
tenant_id = DocumentService.get_tenant_id(id)
|
| 365 |
if not tenant_id:
|
| 366 |
return get_data_error_result(message="Tenant not found!")
|
api/db/services/task_service.py
CHANGED
|
@@ -248,8 +248,9 @@ def queue_tasks(doc: dict, bucket: str, name: str):
|
|
| 248 |
|
| 249 |
prev_tasks = TaskService.get_tasks(doc["id"])
|
| 250 |
if prev_tasks:
|
|
|
|
| 251 |
for task in tsks:
|
| 252 |
-
reuse_prev_task_chunks(task, prev_tasks, chunking_config)
|
| 253 |
TaskService.filter_delete([Task.doc_id == doc["id"]])
|
| 254 |
chunk_ids = []
|
| 255 |
for task in prev_tasks:
|
|
@@ -257,6 +258,7 @@ def queue_tasks(doc: dict, bucket: str, name: str):
|
|
| 257 |
chunk_ids.extend(task["chunk_ids"].split())
|
| 258 |
if chunk_ids:
|
| 259 |
settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]), chunking_config["kb_id"])
|
|
|
|
| 260 |
|
| 261 |
bulk_insert_into_db(Task, tsks, True)
|
| 262 |
DocumentService.begin2parse(doc["id"])
|
|
@@ -267,14 +269,17 @@ def queue_tasks(doc: dict, bucket: str, name: str):
|
|
| 267 |
SVR_QUEUE_NAME, message=t
|
| 268 |
), "Can't access Redis. Please check the Redis' status."
|
| 269 |
|
|
|
|
| 270 |
def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict):
|
| 271 |
idx = bisect.bisect_left(prev_tasks, task["from_page"], key=lambda x: x["from_page"])
|
| 272 |
if idx >= len(prev_tasks):
|
| 273 |
-
return
|
| 274 |
prev_task = prev_tasks[idx]
|
| 275 |
if prev_task["progress"] < 1.0 or prev_task["digest"] != task["digest"] or not prev_task["chunk_ids"]:
|
| 276 |
-
return
|
| 277 |
task["chunk_ids"] = prev_task["chunk_ids"]
|
| 278 |
task["progress"] = 1.0
|
| 279 |
task["progress_msg"] = f"Page({task['from_page']}~{task['to_page']}): reused previous task's chunks"
|
| 280 |
prev_task["chunk_ids"] = ""
|
|
|
|
|
|
|
|
|
| 248 |
|
| 249 |
prev_tasks = TaskService.get_tasks(doc["id"])
|
| 250 |
if prev_tasks:
|
| 251 |
+
ck_num = 0
|
| 252 |
for task in tsks:
|
| 253 |
+
ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
|
| 254 |
TaskService.filter_delete([Task.doc_id == doc["id"]])
|
| 255 |
chunk_ids = []
|
| 256 |
for task in prev_tasks:
|
|
|
|
| 258 |
chunk_ids.extend(task["chunk_ids"].split())
|
| 259 |
if chunk_ids:
|
| 260 |
settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]), chunking_config["kb_id"])
|
| 261 |
+
DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})
|
| 262 |
|
| 263 |
bulk_insert_into_db(Task, tsks, True)
|
| 264 |
DocumentService.begin2parse(doc["id"])
|
|
|
|
| 269 |
SVR_QUEUE_NAME, message=t
|
| 270 |
), "Can't access Redis. Please check the Redis' status."
|
| 271 |
|
| 272 |
+
|
| 273 |
def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict):
|
| 274 |
idx = bisect.bisect_left(prev_tasks, task["from_page"], key=lambda x: x["from_page"])
|
| 275 |
if idx >= len(prev_tasks):
|
| 276 |
+
return 0
|
| 277 |
prev_task = prev_tasks[idx]
|
| 278 |
if prev_task["progress"] < 1.0 or prev_task["digest"] != task["digest"] or not prev_task["chunk_ids"]:
|
| 279 |
+
return 0
|
| 280 |
task["chunk_ids"] = prev_task["chunk_ids"]
|
| 281 |
task["progress"] = 1.0
|
| 282 |
task["progress_msg"] = f"Page({task['from_page']}~{task['to_page']}): reused previous task's chunks"
|
| 283 |
prev_task["chunk_ids"] = ""
|
| 284 |
+
|
| 285 |
+
return len(task["chunk_ids"].split())
|