Kevin Hu
committed on
Commit
·
dc07a4b
1
Parent(s):
e2bbb9d
Cut down the attempt times of ES (#3550)
Browse files

### What problem does this PR solve?
#3541
### Type of change
- [x] Refactoring
- [x] Performance Improvement
- rag/utils/es_conn.py +13 -11
rag/utils/es_conn.py
CHANGED
|
@@ -16,13 +16,15 @@ from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr,
|
|
| 16 |
FusionExpr
|
| 17 |
from rag.nlp import is_english, rag_tokenizer
|
| 18 |
|
|
|
|
|
|
|
| 19 |
|
| 20 |
@singleton
|
| 21 |
class ESConnection(DocStoreConnection):
|
| 22 |
def __init__(self):
|
| 23 |
self.info = {}
|
| 24 |
logging.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
|
| 25 |
-
for _ in range(
|
| 26 |
try:
|
| 27 |
self.es = Elasticsearch(
|
| 28 |
settings.ES["hosts"].split(","),
|
|
@@ -92,7 +94,7 @@ class ESConnection(DocStoreConnection):
|
|
| 92 |
|
| 93 |
def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
|
| 94 |
s = Index(indexName, self.es)
|
| 95 |
-
for i in range(
|
| 96 |
try:
|
| 97 |
return s.exists()
|
| 98 |
except Exception as e:
|
|
@@ -144,9 +146,9 @@ class ESConnection(DocStoreConnection):
|
|
| 144 |
if "minimum_should_match" in m.extra_options:
|
| 145 |
minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
|
| 146 |
bqry.must.append(Q("query_string", fields=m.fields,
|
| 147 |
-
|
| 148 |
-
|
| 149 |
-
|
| 150 |
bqry.boost = 1.0 - vector_similarity_weight
|
| 151 |
|
| 152 |
elif isinstance(m, MatchDenseExpr):
|
|
@@ -180,7 +182,7 @@ class ESConnection(DocStoreConnection):
|
|
| 180 |
q = s.to_dict()
|
| 181 |
logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
|
| 182 |
|
| 183 |
-
for i in range(
|
| 184 |
try:
|
| 185 |
res = self.es.search(index=indexNames,
|
| 186 |
body=q,
|
|
@@ -201,7 +203,7 @@ class ESConnection(DocStoreConnection):
|
|
| 201 |
raise Exception("ESConnection.search timeout.")
|
| 202 |
|
| 203 |
def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
|
| 204 |
-
for i in range(
|
| 205 |
try:
|
| 206 |
res = self.es.get(index=(indexName),
|
| 207 |
id=chunkId, source=True, )
|
|
@@ -233,7 +235,7 @@ class ESConnection(DocStoreConnection):
|
|
| 233 |
operations.append(d_copy)
|
| 234 |
|
| 235 |
res = []
|
| 236 |
-
for _ in range(
|
| 237 |
try:
|
| 238 |
r = self.es.bulk(index=(indexName), operations=operations,
|
| 239 |
refresh=False, timeout="600s")
|
|
@@ -258,7 +260,7 @@ class ESConnection(DocStoreConnection):
|
|
| 258 |
if "id" in condition and isinstance(condition["id"], str):
|
| 259 |
# update specific single document
|
| 260 |
chunkId = condition["id"]
|
| 261 |
-
for i in range(
|
| 262 |
try:
|
| 263 |
self.es.update(index=indexName, id=chunkId, doc=doc)
|
| 264 |
return True
|
|
@@ -326,7 +328,7 @@ class ESConnection(DocStoreConnection):
|
|
| 326 |
else:
|
| 327 |
raise Exception("Condition value must be int, str or list.")
|
| 328 |
logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
|
| 329 |
-
for _ in range(
|
| 330 |
try:
|
| 331 |
res = self.es.delete_by_query(
|
| 332 |
index=indexName,
|
|
@@ -437,7 +439,7 @@ class ESConnection(DocStoreConnection):
|
|
| 437 |
sql = sql.replace(p, r, 1)
|
| 438 |
logging.debug(f"ESConnection.sql to es: {sql}")
|
| 439 |
|
| 440 |
-
for i in range(
|
| 441 |
try:
|
| 442 |
res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format,
|
| 443 |
request_timeout="2s")
|
|
|
|
| 16 |
FusionExpr
|
| 17 |
from rag.nlp import is_english, rag_tokenizer
|
| 18 |
|
| 19 |
+
ATTEMPT_TIME = 2
|
| 20 |
+
|
| 21 |
|
| 22 |
@singleton
|
| 23 |
class ESConnection(DocStoreConnection):
|
| 24 |
def __init__(self):
|
| 25 |
self.info = {}
|
| 26 |
logging.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
|
| 27 |
+
for _ in range(ATTEMPT_TIME):
|
| 28 |
try:
|
| 29 |
self.es = Elasticsearch(
|
| 30 |
settings.ES["hosts"].split(","),
|
|
|
|
| 94 |
|
| 95 |
def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
|
| 96 |
s = Index(indexName, self.es)
|
| 97 |
+
for i in range(ATTEMPT_TIME):
|
| 98 |
try:
|
| 99 |
return s.exists()
|
| 100 |
except Exception as e:
|
|
|
|
| 146 |
if "minimum_should_match" in m.extra_options:
|
| 147 |
minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%"
|
| 148 |
bqry.must.append(Q("query_string", fields=m.fields,
|
| 149 |
+
type="best_fields", query=m.matching_text,
|
| 150 |
+
minimum_should_match=minimum_should_match,
|
| 151 |
+
boost=1))
|
| 152 |
bqry.boost = 1.0 - vector_similarity_weight
|
| 153 |
|
| 154 |
elif isinstance(m, MatchDenseExpr):
|
|
|
|
| 182 |
q = s.to_dict()
|
| 183 |
logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
|
| 184 |
|
| 185 |
+
for i in range(ATTEMPT_TIME):
|
| 186 |
try:
|
| 187 |
res = self.es.search(index=indexNames,
|
| 188 |
body=q,
|
|
|
|
| 203 |
raise Exception("ESConnection.search timeout.")
|
| 204 |
|
| 205 |
def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
|
| 206 |
+
for i in range(ATTEMPT_TIME):
|
| 207 |
try:
|
| 208 |
res = self.es.get(index=(indexName),
|
| 209 |
id=chunkId, source=True, )
|
|
|
|
| 235 |
operations.append(d_copy)
|
| 236 |
|
| 237 |
res = []
|
| 238 |
+
for _ in range(ATTEMPT_TIME):
|
| 239 |
try:
|
| 240 |
r = self.es.bulk(index=(indexName), operations=operations,
|
| 241 |
refresh=False, timeout="600s")
|
|
|
|
| 260 |
if "id" in condition and isinstance(condition["id"], str):
|
| 261 |
# update specific single document
|
| 262 |
chunkId = condition["id"]
|
| 263 |
+
for i in range(ATTEMPT_TIME):
|
| 264 |
try:
|
| 265 |
self.es.update(index=indexName, id=chunkId, doc=doc)
|
| 266 |
return True
|
|
|
|
| 328 |
else:
|
| 329 |
raise Exception("Condition value must be int, str or list.")
|
| 330 |
logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
|
| 331 |
+
for _ in range(ATTEMPT_TIME):
|
| 332 |
try:
|
| 333 |
res = self.es.delete_by_query(
|
| 334 |
index=indexName,
|
|
|
|
| 439 |
sql = sql.replace(p, r, 1)
|
| 440 |
logging.debug(f"ESConnection.sql to es: {sql}")
|
| 441 |
|
| 442 |
+
for i in range(ATTEMPT_TIME):
|
| 443 |
try:
|
| 444 |
res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format,
|
| 445 |
request_timeout="2s")
|