jinhai-2012 commited on
Commit
26d4422
·
1 Parent(s): f1351d2

Refactor parse progress (#3781)

Browse files

### What problem does this PR solve?

Refactor parse file progress

### Type of change

- [x] Refactoring

Signed-off-by: jinhai <[email protected]>

Files changed (2) hide show
  1. rag/app/naive.py +10 -12
  2. rag/svr/task_executor.py +63 -62
rag/app/naive.py CHANGED
@@ -193,7 +193,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
193
  Next, these successive pieces are merge into chunks whose token number is no more than 'Max token number'.
194
  """
195
 
196
- eng = lang.lower() == "english" # is_english(cks)
197
  parser_config = kwargs.get(
198
  "parser_config", {
199
  "chunk_token_num": 128, "delimiter": "\n!?。;!?", "layout_recognize": True})
@@ -206,8 +206,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
206
  pdf_parser = None
207
  if re.search(r"\.docx$", filename, re.IGNORECASE):
208
  callback(0.1, "Start to parse.")
209
- sections, tbls = Docx()(filename, binary)
210
- res = tokenize_table(tbls, doc, eng) # just for table
211
 
212
  callback(0.8, "Finish parsing.")
213
  st = timer()
@@ -220,16 +220,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
220
  if kwargs.get("section_only", False):
221
  return chunks
222
 
223
- res.extend(tokenize_chunks_docx(chunks, doc, eng, images))
224
  logging.info("naive_merge({}): {}".format(filename, timer() - st))
225
  return res
226
 
227
  elif re.search(r"\.pdf$", filename, re.IGNORECASE):
228
- pdf_parser = Pdf(
229
- ) if parser_config.get("layout_recognize", True) else PlainParser()
230
- sections, tbls = pdf_parser(filename if not binary else binary,
231
- from_page=from_page, to_page=to_page, callback=callback)
232
- res = tokenize_table(tbls, doc, eng)
233
 
234
  elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
235
  callback(0.1, "Start to parse.")
@@ -248,8 +246,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
248
 
249
  elif re.search(r"\.(md|markdown)$", filename, re.IGNORECASE):
250
  callback(0.1, "Start to parse.")
251
- sections, tbls = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
252
- res = tokenize_table(tbls, doc, eng)
253
  callback(0.8, "Finish parsing.")
254
 
255
  elif re.search(r"\.(htm|html)$", filename, re.IGNORECASE):
@@ -289,7 +287,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
289
  if kwargs.get("section_only", False):
290
  return chunks
291
 
292
- res.extend(tokenize_chunks(chunks, doc, eng, pdf_parser))
293
  logging.info("naive_merge({}): {}".format(filename, timer() - st))
294
  return res
295
 
 
193
  Next, these successive pieces are merge into chunks whose token number is no more than 'Max token number'.
194
  """
195
 
196
+ is_english = lang.lower() == "english" # is_english(cks)
197
  parser_config = kwargs.get(
198
  "parser_config", {
199
  "chunk_token_num": 128, "delimiter": "\n!?。;!?", "layout_recognize": True})
 
206
  pdf_parser = None
207
  if re.search(r"\.docx$", filename, re.IGNORECASE):
208
  callback(0.1, "Start to parse.")
209
+ sections, tables = Docx()(filename, binary)
210
+ res = tokenize_table(tables, doc, is_english) # just for table
211
 
212
  callback(0.8, "Finish parsing.")
213
  st = timer()
 
220
  if kwargs.get("section_only", False):
221
  return chunks
222
 
223
+ res.extend(tokenize_chunks_docx(chunks, doc, is_english, images))
224
  logging.info("naive_merge({}): {}".format(filename, timer() - st))
225
  return res
226
 
227
  elif re.search(r"\.pdf$", filename, re.IGNORECASE):
228
+ pdf_parser = Pdf() if parser_config.get("layout_recognize", True) else PlainParser()
229
+ sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, callback=callback)
230
+ res = tokenize_table(tables, doc, is_english)
 
 
231
 
