Kevin Hu commited on
Commit
0c54322
·
1 Parent(s): 5607916

Make infinity adapt to condition `exist`. (#4657)

Browse files

### What problem does this PR solve?

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

api/apps/kb_app.py CHANGED
@@ -24,6 +24,7 @@ from api.db.services.document_service import DocumentService
24
  from api.db.services.file2document_service import File2DocumentService
25
  from api.db.services.file_service import FileService
26
  from api.db.services.user_service import TenantService, UserTenantService
 
27
  from api.utils.api_utils import server_error_response, get_data_error_result, validate_request, not_allowed_parameters
28
  from api.utils import get_uuid
29
  from api.db import StatusEnum, FileSource
@@ -96,6 +97,13 @@ def update():
96
  return get_data_error_result(
97
  message="Can't find this knowledgebase!")
98
 
 
 
 
 
 
 
 
99
  if req["name"].lower() != kb.name.lower() \
100
  and len(
101
  KnowledgebaseService.query(name=req["name"], tenant_id=current_user.id, status=StatusEnum.VALID.value)) > 1:
@@ -112,7 +120,7 @@ def update():
112
  search.index_name(kb.tenant_id), kb.id)
113
  else:
114
  # Elasticsearch requires PAGERANK_FLD be non-zero!
115
- settings.docStoreConn.update({"exist": PAGERANK_FLD}, {"remove": PAGERANK_FLD},
116
  search.index_name(kb.tenant_id), kb.id)
117
 
118
  e, kb = KnowledgebaseService.get_by_id(kb.id)
 
24
  from api.db.services.file2document_service import File2DocumentService
25
  from api.db.services.file_service import FileService
26
  from api.db.services.user_service import TenantService, UserTenantService
27
+ from api.settings import DOC_ENGINE
28
  from api.utils.api_utils import server_error_response, get_data_error_result, validate_request, not_allowed_parameters
29
  from api.utils import get_uuid
30
  from api.db import StatusEnum, FileSource
 
97
  return get_data_error_result(
98
  message="Can't find this knowledgebase!")
99
 
100
+ if req.get("parser_id", "") == "tag" and DOC_ENGINE == "infinity":
101
+ return get_json_result(
102
+ data=False,
103
+ message='The chunk method Tag has not been supported by Infinity yet.',
104
+ code=settings.RetCode.OPERATING_ERROR
105
+ )
106
+
107
  if req["name"].lower() != kb.name.lower() \
108
  and len(
109
  KnowledgebaseService.query(name=req["name"], tenant_id=current_user.id, status=StatusEnum.VALID.value)) > 1:
 
120
  search.index_name(kb.tenant_id), kb.id)
121
  else:
122
  # Elasticsearch requires PAGERANK_FLD be non-zero!
123
+ settings.docStoreConn.update({"exists": PAGERANK_FLD}, {"remove": PAGERANK_FLD},
124
  search.index_name(kb.tenant_id), kb.id)
125
 
126
  e, kb = KnowledgebaseService.get_by_id(kb.id)
rag/raptor.py CHANGED
@@ -71,7 +71,7 @@ class RecursiveAbstractiveProcessing4TreeOrganizedRetrieval:
71
  start, end = 0, len(chunks)
72
  if len(chunks) <= 1:
73
  return
74
- chunks = [(s, a) for s, a in chunks if len(a) > 0]
75
 
76
  def summarize(ck_idx, lock):
77
  nonlocal chunks
@@ -125,6 +125,8 @@ class RecursiveAbstractiveProcessing4TreeOrganizedRetrieval:
125
  threads = []
126
  for c in range(n_clusters):
127
  ck_idx = [i + start for i in range(len(lbls)) if lbls[i] == c]
 
 
128
  threads.append(executor.submit(summarize, ck_idx, lock))
129
  wait(threads, return_when=ALL_COMPLETED)
130
  for th in threads:
 
