Commit
·
f1351d2
1
Parent(s):
a64ddb6
Update file parsing progress info (#3780)
Browse files### What problem does this PR solve?
Refine the file parsing progress info
### Type of change
- [x] Refactoring
Signed-off-by: jinhai <haijin.chn@gmail.com>
- rag/svr/task_executor.py +57 -43
rag/svr/task_executor.py
CHANGED
@@ -370,72 +370,86 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None):
|
|
370 |
return res, tk_count, vector_size
|
371 |
|
372 |
|
373 |
-
def do_handle_task(
|
374 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
375 |
try:
|
376 |
-
|
|
|
377 |
except Exception as e:
|
378 |
-
|
379 |
raise
|
380 |
-
|
|
|
|
|
381 |
try:
|
382 |
-
|
383 |
-
|
|
|
|
|
|
|
384 |
except Exception as e:
|
385 |
-
|
386 |
raise
|
387 |
else:
|
388 |
-
|
389 |
-
|
390 |
-
|
391 |
-
|
|
|
392 |
return
|
393 |
-
if not
|
394 |
-
|
395 |
return
|
396 |
# TODO: exception handler
|
397 |
-
## set_progress(
|
398 |
-
|
399 |
-
|
400 |
-
)
|
401 |
-
st = timer()
|
402 |
try:
|
403 |
-
tk_count, vector_size = embedding(
|
404 |
except Exception as e:
|
405 |
-
|
406 |
-
logging.exception("
|
407 |
tk_count = 0
|
408 |
raise
|
409 |
-
logging.info("Embedding
|
410 |
-
|
411 |
-
# logging.info(f"task_executor init_kb index {search.index_name(
|
412 |
-
init_kb(
|
413 |
-
chunk_count = len(set([
|
414 |
-
|
415 |
es_r = ""
|
416 |
es_bulk_size = 4
|
417 |
-
for b in range(0, len(
|
418 |
-
es_r = settings.docStoreConn.insert(
|
419 |
if b % 128 == 0:
|
420 |
-
|
421 |
-
logging.info("Indexing
|
422 |
if es_r:
|
423 |
-
|
424 |
-
|
425 |
-
settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
|
426 |
logging.error('Insert chunk error: ' + str(es_r))
|
427 |
raise Exception('Insert chunk error: ' + str(es_r))
|
428 |
|
429 |
-
if TaskService.do_cancel(
|
430 |
-
settings.docStoreConn.delete({"doc_id":
|
431 |
return
|
432 |
|
433 |
-
|
434 |
-
DocumentService.increment_chunk_num(
|
435 |
-
|
436 |
-
logging.info(
|
437 |
-
"Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(
|
438 |
-
r["id"], tk_count, len(cks), timer() - st))
|
439 |
|
440 |
|
441 |
def handle_task():
|
|
|
370 |
return res, tk_count, vector_size
|
371 |
|
372 |
|
373 |
+
def do_handle_task(task):
|
374 |
+
task_id = task["id"]
|
375 |
+
task_from_page = task["from_page"]
|
376 |
+
task_to_page = task["to_page"]
|
377 |
+
task_tenant_id = task["tenant_id"]
|
378 |
+
task_embedding_id = task["embd_id"]
|
379 |
+
task_language = task["language"]
|
380 |
+
task_llm_id = task["llm_id"]
|
381 |
+
task_dataset_id = task["kb_id"]
|
382 |
+
task_doc_id = task["doc_id"]
|
383 |
+
task_document_name = task["name"]
|
384 |
+
task_parser_config = task["parser_config"]
|
385 |
+
|
386 |
+
# prepare the progress callback function
|
387 |
+
progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)
|
388 |
try:
|
389 |
+
# bind embedding model
|
390 |
+
embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
|
391 |
except Exception as e:
|
392 |
+
progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
|
393 |
raise
|
394 |
+
|
395 |
+
# Either using RAPTOR or Standard chunking methods
|
396 |
+
if task.get("task_type", "") == "raptor":
|
397 |
try:
|
398 |
+
# bind LLM for raptor
|
399 |
+
chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
|
400 |
+
|
401 |
+
# run RAPTOR
|
402 |
+
chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
|
403 |
except Exception as e:
|
404 |
+
progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
|
405 |
raise
|
406 |
else:
|
407 |
+
# Standard chunking methods
|
408 |
+
start_ts = timer()
|
409 |
+
chunks = build(task)
|
410 |
+
logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
|
411 |
+
if chunks is None:
|
412 |
return
|
413 |
+
if not chunks:
|
414 |
+
progress_callback(1., msg=f"No chunk built from {task_document_name}")
|
415 |
return
|
416 |
# TODO: exception handler
|
417 |
+
## set_progress(task["did"], -1, "ERROR: ")
|
418 |
+
progress_callback(msg="Generate {} chunks".format(len(chunks)))
|
419 |
+
start_ts = timer()
|
|
|
|
|
420 |
try:
|
421 |
+
tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
|
422 |
except Exception as e:
|
423 |
+
progress_callback(-1, "Generate embedding error:{}".format(str(e)))
|
424 |
+
logging.exception("run_embedding got exception")
|
425 |
tk_count = 0
|
426 |
raise
|
427 |
+
logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
|
428 |
+
progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
|
429 |
+
# logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
|
430 |
+
init_kb(task, vector_size)
|
431 |
+
chunk_count = len(set([chunk["id"] for chunk in chunks]))
|
432 |
+
start_ts = timer()
|
433 |
es_r = ""
|
434 |
es_bulk_size = 4
|
435 |
+
for b in range(0, len(chunks), es_bulk_size):
|
436 |
+
es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
|
437 |
if b % 128 == 0:
|
438 |
+
progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
|
439 |
+
logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
|
440 |
if es_r:
|
441 |
+
progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
|
442 |
+
settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
|
|
|
443 |
logging.error('Insert chunk error: ' + str(es_r))
|
444 |
raise Exception('Insert chunk error: ' + str(es_r))
|
445 |
|
446 |
+
if TaskService.do_cancel(task_id):
|
447 |
+
settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
|
448 |
return
|
449 |
|
450 |
+
progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
|
451 |
+
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
|
452 |
+
logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
|
|
|
|
|
|
|
453 |
|
454 |
|
455 |
def handle_task():
|