Added kb_id filter to knn. Fix #3458 (#3513)
Browse files### What problem does this PR solve?
Added kb_id filter to knn. Fix #3458
- [x] Bug Fix (non-breaking change which fixes an issue)
- deepdoc/parser/pdf_parser.py +1 -1
- rag/nlp/synonym.py +1 -1
- rag/utils/es_conn.py +37 -41
deepdoc/parser/pdf_parser.py
CHANGED
|
@@ -757,7 +757,7 @@ class RAGFlowPdfParser:
|
|
| 757 |
if ii is not None:
|
| 758 |
b = louts[ii]
|
| 759 |
else:
|
| 760 |
-
logging.
|
| 761 |
f"Missing layout match: {pn + 1},%s" %
|
| 762 |
(bxs[0].get(
|
| 763 |
"layoutno", "")))
|
|
|
|
| 757 |
if ii is not None:
|
| 758 |
b = louts[ii]
|
| 759 |
else:
|
| 760 |
+
logging.warning(
|
| 761 |
f"Missing layout match: {pn + 1},%s" %
|
| 762 |
(bxs[0].get(
|
| 763 |
"layoutno", "")))
|
rag/nlp/synonym.py
CHANGED
|
@@ -33,7 +33,7 @@ class Dealer:
|
|
| 33 |
try:
|
| 34 |
self.dictionary = json.load(open(path, 'r'))
|
| 35 |
except Exception:
|
| 36 |
-
logging.
|
| 37 |
self.dictionary = {}
|
| 38 |
|
| 39 |
if not redis:
|
|
|
|
| 33 |
try:
|
| 34 |
self.dictionary = json.load(open(path, 'r'))
|
| 35 |
except Exception:
|
| 36 |
+
logging.warning("Missing synonym.json")
|
| 37 |
self.dictionary = {}
|
| 38 |
|
| 39 |
if not redis:
|
rag/utils/es_conn.py
CHANGED
|
@@ -35,7 +35,7 @@ class ESConnection(DocStoreConnection):
|
|
| 35 |
self.info = self.es.info()
|
| 36 |
break
|
| 37 |
except Exception as e:
|
| 38 |
-
logging.
|
| 39 |
time.sleep(5)
|
| 40 |
if not self.es.ping():
|
| 41 |
msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s."
|
|
@@ -80,7 +80,7 @@ class ESConnection(DocStoreConnection):
|
|
| 80 |
settings=self.mapping["settings"],
|
| 81 |
mappings=self.mapping["mappings"])
|
| 82 |
except Exception:
|
| 83 |
-
logging.exception("
|
| 84 |
|
| 85 |
def deleteIdx(self, indexName: str, knowledgebaseId: str):
|
| 86 |
try:
|
|
@@ -88,7 +88,7 @@ class ESConnection(DocStoreConnection):
|
|
| 88 |
except NotFoundError:
|
| 89 |
pass
|
| 90 |
except Exception:
|
| 91 |
-
logging.exception("
|
| 92 |
|
| 93 |
def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
|
| 94 |
s = Index(indexName, self.es)
|
|
@@ -96,7 +96,7 @@ class ESConnection(DocStoreConnection):
|
|
| 96 |
try:
|
| 97 |
return s.exists()
|
| 98 |
except Exception as e:
|
| 99 |
-
logging.exception("
|
| 100 |
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
| 101 |
continue
|
| 102 |
return False
|
|
@@ -115,8 +115,21 @@ class ESConnection(DocStoreConnection):
|
|
| 115 |
indexNames = indexNames.split(",")
|
| 116 |
assert isinstance(indexNames, list) and len(indexNames) > 0
|
| 117 |
assert "_id" not in condition
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 118 |
s = Search()
|
| 119 |
-
bqry = None
|
| 120 |
vector_similarity_weight = 0.5
|
| 121 |
for m in matchExprs:
|
| 122 |
if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
|
|
@@ -130,13 +143,12 @@ class ESConnection(DocStoreConnection):
|
|
| 130 |
minimum_should_match = "0%"
|
| 131 |
if "minimum_should_match" in m.extra_options:
|
| 132 |
minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
|
| 133 |
-
bqry
|
| 134 |
-
must=Q("query_string", fields=m.fields,
|
| 135 |
type="best_fields", query=m.matching_text,
|
| 136 |
minimum_should_match=minimum_should_match,
|
| 137 |
-
boost=1)
|
| 138 |
-
|
| 139 |
-
|
| 140 |
elif isinstance(m, MatchDenseExpr):
|
| 141 |
assert (bqry is not None)
|
| 142 |
similarity = 0.0
|
|
@@ -150,21 +162,6 @@ class ESConnection(DocStoreConnection):
|
|
| 150 |
similarity=similarity,
|
| 151 |
)
|
| 152 |
|
| 153 |
-
condition["kb_id"] = knowledgebaseIds
|
| 154 |
-
if condition:
|
| 155 |
-
if not bqry:
|
| 156 |
-
bqry = Q("bool", must=[])
|
| 157 |
-
for k, v in condition.items():
|
| 158 |
-
if not isinstance(k, str) or not v:
|
| 159 |
-
continue
|
| 160 |
-
if isinstance(v, list):
|
| 161 |
-
bqry.filter.append(Q("terms", **{k: v}))
|
| 162 |
-
elif isinstance(v, str) or isinstance(v, int):
|
| 163 |
-
bqry.filter.append(Q("term", **{k: v}))
|
| 164 |
-
else:
|
| 165 |
-
raise Exception(
|
| 166 |
-
f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.")
|
| 167 |
-
|
| 168 |
if bqry:
|
| 169 |
s = s.query(bqry)
|
| 170 |
for field in highlightFields:
|
|
@@ -181,8 +178,7 @@ class ESConnection(DocStoreConnection):
|
|
| 181 |
if limit > 0:
|
| 182 |
s = s[offset:limit]
|
| 183 |
q = s.to_dict()
|
| 184 |
-
|
| 185 |
-
logging.debug("ESConnection.search [Q]: " + json.dumps(q))
|
| 186 |
|
| 187 |
for i in range(3):
|
| 188 |
try:
|
|
@@ -194,15 +190,15 @@ class ESConnection(DocStoreConnection):
|
|
| 194 |
_source=True)
|
| 195 |
if str(res.get("timed_out", "")).lower() == "true":
|
| 196 |
raise Exception("Es Timeout.")
|
| 197 |
-
logging.debug("ESConnection.search res: " + str(res))
|
| 198 |
return res
|
| 199 |
except Exception as e:
|
| 200 |
-
logging.exception("
|
| 201 |
if str(e).find("Timeout") > 0:
|
| 202 |
continue
|
| 203 |
raise e
|
| 204 |
-
logging.error("
|
| 205 |
-
raise Exception("
|
| 206 |
|
| 207 |
def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
|
| 208 |
for i in range(3):
|
|
@@ -217,12 +213,12 @@ class ESConnection(DocStoreConnection):
|
|
| 217 |
chunk["id"] = chunkId
|
| 218 |
return chunk
|
| 219 |
except Exception as e:
|
| 220 |
-
logging.exception(f"
|
| 221 |
if str(e).find("Timeout") > 0:
|
| 222 |
continue
|
| 223 |
raise e
|
| 224 |
-
logging.error("
|
| 225 |
-
raise Exception("
|
| 226 |
|
| 227 |
def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]:
|
| 228 |
# Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
|
@@ -250,7 +246,7 @@ class ESConnection(DocStoreConnection):
|
|
| 250 |
res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
|
| 251 |
return res
|
| 252 |
except Exception as e:
|
| 253 |
-
logging.warning("
|
| 254 |
if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
|
| 255 |
time.sleep(3)
|
| 256 |
continue
|
|
@@ -268,7 +264,7 @@ class ESConnection(DocStoreConnection):
|
|
| 268 |
return True
|
| 269 |
except Exception as e:
|
| 270 |
logging.exception(
|
| 271 |
-
f"
|
| 272 |
if str(e).find("Timeout") > 0:
|
| 273 |
continue
|
| 274 |
else:
|
|
@@ -307,7 +303,7 @@ class ESConnection(DocStoreConnection):
|
|
| 307 |
_ = ubq.execute()
|
| 308 |
return True
|
| 309 |
except Exception as e:
|
| 310 |
-
logging.error("
|
| 311 |
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
| 312 |
continue
|
| 313 |
return False
|
|
@@ -329,7 +325,7 @@ class ESConnection(DocStoreConnection):
|
|
| 329 |
qry.must.append(Q("term", **{k: v}))
|
| 330 |
else:
|
| 331 |
raise Exception("Condition value must be int, str or list.")
|
| 332 |
-
logging.debug("ESConnection.delete
|
| 333 |
for _ in range(10):
|
| 334 |
try:
|
| 335 |
res = self.es.delete_by_query(
|
|
@@ -338,7 +334,7 @@ class ESConnection(DocStoreConnection):
|
|
| 338 |
refresh=True)
|
| 339 |
return res["deleted"]
|
| 340 |
except Exception as e:
|
| 341 |
-
logging.warning("
|
| 342 |
if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
|
| 343 |
time.sleep(3)
|
| 344 |
continue
|
|
@@ -447,10 +443,10 @@ class ESConnection(DocStoreConnection):
|
|
| 447 |
request_timeout="2s")
|
| 448 |
return res
|
| 449 |
except ConnectionTimeout:
|
| 450 |
-
logging.exception("ESConnection.sql timeout
|
| 451 |
continue
|
| 452 |
except Exception:
|
| 453 |
-
logging.exception("ESConnection.sql got exception
|
| 454 |
return None
|
| 455 |
logging.error("ESConnection.sql timeout for 3 times!")
|
| 456 |
return None
|
|
|
|
| 35 |
self.info = self.es.info()
|
| 36 |
break
|
| 37 |
except Exception as e:
|
| 38 |
+
logging.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
|
| 39 |
time.sleep(5)
|
| 40 |
if not self.es.ping():
|
| 41 |
msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s."
|
|
|
|
| 80 |
settings=self.mapping["settings"],
|
| 81 |
mappings=self.mapping["mappings"])
|
| 82 |
except Exception:
|
| 83 |
+
logging.exception("ESConnection.createIndex error %s" % (indexName))
|
| 84 |
|
| 85 |
def deleteIdx(self, indexName: str, knowledgebaseId: str):
|
| 86 |
try:
|
|
|
|
| 88 |
except NotFoundError:
|
| 89 |
pass
|
| 90 |
except Exception:
|
| 91 |
+
logging.exception("ESConnection.deleteIdx error %s" % (indexName))
|
| 92 |
|
| 93 |
def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
|
| 94 |
s = Index(indexName, self.es)
|
|
|
|
| 96 |
try:
|
| 97 |
return s.exists()
|
| 98 |
except Exception as e:
|
| 99 |
+
logging.exception("ESConnection.indexExist got exception")
|
| 100 |
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
| 101 |
continue
|
| 102 |
return False
|
|
|
|
| 115 |
indexNames = indexNames.split(",")
|
| 116 |
assert isinstance(indexNames, list) and len(indexNames) > 0
|
| 117 |
assert "_id" not in condition
|
| 118 |
+
|
| 119 |
+
bqry = Q("bool", must=[])
|
| 120 |
+
condition["kb_id"] = knowledgebaseIds
|
| 121 |
+
for k, v in condition.items():
|
| 122 |
+
if not isinstance(k, str) or not v:
|
| 123 |
+
continue
|
| 124 |
+
if isinstance(v, list):
|
| 125 |
+
bqry.filter.append(Q("terms", **{k: v}))
|
| 126 |
+
elif isinstance(v, str) or isinstance(v, int):
|
| 127 |
+
bqry.filter.append(Q("term", **{k: v}))
|
| 128 |
+
else:
|
| 129 |
+
raise Exception(
|
| 130 |
+
f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.")
|
| 131 |
+
|
| 132 |
s = Search()
|
|
|
|
| 133 |
vector_similarity_weight = 0.5
|
| 134 |
for m in matchExprs:
|
| 135 |
if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
|
|
|
|
| 143 |
minimum_should_match = "0%"
|
| 144 |
if "minimum_should_match" in m.extra_options:
|
| 145 |
minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
|
| 146 |
+
bqry.must.append(Q("query_string", fields=m.fields,
|
|
|
|
| 147 |
type="best_fields", query=m.matching_text,
|
| 148 |
minimum_should_match=minimum_should_match,
|
| 149 |
+
boost=1))
|
| 150 |
+
bqry.boost = 1.0 - vector_similarity_weight
|
| 151 |
+
|
| 152 |
elif isinstance(m, MatchDenseExpr):
|
| 153 |
assert (bqry is not None)
|
| 154 |
similarity = 0.0
|
|
|
|
| 162 |
similarity=similarity,
|
| 163 |
)
|
| 164 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 165 |
if bqry:
|
| 166 |
s = s.query(bqry)
|
| 167 |
for field in highlightFields:
|
|
|
|
| 178 |
if limit > 0:
|
| 179 |
s = s[offset:limit]
|
| 180 |
q = s.to_dict()
|
| 181 |
+
logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
|
|
|
|
| 182 |
|
| 183 |
for i in range(3):
|
| 184 |
try:
|
|
|
|
| 190 |
_source=True)
|
| 191 |
if str(res.get("timed_out", "")).lower() == "true":
|
| 192 |
raise Exception("Es Timeout.")
|
| 193 |
+
logging.debug(f"ESConnection.search {str(indexNames)} res: " + str(res))
|
| 194 |
return res
|
| 195 |
except Exception as e:
|
| 196 |
+
logging.exception(f"ESConnection.search {str(indexNames)} query: " + str(q))
|
| 197 |
if str(e).find("Timeout") > 0:
|
| 198 |
continue
|
| 199 |
raise e
|
| 200 |
+
logging.error("ESConnection.search timeout for 3 times!")
|
| 201 |
+
raise Exception("ESConnection.search timeout.")
|
| 202 |
|
| 203 |
def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
|
| 204 |
for i in range(3):
|
|
|
|
| 213 |
chunk["id"] = chunkId
|
| 214 |
return chunk
|
| 215 |
except Exception as e:
|
| 216 |
+
logging.exception(f"ESConnection.get({chunkId}) got exception")
|
| 217 |
if str(e).find("Timeout") > 0:
|
| 218 |
continue
|
| 219 |
raise e
|
| 220 |
+
logging.error("ESConnection.get timeout for 3 times!")
|
| 221 |
+
raise Exception("ESConnection.get timeout.")
|
| 222 |
|
| 223 |
def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]:
|
| 224 |
# Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
|
|
|
|
| 246 |
res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
|
| 247 |
return res
|
| 248 |
except Exception as e:
|
| 249 |
+
logging.warning("ESConnection.insert got exception: " + str(e))
|
| 250 |
if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
|
| 251 |
time.sleep(3)
|
| 252 |
continue
|
|
|
|
| 264 |
return True
|
| 265 |
except Exception as e:
|
| 266 |
logging.exception(
|
| 267 |
+
f"ESConnection.update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)}) got exception")
|
| 268 |
if str(e).find("Timeout") > 0:
|
| 269 |
continue
|
| 270 |
else:
|
|
|
|
| 303 |
_ = ubq.execute()
|
| 304 |
return True
|
| 305 |
except Exception as e:
|
| 306 |
+
logging.error("ESConnection.update got exception: " + str(e))
|
| 307 |
if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
|
| 308 |
continue
|
| 309 |
return False
|
|
|
|
| 325 |
qry.must.append(Q("term", **{k: v}))
|
| 326 |
else:
|
| 327 |
raise Exception("Condition value must be int, str or list.")
|
| 328 |
+
logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
|
| 329 |
for _ in range(10):
|
| 330 |
try:
|
| 331 |
res = self.es.delete_by_query(
|
|
|
|
| 334 |
refresh=True)
|
| 335 |
return res["deleted"]
|
| 336 |
except Exception as e:
|
| 337 |
+
logging.warning("ESConnection.delete got exception: " + str(e))
|
| 338 |
if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
|
| 339 |
time.sleep(3)
|
| 340 |
continue
|
|
|
|
| 443 |
request_timeout="2s")
|
| 444 |
return res
|
| 445 |
except ConnectionTimeout:
|
| 446 |
+
logging.exception("ESConnection.sql timeout")
|
| 447 |
continue
|
| 448 |
except Exception:
|
| 449 |
+
logging.exception("ESConnection.sql got exception")
|
| 450 |
return None
|
| 451 |
logging.error("ESConnection.sql timeout for 3 times!")
|
| 452 |
return None
|