71
  start, end = 0, len(chunks)
72
  if len(chunks) <= 1:
73
  return
74
+ chunks = [(s, a) for s, a in chunks if s and len(a) > 0]
75
 
76
  def summarize(ck_idx, lock):
77
  nonlocal chunks
 
125
  threads = []
126
  for c in range(n_clusters):
127
  ck_idx = [i + start for i in range(len(lbls)) if lbls[i] == c]
128
+ if not ck_idx:
129
+ continue
130
  threads.append(executor.submit(summarize, ck_idx, lock))
131
  wait(threads, return_when=ALL_COMPLETED)
132
  for th in threads:
rag/utils/es_conn.py CHANGED
@@ -336,7 +336,7 @@ class ESConnection(DocStoreConnection):
336
  for k, v in condition.items():
337
  if not isinstance(k, str) or not v:
338
  continue
339
- if k == "exist":
340
  bqry.filter.append(Q("exists", field=v))
341
  continue
342
  if isinstance(v, list):
 
336
  for k, v in condition.items():
337
  if not isinstance(k, str) or not v:
338
  continue
339
+ if k == "exists":
340
  bqry.filter.append(Q("exists", field=v))
341
  continue
342
  if isinstance(v, list):
rag/utils/infinity_conn.py CHANGED
@@ -44,8 +44,23 @@ from rag.utils.doc_store_conn import (
44
  logger = logging.getLogger('ragflow.infinity_conn')
45
 
46
 
47
- def equivalent_condition_to_str(condition: dict) -> str | None:
48
  assert "_id" not in condition
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  cond = list()
50
  for k, v in condition.items():
51
  if not isinstance(k, str) or k in ["kb_id"] or not v:
@@ -61,8 +76,15 @@ def equivalent_condition_to_str(condition: dict) -> str | None:
61
  strInCond = ", ".join(inCond)
62
  strInCond = f"{k} IN ({strInCond})"
63
  cond.append(strInCond)
 
 
 
 
 
64
  elif isinstance(v, str):
65
  cond.append(f"{k}='{v}'")
 
 
66
  else:
67
  cond.append(f"{k}={str(v)}")
68
  return " AND ".join(cond) if cond else "1=1"
@@ -294,7 +316,11 @@ class InfinityConnection(DocStoreConnection):
294
  filter_cond = None
295
  filter_fulltext = ""
296
  if condition:
297
- filter_cond = equivalent_condition_to_str(condition)
 
 
 
 
298
  for matchExpr in matchExprs:
299
  if isinstance(matchExpr, MatchTextExpr):
300
  if filter_cond and "filter" not in matchExpr.extra_options:
@@ -434,12 +460,21 @@ class InfinityConnection(DocStoreConnection):
434
  self.createIdx(indexName, knowledgebaseId, vector_size)
435
  table_instance = db_instance.get_table(table_name)
436
 
 
 
 
 
 
 
 
 
 
437
  docs = copy.deepcopy(documents)
438
  for d in docs:
439
  assert "_id" not in d
440
  assert "id" in d
441
  for k, v in d.items():
442
- if k in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd"]:
443
  assert isinstance(v, list)
444
  d[k] = "###".join(v)
445
  elif re.search(r"_feas$", k):
@@ -454,6 +489,11 @@ class InfinityConnection(DocStoreConnection):
454
  elif k in ["page_num_int", "top_int"]:
455
  assert isinstance(v, list)
456
  d[k] = "_".join(f"{num:08x}" for num in v)
 
 
 
 
 
457
  ids = ["'{}'".format(d["id"]) for d in docs]
458
  str_ids = ", ".join(ids)
459
  str_filter = f"id IN ({str_ids})"
@@ -475,11 +515,11 @@ class InfinityConnection(DocStoreConnection):
475
  db_instance = inf_conn.get_database(self.dbName)
476
  table_name = f"{indexName}_{knowledgebaseId}"
477
  table_instance = db_instance.get_table(table_name)
478
- if "exist" in condition:
479
- del condition["exist"]
480
- filter = equivalent_condition_to_str(condition)
481
  for k, v in list(newValue.items()):
482
- if k in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd"]:
483
  assert isinstance(v, list)
484
  newValue[k] = "###".join(v)
485
  elif re.search(r"_feas$", k):
@@ -496,9 +536,11 @@ class InfinityConnection(DocStoreConnection):
496
  elif k in ["page_num_int", "top_int"]:
497
  assert isinstance(v, list)
498
  newValue[k] = "_".join(f"{num:08x}" for num in v)
499
- elif k == "remove" and v in [PAGERANK_FLD]:
500
  del newValue[k]
501
- newValue[v] = 0
 
 
502
  logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
503
  table_instance.update(filter, newValue)
504
  self.connPool.release_conn(inf_conn)
@@ -508,14 +550,14 @@ class InfinityConnection(DocStoreConnection):
508
  inf_conn = self.connPool.get_conn()
509
  db_instance = inf_conn.get_database(self.dbName)
510
  table_name = f"{indexName}_{knowledgebaseId}"
511
- filter = equivalent_condition_to_str(condition)
512
  try:
513
  table_instance = db_instance.get_table(table_name)
514
  except Exception:
515
  logger.warning(
516
- f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist."
517
  )
518
  return 0
 
519
  logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
520
  res = table_instance.delete(filter)
521
  self.connPool.release_conn(inf_conn)
@@ -553,7 +595,7 @@ class InfinityConnection(DocStoreConnection):
553
  v = res[fieldnm][i]
554
  if isinstance(v, Series):
555
  v = list(v)
556
- elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd"]:
557
  assert isinstance(v, str)
558
  v = [kwd for kwd in v.split("###") if kwd]
559
  elif fieldnm == "position_int":
@@ -584,6 +626,8 @@ class InfinityConnection(DocStoreConnection):
584
  ans = {}
585
  num_rows = len(res)
586
  column_id = res["id"]
 
 
587
  for i in range(num_rows):
588
  id = column_id[i]
589
  txt = res[fieldnm][i]
 
44
  logger = logging.getLogger('ragflow.infinity_conn')
45
 
46
 
47
+ def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
48
  assert "_id" not in condition
49
+ clmns = {}
50
+ if table_instance:
51
+ for n, ty, de, _ in table_instance.show_columns().rows():
52
+ clmns[n] = (ty, de)
53
+
54
+ def exists(cln):
55
+ nonlocal clmns
56
+ assert cln in clmns, f"'{cln}' should be in '{clmns}'."
57
+ ty, de = clmns[cln]
58
+ if ty.lower().find("cha"):
59
+ if not de:
60
+ de = ""
61
+ return f" {cln}!='{de}' "
62
+ return f"{cln}!={de}"
63
+
64
  cond = list()
65
  for k, v in condition.items():
66
  if not isinstance(k, str) or k in ["kb_id"] or not v:
 
76
  strInCond = ", ".join(inCond)
77
  strInCond = f"{k} IN ({strInCond})"
78
  cond.append(strInCond)
79
+ elif k == "must_not":
80
+ if isinstance(v, dict):
81
+ for kk, vv in v.items():
82
+ if kk == "exists":
83
+ cond.append("NOT (%s)" % exists(vv))
84
  elif isinstance(v, str):
85
  cond.append(f"{k}='{v}'")
86
+ elif k == "exists":
87
+ cond.append(exists(v))
88
  else:
89
  cond.append(f"{k}={str(v)}")
90
  return " AND ".join(cond) if cond else "1=1"
 
316
  filter_cond = None
317
  filter_fulltext = ""
318
  if condition:
319
+ for indexName in indexNames:
320
+ table_name = f"{indexName}_{knowledgebaseIds[0]}"
321
+ filter_cond = equivalent_condition_to_str(condition, db_instance.get_table(table_name))
322
+ break
323
+
324
  for matchExpr in matchExprs:
325
  if isinstance(matchExpr, MatchTextExpr):
326
  if filter_cond and "filter" not in matchExpr.extra_options:
 
460
  self.createIdx(indexName, knowledgebaseId, vector_size)
461
  table_instance = db_instance.get_table(table_name)
462
 
463
+ # embedding fields can't have a default value....
464
+ embedding_clmns = []
465
+ clmns = table_instance.show_columns().rows()
466
+ for n, ty, _, _ in clmns:
467
+ r = re.search(r"Embedding\([a-z]+,([0-9]+)\)", ty)
468
+ if not r:
469
+ continue
470
+ embedding_clmns.append((n, int(r.group(1))))
471
+
472
  docs = copy.deepcopy(documents)
473
  for d in docs:
474
  assert "_id" not in d
475
  assert "id" in d
476
  for k, v in d.items():
477
+ if k in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd", "source_id"]:
478
  assert isinstance(v, list)
479
  d[k] = "###".join(v)
480
  elif re.search(r"_feas$", k):
 
489
  elif k in ["page_num_int", "top_int"]:
490
  assert isinstance(v, list)
491
  d[k] = "_".join(f"{num:08x}" for num in v)
492
+
493
+ for n, vs in embedding_clmns:
494
+ if n in d:
495
+ continue
496
+ d[n] = [0] * vs
497
  ids = ["'{}'".format(d["id"]) for d in docs]
498
  str_ids = ", ".join(ids)
499
  str_filter = f"id IN ({str_ids})"
 
515
  db_instance = inf_conn.get_database(self.dbName)
516
  table_name = f"{indexName}_{knowledgebaseId}"
517
  table_instance = db_instance.get_table(table_name)
518
+ #if "exists" in condition:
519
+ # del condition["exists"]
520
+ filter = equivalent_condition_to_str(condition, table_instance)
521
  for k, v in list(newValue.items()):
522
+ if k in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd", "source_id"]:
523
  assert isinstance(v, list)
524
  newValue[k] = "###".join(v)
525
  elif re.search(r"_feas$", k):
 
536
  elif k in ["page_num_int", "top_int"]:
537
  assert isinstance(v, list)
538
  newValue[k] = "_".join(f"{num:08x}" for num in v)
539
+ elif k == "remove":
540
  del newValue[k]
541
+ if v in [PAGERANK_FLD]:
542
+ newValue[v] = 0
543
+
544
  logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
545
  table_instance.update(filter, newValue)
546
  self.connPool.release_conn(inf_conn)
 
550
  inf_conn = self.connPool.get_conn()
551
  db_instance = inf_conn.get_database(self.dbName)
552
  table_name = f"{indexName}_{knowledgebaseId}"
 
553
  try:
554
  table_instance = db_instance.get_table(table_name)
555
  except Exception:
556
  logger.warning(
557
+ f"Skipped deleting from table {table_name} since the table doesn't exist."
558
  )
559
  return 0
560
+ filter = equivalent_condition_to_str(condition, table_instance)
561
  logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
562
  res = table_instance.delete(filter)
563
  self.connPool.release_conn(inf_conn)
 
595
  v = res[fieldnm][i]
596
  if isinstance(v, Series):
597
  v = list(v)
598
+ elif fieldnm in ["important_kwd", "question_kwd", "entities_kwd", "tag_kwd", "source_id"]:
599
  assert isinstance(v, str)
600
  v = [kwd for kwd in v.split("###") if kwd]
601
  elif fieldnm == "position_int":
 
626
  ans = {}
627
  num_rows = len(res)
628
  column_id = res["id"]
629
+ if fieldnm not in res:
630
+ return {}
631
  for i in range(num_rows):
632
  id = column_id[i]
633
  txt = res[fieldnm][i]