232
  elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
233
  callback(0.1, "Start to parse.")
 
246
 
247
  elif re.search(r"\.(md|markdown)$", filename, re.IGNORECASE):
248
  callback(0.1, "Start to parse.")
249
+ sections, tables = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
250
+ res = tokenize_table(tables, doc, is_english)
251
  callback(0.8, "Finish parsing.")
252
 
253
  elif re.search(r"\.(htm|html)$", filename, re.IGNORECASE):
 
287
  if kwargs.get("section_only", False):
288
  return chunks
289
 
290
+ res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
291
  logging.info("naive_merge({}): {}".format(filename, timer() - st))
292
  return res
293
 
rag/svr/task_executor.py CHANGED
@@ -19,6 +19,7 @@
19
 
20
  import logging
21
  import sys
 
22
  from api.utils.log_utils import initRootLogger
23
 
24
  CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
@@ -166,52 +167,44 @@ def get_storage_binary(bucket, name):
166
  return STORAGE_IMPL.get(bucket, name)
167
 
168
 
169
- def build(row):
170
- if row["size"] > DOC_MAXIMUM_SIZE:
171
- set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
172
- (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
173
  return []
174
 
175
- callback = partial(
176
- set_progress,
177
- row["id"],
178
- row["from_page"],
179
- row["to_page"])
180
- chunker = FACTORY[row["parser_id"].lower()]
181
  try:
182
  st = timer()
183
- bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"])
184
  binary = get_storage_binary(bucket, name)
185
- logging.info(
186
- "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"]))
187
  except TimeoutError:
188
- callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
189
- logging.exception(
190
- "Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"]))
191
  raise
192
  except Exception as e:
193
  if re.search("(No such file|not found)", str(e)):
194
- callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"])
195
  else:
196
- callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
197
- logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
198
  raise
199
 
200
  try:
201
- cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
202
- to_page=row["to_page"], lang=row["language"], callback=callback,
203
- kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
204
- logging.info("Chunking({}) {}/{} done".format(timer() - st, row["location"], row["name"]))
205
  except Exception as e:
206
- callback(-1, "Internal server error while chunking: %s" %
207
- str(e).replace("'", ""))
208
- logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
209
  raise
210
 
211
  docs = []
212
  doc = {
213
- "doc_id": row["doc_id"],
214
- "kb_id": str(row["kb_id"])
215
  }
216
  el = 0
217
  for ck in cks:
@@ -240,41 +233,40 @@ def build(row):
240
  d["image"].save(output_buffer, format='JPEG')
241
 
242
  st = timer()
243
- STORAGE_IMPL.put(row["kb_id"], d["id"], output_buffer.getvalue())
244
  el += timer() - st
245
  except Exception:
246
- logging.exception(
247
- "Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"]))
248
  raise
249
 
250
- d["img_id"] = "{}-{}".format(row["kb_id"], d["id"])
251
  del d["image"]
252
  docs.append(d)
253
- logging.info("MINIO PUT({}):{}".format(row["name"], el))
254
 
255
- if row["parser_config"].get("auto_keywords", 0):
256
  st = timer()
257
- callback(msg="Start to generate keywords for every chunk ...")
258
- chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
259
  for d in docs:
260
  d["important_kwd"] = keyword_extraction(chat_mdl, d["content_with_weight"],
261
- row["parser_config"]["auto_keywords"]).split(",")
262
  d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
263
- callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
264
 
265
- if row["parser_config"].get("auto_questions", 0):
266
  st = timer()
267
- callback(msg="Start to generate questions for every chunk ...")
268
- chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
269
  for d in docs:
270
- qst = question_proposal(chat_mdl, d["content_with_weight"], row["parser_config"]["auto_questions"])
271
  d["content_with_weight"] = f"Question: \n{qst}\n\nAnswer:\n" + d["content_with_weight"]
