zhichyu committed
Commit e2bab34 · 1 Parent(s): 362fa06

Set Log level by env (#3798)


### What problem does this PR solve?

Set the log level of each package via the `LOG_LEVELS` environment variable (see the sketch below).

### Type of change

- [x] Refactoring
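
For reference, here is a minimal sketch of how a `LOG_LEVELS` value is interpreted, mirroring the parsing this PR adds to `initRootLogger` (the `parse_log_levels` helper below is illustrative only, not part of the change):

```python
import logging

def parse_log_levels(log_levels: str) -> dict[str, str]:
    """Parse a comma-separated list of NAME=LEVEL pairs into {logger name: level name}."""
    pkg_levels = {}
    for pkg_name_level in log_levels.split(","):
        terms = pkg_name_level.split("=")
        if len(terms) != 2:
            continue  # malformed entries are skipped silently
        pkg_name = terms[0].strip()
        level = logging.getLevelName(terms[1].strip().upper())
        if not isinstance(level, int):
            level = logging.INFO  # unknown level names fall back to INFO
        pkg_levels[pkg_name] = logging.getLevelName(level)
    # noisy third-party packages default to WARNING, the root logger to INFO
    for pkg_name in ["peewee", "pdfminer"]:
        pkg_levels.setdefault(pkg_name, logging.getLevelName(logging.WARNING))
    pkg_levels.setdefault("root", logging.getLevelName(logging.INFO))
    return pkg_levels

print(parse_log_levels("ragflow.es_conn=DEBUG,peewee=ERROR"))
# {'ragflow.es_conn': 'DEBUG', 'peewee': 'ERROR', 'pdfminer': 'WARNING', 'root': 'INFO'}
```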

api/db/services/llm_service.py CHANGED
@@ -179,7 +179,7 @@ class TenantLLMService(CommonService):
                     .where(cls.model.tenant_id == tenant_id, cls.model.llm_factory == tenant_llm.llm_factory, cls.model.llm_name == llm_name)\
                     .execute()
             else:
-                llm_factory = llm_name.split("/")[0] if "/" in llm_name else llm_name
+                llm_factory = mdlnm.split("@")[1] if "@" in mdlnm else mdlnm
                 num = cls.model.create(tenant_id=tenant_id, llm_factory=llm_factory, llm_name=llm_name, used_tokens=used_tokens)
         except Exception:
             logging.exception("TenantLLMService.increase_usage got exception")

api/ragflow_server.py CHANGED
@@ -19,15 +19,10 @@
 # beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code
 
 import logging
+import os
 from api.utils.log_utils import initRootLogger
-initRootLogger("ragflow_server")
-for module in ["pdfminer"]:
-    module_logger = logging.getLogger(module)
-    module_logger.setLevel(logging.WARNING)
-for module in ["peewee"]:
-    module_logger = logging.getLogger(module)
-    module_logger.handlers.clear()
-    module_logger.propagate = True
+LOG_LEVELS = os.environ.get("LOG_LEVELS", "")
+initRootLogger("ragflow_server", LOG_LEVELS)
 
 import os
 import signal

api/utils/log_utils.py CHANGED
@@ -28,7 +28,7 @@ def get_project_base_directory():
     )
     return PROJECT_BASE
 
-def initRootLogger(logfile_basename: str, log_level: int = logging.INFO, log_format: str = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s"):
+def initRootLogger(logfile_basename: str, log_levels: str, log_format: str = "%(asctime)-15s %(levelname)-8s %(process)d %(message)s"):
    logger = logging.getLogger()
    if logger.hasHandlers():
        return
@@ -36,19 +36,39 @@ def initRootLogger(logfile_basename: str, log_level: int = logging.INFO, log_for
    log_path = os.path.abspath(os.path.join(get_project_base_directory(), "logs", f"{logfile_basename}.log"))
 
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
-    logger.setLevel(log_level)
    formatter = logging.Formatter(log_format)
 
    handler1 = RotatingFileHandler(log_path, maxBytes=10*1024*1024, backupCount=5)
-    handler1.setLevel(log_level)
    handler1.setFormatter(formatter)
    logger.addHandler(handler1)
 
    handler2 = logging.StreamHandler()
-    handler2.setLevel(log_level)
    handler2.setFormatter(formatter)
    logger.addHandler(handler2)
 
    logging.captureWarnings(True)
-    msg = f"{logfile_basename} log path: {log_path}"
+
+    pkg_levels = {}
+    for pkg_name_level in log_levels.split(","):
+        terms = pkg_name_level.split("=")
+        if len(terms)!= 2:
+            continue
+        pkg_name, pkg_level = terms[0], terms[1]
+        pkg_name = pkg_name.strip()
+        pkg_level = logging.getLevelName(pkg_level.strip().upper())
+        if not isinstance(pkg_level, int):
+            pkg_level = logging.INFO
+        pkg_levels[pkg_name] = logging.getLevelName(pkg_level)
+
+    for pkg_name in ['peewee', 'pdfminer']:
+        if pkg_name not in pkg_levels:
+            pkg_levels[pkg_name] = logging.getLevelName(logging.WARNING)
+    if 'root' not in pkg_levels:
+        pkg_levels['root'] = logging.getLevelName(logging.INFO)
+
+    for pkg_name, pkg_level in pkg_levels.items():
+        pkg_logger = logging.getLogger(pkg_name)
+        pkg_logger.setLevel(pkg_level)
+
+    msg = f"{logfile_basename} log path: {log_path}, log levels: {pkg_levels}"
    logger.info(msg)

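
With the new signature, a service entry point reads the environment variable and passes it straight through. A brief usage sketch (the service basename below is a placeholder; the logger name follows the convention introduced in this PR):

```python
import logging
import os

from api.utils.log_utils import initRootLogger

# An empty LOG_LEVELS leaves only the defaults in effect:
# root=INFO, peewee=WARNING, pdfminer=WARNING.
LOG_LEVELS = os.environ.get("LOG_LEVELS", "")
initRootLogger("my_service", LOG_LEVELS)  # "my_service" is an illustrative basename

# Modules log through a named logger, so e.g. LOG_LEVELS=ragflow.es_conn=DEBUG
# raises only that logger's level.
logger = logging.getLogger("ragflow.es_conn")
logger.debug("visible only when ragflow.es_conn is set to DEBUG")
```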
docker/.env CHANGED
@@ -129,3 +129,11 @@ TIMEZONE='Asia/Shanghai'
 # You can uncomment this line and update the value if you wish to change the 128M file size limit
 # MAX_CONTENT_LENGTH=134217728
 
+# The log levels for RAGFlow's own packages and imported packages.
+# Available levels:
+# - `DEBUG`
+# - `INFO` (default)
+# - `WARNING`
+# - `ERROR`
+# For example, the following line changes the log level of `ragflow.es_conn` to `DEBUG`:
+# LOG_LEVELS=ragflow.es_conn=DEBUG
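
Because the value is a comma-separated list of `name=LEVEL` pairs, several loggers can be tuned at once; for instance, a setting such as `LOG_LEVELS=root=DEBUG,ragflow.es_conn=DEBUG,peewee=ERROR` (illustrative) would raise the root and `ragflow.es_conn` loggers to `DEBUG` while keeping `peewee` at `ERROR`.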
rag/svr/task_executor.py CHANGED
@@ -19,19 +19,14 @@
 
 import logging
 import sys
+import os
 
 from api.utils.log_utils import initRootLogger
 
 CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
 CONSUMER_NAME = "task_executor_" + CONSUMER_NO
-initRootLogger(CONSUMER_NAME)
-for module in ["pdfminer"]:
-    module_logger = logging.getLogger(module)
-    module_logger.setLevel(logging.WARNING)
-for module in ["peewee"]:
-    module_logger = logging.getLogger(module)
-    module_logger.handlers.clear()
-    module_logger.propagate = True
+LOG_LEVELS = os.environ.get("LOG_LEVELS", "")
+initRootLogger(CONSUMER_NAME, LOG_LEVELS)
 
 from datetime import datetime
 import json

rag/utils/es_conn.py CHANGED
@@ -18,12 +18,13 @@ from rag.nlp import is_english, rag_tokenizer
 
 ATTEMPT_TIME = 2
 
+logger = logging.getLogger('ragflow.es_conn')
 
 @singleton
 class ESConnection(DocStoreConnection):
     def __init__(self):
         self.info = {}
-        logging.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
+        logger.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
         for _ in range(ATTEMPT_TIME):
             try:
                 self.es = Elasticsearch(
@@ -37,25 +38,25 @@ class ESConnection(DocStoreConnection):
                 self.info = self.es.info()
                 break
             except Exception as e:
-                logging.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
+                logger.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
                 time.sleep(5)
         if not self.es.ping():
             msg = f"Elasticsearch {settings.ES['hosts']} didn't become healthy in 120s."
-            logging.error(msg)
+            logger.error(msg)
             raise Exception(msg)
         v = self.info.get("version", {"number": "8.11.3"})
         v = v["number"].split(".")[0]
         if int(v) < 8:
             msg = f"Elasticsearch version must be greater than or equal to 8, current version: {v}"
-            logging.error(msg)
+            logger.error(msg)
             raise Exception(msg)
         fp_mapping = os.path.join(get_project_base_directory(), "conf", "mapping.json")
         if not os.path.exists(fp_mapping):
             msg = f"Elasticsearch mapping file not found at {fp_mapping}"
-            logging.error(msg)
+            logger.error(msg)
             raise Exception(msg)
         self.mapping = json.load(open(fp_mapping, "r"))
-        logging.info(f"Elasticsearch {settings.ES['hosts']} is healthy.")
+        logger.info(f"Elasticsearch {settings.ES['hosts']} is healthy.")
 
     """
     Database operations
@@ -82,7 +83,7 @@ class ESConnection(DocStoreConnection):
                 settings=self.mapping["settings"],
                 mappings=self.mapping["mappings"])
         except Exception:
-            logging.exception("ESConnection.createIndex error %s" % (indexName))
+            logger.exception("ESConnection.createIndex error %s" % (indexName))
 
     def deleteIdx(self, indexName: str, knowledgebaseId: str):
         if len(knowledgebaseId) > 0:
@@ -93,7 +94,7 @@ class ESConnection(DocStoreConnection):
         except NotFoundError:
             pass
         except Exception:
-            logging.exception("ESConnection.deleteIdx error %s" % (indexName))
+            logger.exception("ESConnection.deleteIdx error %s" % (indexName))
 
     def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
         s = Index(indexName, self.es)
@@ -101,7 +102,7 @@ class ESConnection(DocStoreConnection):
             try:
                 return s.exists()
             except Exception as e:
-                logging.exception("ESConnection.indexExist got exception")
+                logger.exception("ESConnection.indexExist got exception")
                 if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
                     continue
         return False
@@ -189,7 +190,7 @@ class ESConnection(DocStoreConnection):
         if limit > 0:
             s = s[offset:limit]
         q = s.to_dict()
-        logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
+        logger.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q))
 
         for i in range(ATTEMPT_TIME):
             try:
@@ -201,14 +202,14 @@ class ESConnection(DocStoreConnection):
                     _source=True)
                 if str(res.get("timed_out", "")).lower() == "true":
                     raise Exception("Es Timeout.")
-                logging.debug(f"ESConnection.search {str(indexNames)} res: " + str(res))
+                logger.debug(f"ESConnection.search {str(indexNames)} res: " + str(res))
                 return res
             except Exception as e:
-                logging.exception(f"ESConnection.search {str(indexNames)} query: " + str(q))
+                logger.exception(f"ESConnection.search {str(indexNames)} query: " + str(q))
                 if str(e).find("Timeout") > 0:
                     continue
                 raise e
-        logging.error("ESConnection.search timeout for 3 times!")
+        logger.error("ESConnection.search timeout for 3 times!")
         raise Exception("ESConnection.search timeout.")
 
     def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
@@ -224,11 +225,11 @@ class ESConnection(DocStoreConnection):
             except NotFoundError:
                 return None
             except Exception as e:
-                logging.exception(f"ESConnection.get({chunkId}) got exception")
+                logger.exception(f"ESConnection.get({chunkId}) got exception")
                 if str(e).find("Timeout") > 0:
                     continue
                 raise e
-        logging.error("ESConnection.get timeout for 3 times!")
+        logger.error("ESConnection.get timeout for 3 times!")
         raise Exception("ESConnection.get timeout.")
 
     def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str) -> list[str]:
@@ -259,7 +260,7 @@ class ESConnection(DocStoreConnection):
                 return res
             except Exception as e:
                 res.append(str(e))
-                logging.warning("ESConnection.insert got exception: " + str(e))
+                logger.warning("ESConnection.insert got exception: " + str(e))
                 res = []
                 if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
                     res.append(str(e))
@@ -278,7 +279,7 @@ class ESConnection(DocStoreConnection):
                 self.es.update(index=indexName, id=chunkId, doc=doc)
                 return True
             except Exception as e:
-                logging.exception(
+                logger.exception(
                     f"ESConnection.update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)}) got exception")
                 if str(e).find("Timeout") > 0:
                     continue
@@ -318,7 +319,7 @@ class ESConnection(DocStoreConnection):
                 _ = ubq.execute()
                 return True
             except Exception as e:
-                logging.error("ESConnection.update got exception: " + str(e))
+                logger.error("ESConnection.update got exception: " + str(e))
                 if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
                     continue
         return False
@@ -340,7 +341,7 @@ class ESConnection(DocStoreConnection):
                 qry.must.append(Q("term", **{k: v}))
             else:
                 raise Exception("Condition value must be int, str or list.")
-        logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
+        logger.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
         for _ in range(ATTEMPT_TIME):
             try:
                 res = self.es.delete_by_query(
@@ -349,7 +350,7 @@ class ESConnection(DocStoreConnection):
                     refresh=True)
                 return res["deleted"]
             except Exception as e:
-                logging.warning("ESConnection.delete got exception: " + str(e))
+                logger.warning("ESConnection.delete got exception: " + str(e))
                 if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
                     time.sleep(3)
                     continue
@@ -433,7 +434,7 @@ class ESConnection(DocStoreConnection):
     """
 
     def sql(self, sql: str, fetch_size: int, format: str):
-        logging.debug(f"ESConnection.sql get sql: {sql}")
+        logger.debug(f"ESConnection.sql get sql: {sql}")
         sql = re.sub(r"[ `]+", " ", sql)
         sql = sql.replace("%", "")
         replaces = []
@@ -450,7 +451,7 @@ class ESConnection(DocStoreConnection):
 
         for p, r in replaces:
             sql = sql.replace(p, r, 1)
-        logging.debug(f"ESConnection.sql to es: {sql}")
+        logger.debug(f"ESConnection.sql to es: {sql}")
 
         for i in range(ATTEMPT_TIME):
             try:
@@ -458,10 +459,10 @@ class ESConnection(DocStoreConnection):
                     request_timeout="2s")
                 return res
             except ConnectionTimeout:
-                logging.exception("ESConnection.sql timeout")
+                logger.exception("ESConnection.sql timeout")
                 continue
             except Exception:
-                logging.exception("ESConnection.sql got exception")
+                logger.exception("ESConnection.sql got exception")
                 return None
-        logging.error("ESConnection.sql timeout for 3 times!")
+        logger.error("ESConnection.sql timeout for 3 times!")
         return None

rag/utils/infinity_conn.py CHANGED
@@ -23,6 +23,7 @@ from rag.utils.doc_store_conn import (
     OrderByExpr,
 )
 
+logger = logging.getLogger('ragflow.infinity_conn')
 
 def equivalent_condition_to_str(condition: dict) -> str:
     assert "_id" not in condition
@@ -68,7 +69,7 @@ class InfinityConnection(DocStoreConnection):
             host, port = infinity_uri.split(":")
             infinity_uri = infinity.common.NetworkAddress(host, int(port))
         self.connPool = None
-        logging.info(f"Use Infinity {infinity_uri} as the doc engine.")
+        logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
         for _ in range(24):
             try:
                 connPool = ConnectionPool(infinity_uri)
@@ -78,16 +79,16 @@ class InfinityConnection(DocStoreConnection):
                 self.connPool = connPool
                 if res.error_code == ErrorCode.OK and res.server_status=="started":
                     break
-                logging.warn(f"Infinity status: {res.server_status}. Waiting Infinity {infinity_uri} to be healthy.")
+                logger.warn(f"Infinity status: {res.server_status}. Waiting Infinity {infinity_uri} to be healthy.")
                 time.sleep(5)
             except Exception as e:
-                logging.warning(f"{str(e)}. Waiting Infinity {infinity_uri} to be healthy.")
+                logger.warning(f"{str(e)}. Waiting Infinity {infinity_uri} to be healthy.")
                 time.sleep(5)
         if self.connPool is None:
             msg = f"Infinity {infinity_uri} didn't become healthy in 120s."
-            logging.error(msg)
+            logger.error(msg)
             raise Exception(msg)
-        logging.info(f"Infinity {infinity_uri} is healthy.")
+        logger.info(f"Infinity {infinity_uri} is healthy.")
 
     """
     Database operations
@@ -162,7 +163,7 @@ class InfinityConnection(DocStoreConnection):
                 )
                 break
         self.connPool.release_conn(inf_conn)
-        logging.info(
+        logger.info(
             f"INFINITY created table {table_name}, vector size {vectorSize}"
         )
 
@@ -172,7 +173,7 @@ class InfinityConnection(DocStoreConnection):
         db_instance = inf_conn.get_database(self.dbName)
         db_instance.drop_table(table_name, ConflictType.Ignore)
         self.connPool.release_conn(inf_conn)
-        logging.info(f"INFINITY dropped table {table_name}")
+        logger.info(f"INFINITY dropped table {table_name}")
 
     def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
         table_name = f"{indexName}_{knowledgebaseId}"
@@ -183,7 +184,7 @@ class InfinityConnection(DocStoreConnection):
             self.connPool.release_conn(inf_conn)
             return True
         except Exception as e:
-            logging.warning(f"INFINITY indexExist {str(e)}")
+            logger.warning(f"INFINITY indexExist {str(e)}")
         return False
 
     """
@@ -230,7 +231,7 @@ class InfinityConnection(DocStoreConnection):
                 )
                 if len(filter_cond) != 0:
                     filter_fulltext = f"({filter_cond}) AND {filter_fulltext}"
-                logging.debug(f"filter_fulltext: {filter_fulltext}")
+                logger.debug(f"filter_fulltext: {filter_fulltext}")
                 minimum_should_match = matchExpr.extra_options.get("minimum_should_match", 0.0)
                 if isinstance(minimum_should_match, float):
                     str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
@@ -296,7 +297,7 @@ class InfinityConnection(DocStoreConnection):
             df_list.append(kb_res)
         self.connPool.release_conn(inf_conn)
         res = concat_dataframes(df_list, selectFields)
-        logging.debug("INFINITY search tables: " + str(table_list))
+        logger.debug("INFINITY search tables: " + str(table_list))
         return res
 
     def get(
@@ -356,18 +357,18 @@ class InfinityConnection(DocStoreConnection):
             str_filter = f"id IN ({str_ids})"
             table_instance.delete(str_filter)
         # for doc in documents:
-        #     logging.info(f"insert position_list: {doc['position_list']}")
-        # logging.info(f"InfinityConnection.insert {json.dumps(documents)}")
+        #     logger.info(f"insert position_list: {doc['position_list']}")
+        # logger.info(f"InfinityConnection.insert {json.dumps(documents)}")
         table_instance.insert(documents)
         self.connPool.release_conn(inf_conn)
-        logging.debug(f"inserted into {table_name} {str_ids}.")
+        logger.debug(f"inserted into {table_name} {str_ids}.")
         return []
 
     def update(
         self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
     ) -> bool:
         # if 'position_list' in newValue:
-        #     logging.info(f"upsert position_list: {newValue['position_list']}")
+        #     logger.info(f"upsert position_list: {newValue['position_list']}")
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
         table_name = f"{indexName}_{knowledgebaseId}"
@@ -388,7 +389,7 @@ class InfinityConnection(DocStoreConnection):
         try:
             table_instance = db_instance.get_table(table_name)
         except Exception:
-            logging.warning(
+            logger.warning(
                 f"Skipped deleting `{filter}` from table {table_name} since the table doesn't exist."
             )
             return 0