272
  qst = rag_tokenizer.tokenize(qst)
273
  if "content_ltks" in d:
274
  d["content_ltks"] += " " + qst
275
  if "content_sm_ltks" in d:
276
  d["content_sm_ltks"] += " " + rag_tokenizer.fine_grained_tokenize(qst)
277
- callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
278
 
279
  return docs
280
 
@@ -389,7 +381,9 @@ def do_handle_task(task):
389
  # bind embedding model
390
  embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
391
  except Exception as e:
392
- progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
 
 
393
  raise
394
 
395
  # Either using RAPTOR or Standard chunking methods
@@ -399,14 +393,16 @@ def do_handle_task(task):
399
  chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
400
 
401
  # run RAPTOR
402
- chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
403
  except Exception as e:
404
- progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
 
 
405
  raise
406
  else:
407
  # Standard chunking methods
408
  start_ts = timer()
409
- chunks = build(task)
410
  logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
411
  if chunks is None:
412
  return
@@ -418,38 +414,43 @@ def do_handle_task(task):
418
  progress_callback(msg="Generate {} chunks".format(len(chunks)))
419
  start_ts = timer()
420
  try:
421
- tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
422
  except Exception as e:
423
- progress_callback(-1, "Generate embedding error:{}".format(str(e)))
424
- logging.exception("run_embedding got exception")
425
- tk_count = 0
 
426
  raise
427
- logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
428
- progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
 
429
  # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
430
  init_kb(task, vector_size)
431
  chunk_count = len(set([chunk["id"] for chunk in chunks]))
432
  start_ts = timer()
433
- es_r = ""
434
  es_bulk_size = 4
435
  for b in range(0, len(chunks), es_bulk_size):
436
- es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
437
  if b % 128 == 0:
438
  progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
439
  logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
440
- if es_r:
441
- progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
 
442
  settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
443
- logging.error('Insert chunk error: ' + str(es_r))
444
- raise Exception('Insert chunk error: ' + str(es_r))
445
 
446
  if TaskService.do_cancel(task_id):
447
  settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
448
  return
449
 
450
- progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
451
- DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
452
- logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
 
 
453
 
454
 
455
  def handle_task():
 
19
 
20
  import logging
21
  import sys
22
+
23
  from api.utils.log_utils import initRootLogger
24
 
25
  CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
 
167
  return STORAGE_IMPL.get(bucket, name)
168
 
169
 
170
+ def build_chunks(task, progress_callback):
171
+ if task["size"] > DOC_MAXIMUM_SIZE:
172
+ set_progress(task["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
173
+ (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
174
  return []
175
 
176
+ chunker = FACTORY[task["parser_id"].lower()]
 
 
 
 
 
177
  try:
178
  st = timer()
179
+ bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
180
  binary = get_storage_binary(bucket, name)
181
+ logging.info("From minio({}) {}/{}".format(timer() - st, task["location"], task["name"]))
 
182
  except TimeoutError:
183
+ progress_callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
184
+ logging.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(task["location"], task["name"]))
 
185
  raise
186
  except Exception as e:
187
  if re.search("(No such file|not found)", str(e)):
188
+ progress_callback(-1, "Can not find file <%s> from minio. Could you try it again?" % task["name"])
189
  else:
190
+ progress_callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
191
+ logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
192
  raise
193
 
194
  try:
195
+ cks = chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],
196
+ to_page=task["to_page"], lang=task["language"], callback=progress_callback,
197
+ kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"])
198
+ logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
199
  except Exception as e:
200
+ progress_callback(-1, "Internal server error while chunking: %s" % str(e).replace("'", ""))
201
+ logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
 
202
  raise
203
 
204
  docs = []
205
  doc = {
206
+ "doc_id": task["doc_id"],
207
+ "kb_id": str(task["kb_id"])
208
  }
209
  el = 0
210
  for ck in cks:
 
233
  d["image"].save(output_buffer, format='JPEG')
234
 
235
  st = timer()
236
+ STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())
237
  el += timer() - st
238
  except Exception:
239
+ logging.exception("Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["_id"]))
 
240
  raise
241
 
242
+ d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
243
  del d["image"]
244
  docs.append(d)
245
+ logging.info("MINIO PUT({}):{}".format(task["name"], el))
246
 
247
+ if task["parser_config"].get("auto_keywords", 0):
248
  st = timer()
249
+ progress_callback(msg="Start to generate keywords for every chunk ...")
250
+ chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
251
  for d in docs:
252
  d["important_kwd"] = keyword_extraction(chat_mdl, d["content_with_weight"],
253
+ task["parser_config"]["auto_keywords"]).split(",")
254
  d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
255
+ progress_callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
256
 
257
+ if task["parser_config"].get("auto_questions", 0):
258
  st = timer()
259
+ progress_callback(msg="Start to generate questions for every chunk ...")
260
+ chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
261
  for d in docs:
262
+ qst = question_proposal(chat_mdl, d["content_with_weight"], task["parser_config"]["auto_questions"])
263
  d["content_with_weight"] = f"Question: \n{qst}\n\nAnswer:\n" + d["content_with_weight"]
264
  qst = rag_tokenizer.tokenize(qst)
265
  if "content_ltks" in d:
266
  d["content_ltks"] += " " + qst
267
  if "content_sm_ltks" in d:
268
  d["content_sm_ltks"] += " " + rag_tokenizer.fine_grained_tokenize(qst)
269
+ progress_callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
270
 
271
  return docs
272
 
 
381
  # bind embedding model
382
  embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
383
  except Exception as e:
384
+ error_message = f'Fail to bind embedding model: {str(e)}'
385
+ progress_callback(-1, msg=error_message)
386
+ logging.exception(error_message)
387
  raise
388
 
389
  # Either using RAPTOR or Standard chunking methods
 
393
  chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
394
 
395
  # run RAPTOR
396
+ chunks, token_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
397
  except Exception as e:
398
+ error_message = f'Fail to bind LLM used by RAPTOR: {str(e)}'
399
+ progress_callback(-1, msg=error_message)
400
+ logging.exception(error_message)
401
  raise
402
  else:
403
  # Standard chunking methods
404
  start_ts = timer()
405
+ chunks = build_chunks(task, progress_callback)
406
  logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
407
  if chunks is None:
408
  return
 
414
  progress_callback(msg="Generate {} chunks".format(len(chunks)))
415
  start_ts = timer()
416
  try:
417
+ token_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
418
  except Exception as e:
419
+ error_message = "Generate embedding error:{}".format(str(e))
420
+ progress_callback(-1, error_message)
421
+ logging.exception(error_message)
422
+ token_count = 0
423
  raise
424
+ progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)
425
+ logging.info(progress_message)
426
+ progress_callback(msg=progress_message)
427
  # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
428
  init_kb(task, vector_size)
429
  chunk_count = len(set([chunk["id"] for chunk in chunks]))
430
  start_ts = timer()
431
+ doc_store_result = ""
432
  es_bulk_size = 4
433
  for b in range(0, len(chunks), es_bulk_size):
434
+ doc_store_result = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
435
  if b % 128 == 0:
436
  progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
437
  logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
438
+ if doc_store_result:
439
+ error_message = "Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
440
+ progress_callback(-1, msg=error_message)
441
  settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
442
+ logging.error(error_message)
443
+ raise Exception(error_message)
444
 
445
  if TaskService.do_cancel(task_id):
446
  settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
447
  return
448
 
449
+ DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
450
+
451
+ time_cost = timer() - start_ts
452
+ progress_callback(prog=1.0, msg="Done ({:.2f}s)".format(time_cost))
453
+ logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, token_count, len(chunks), time_cost))
454
 
455
 
456
  def handle_task():