Fix logs. Use dict.pop instead of del. Close #3473 (#3484)
Browse files### What problem does this PR solve?
Fix logs. Use dict.pop instead of del.
### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
- api/apps/api_app.py +1 -2
- api/apps/chunk_app.py +1 -2
- api/apps/sdk/dify_retrieval.py +1 -2
- api/apps/sdk/doc.py +1 -3
- api/apps/user_app.py +1 -2
- api/db/services/document_service.py +1 -1
- api/utils/log_utils.py +1 -0
- graphrag/graph_extractor.py +1 -1
- rag/benchmark.py +1 -2
- rag/nlp/search.py +1 -2
- rag/utils/es_conn.py +7 -5
- rag/utils/minio_conn.py +5 -2
- rag/utils/redis_conn.py +71 -55
api/apps/api_app.py
CHANGED
|
@@ -839,8 +839,7 @@ def retrieval():
|
|
| 839 |
similarity_threshold, vector_similarity_weight, top,
|
| 840 |
doc_ids, rerank_mdl=rerank_mdl)
|
| 841 |
for c in ranks["chunks"]:
|
| 842 |
-
|
| 843 |
-
del c["vector"]
|
| 844 |
return get_json_result(data=ranks)
|
| 845 |
except Exception as e:
|
| 846 |
if str(e).find("not_found") > 0:
|
|
|
|
| 839 |
similarity_threshold, vector_similarity_weight, top,
|
| 840 |
doc_ids, rerank_mdl=rerank_mdl)
|
| 841 |
for c in ranks["chunks"]:
|
| 842 |
+
c.pop("vector", None)
|
|
|
|
| 843 |
return get_json_result(data=ranks)
|
| 844 |
except Exception as e:
|
| 845 |
if str(e).find("not_found") > 0:
|
api/apps/chunk_app.py
CHANGED
|
@@ -287,8 +287,7 @@ def retrieval_test():
|
|
| 287 |
similarity_threshold, vector_similarity_weight, top,
|
| 288 |
doc_ids, rerank_mdl=rerank_mdl, highlight=req.get("highlight"))
|
| 289 |
for c in ranks["chunks"]:
|
| 290 |
-
|
| 291 |
-
del c["vector"]
|
| 292 |
|
| 293 |
return get_json_result(data=ranks)
|
| 294 |
except Exception as e:
|
|
|
|
| 287 |
similarity_threshold, vector_similarity_weight, top,
|
| 288 |
doc_ids, rerank_mdl=rerank_mdl, highlight=req.get("highlight"))
|
| 289 |
for c in ranks["chunks"]:
|
| 290 |
+
c.pop("vector", None)
|
|
|
|
| 291 |
|
| 292 |
return get_json_result(data=ranks)
|
| 293 |
except Exception as e:
|
api/apps/sdk/dify_retrieval.py
CHANGED
|
@@ -58,8 +58,7 @@ def retrieval(tenant_id):
|
|
| 58 |
)
|
| 59 |
records = []
|
| 60 |
for c in ranks["chunks"]:
|
| 61 |
-
|
| 62 |
-
del c["vector"]
|
| 63 |
records.append({
|
| 64 |
"content": c["content_ltks"],
|
| 65 |
"score": c["similarity"],
|
|
|
|
| 58 |
)
|
| 59 |
records = []
|
| 60 |
for c in ranks["chunks"]:
|
| 61 |
+
c.pop("vector", None)
|
|
|
|
| 62 |
records.append({
|
| 63 |
"content": c["content_ltks"],
|
| 64 |
"score": c["similarity"],
|
api/apps/sdk/doc.py
CHANGED
|
@@ -37,7 +37,6 @@ from api.db.services.document_service import DocumentService
|
|
| 37 |
from api.db.services.file2document_service import File2DocumentService
|
| 38 |
from api.db.services.file_service import FileService
|
| 39 |
from api.db.services.knowledgebase_service import KnowledgebaseService
|
| 40 |
-
from api import settings
|
| 41 |
from api.utils.api_utils import construct_json_result, get_parser_config
|
| 42 |
from rag.nlp import search
|
| 43 |
from rag.utils import rmSpace
|
|
@@ -1342,8 +1341,7 @@ def retrieval_test(tenant_id):
|
|
| 1342 |
highlight=highlight,
|
| 1343 |
)
|
| 1344 |
for c in ranks["chunks"]:
|
| 1345 |
-
|
| 1346 |
-
del c["vector"]
|
| 1347 |
|
| 1348 |
##rename keys
|
| 1349 |
renamed_chunks = []
|
|
|
|
| 37 |
from api.db.services.file2document_service import File2DocumentService
|
| 38 |
from api.db.services.file_service import FileService
|
| 39 |
from api.db.services.knowledgebase_service import KnowledgebaseService
|
|
|
|
| 40 |
from api.utils.api_utils import construct_json_result, get_parser_config
|
| 41 |
from rag.nlp import search
|
| 42 |
from rag.utils import rmSpace
|
|
|
|
| 1341 |
highlight=highlight,
|
| 1342 |
)
|
| 1343 |
for c in ranks["chunks"]:
|
| 1344 |
+
c.pop("vector", None)
|
|
|
|
| 1345 |
|
| 1346 |
##rename keys
|
| 1347 |
renamed_chunks = []
|
api/apps/user_app.py
CHANGED
|
@@ -696,8 +696,7 @@ def set_tenant_info():
|
|
| 696 |
"""
|
| 697 |
req = request.json
|
| 698 |
try:
|
| 699 |
-
tid = req
|
| 700 |
-
del req["tenant_id"]
|
| 701 |
TenantService.update_by_id(tid, req)
|
| 702 |
return get_json_result(data=True)
|
| 703 |
except Exception as e:
|
|
|
|
| 696 |
"""
|
| 697 |
req = request.json
|
| 698 |
try:
|
| 699 |
+
tid = req.pop("tenant_id")
|
|
|
|
| 700 |
TenantService.update_by_id(tid, req)
|
| 701 |
return get_json_result(data=True)
|
| 702 |
except Exception as e:
|
api/db/services/document_service.py
CHANGED
|
@@ -500,7 +500,7 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id):
|
|
| 500 |
|
| 501 |
STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue())
|
| 502 |
d["img_id"] = "{}-{}".format(kb.id, d["id"])
|
| 503 |
-
|
| 504 |
docs.append(d)
|
| 505 |
|
| 506 |
parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
|
|
|
|
| 500 |
|
| 501 |
STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue())
|
| 502 |
d["img_id"] = "{}-{}".format(kb.id, d["id"])
|
| 503 |
+
d.pop("image", None)
|
| 504 |
docs.append(d)
|
| 505 |
|
| 506 |
parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
|
api/utils/log_utils.py
CHANGED
|
@@ -49,5 +49,6 @@ def initRootLogger(logfile_basename: str, log_level: int = logging.INFO, log_for
|
|
| 49 |
handler2.setFormatter(formatter)
|
| 50 |
logger.addHandler(handler2)
|
| 51 |
|
|
|
|
| 52 |
msg = f"{logfile_basename} log path: {log_path}"
|
| 53 |
logger.info(msg)
|
|
|
|
| 49 |
handler2.setFormatter(formatter)
|
| 50 |
logger.addHandler(handler2)
|
| 51 |
|
| 52 |
+
logging.captureWarnings(True)
|
| 53 |
msg = f"{logfile_basename} log path: {log_path}"
|
| 54 |
logger.info(msg)
|
graphrag/graph_extractor.py
CHANGED
|
@@ -9,7 +9,7 @@ import logging
|
|
| 9 |
import numbers
|
| 10 |
import re
|
| 11 |
import traceback
|
| 12 |
-
from typing import Any, Callable
|
| 13 |
from dataclasses import dataclass
|
| 14 |
import tiktoken
|
| 15 |
from graphrag.graph_prompt import GRAPH_EXTRACTION_PROMPT, CONTINUE_PROMPT, LOOP_PROMPT
|
|
|
|
| 9 |
import numbers
|
| 10 |
import re
|
| 11 |
import traceback
|
| 12 |
+
from typing import Any, Callable, Mapping
|
| 13 |
from dataclasses import dataclass
|
| 14 |
import tiktoken
|
| 15 |
from graphrag.graph_prompt import GRAPH_EXTRACTION_PROMPT, CONTINUE_PROMPT, LOOP_PROMPT
|
rag/benchmark.py
CHANGED
|
@@ -59,8 +59,7 @@ class Benchmark:
|
|
| 59 |
del qrels[query]
|
| 60 |
continue
|
| 61 |
for c in ranks["chunks"]:
|
| 62 |
-
|
| 63 |
-
del c["vector"]
|
| 64 |
run[query][c["chunk_id"]] = c["similarity"]
|
| 65 |
return run
|
| 66 |
|
|
|
|
| 59 |
del qrels[query]
|
| 60 |
continue
|
| 61 |
for c in ranks["chunks"]:
|
| 62 |
+
c.pop("vector", None)
|
|
|
|
| 63 |
run[query][c["chunk_id"]] = c["similarity"]
|
| 64 |
return run
|
| 65 |
|
rag/nlp/search.py
CHANGED
|
@@ -106,8 +106,7 @@ class Dealer:
|
|
| 106 |
# If result is empty, try again with lower min_match
|
| 107 |
if total == 0:
|
| 108 |
matchText, _ = self.qryr.question(qst, min_match=0.1)
|
| 109 |
-
|
| 110 |
-
del filters["doc_ids"]
|
| 111 |
matchDense.extra_options["similarity"] = 0.17
|
| 112 |
res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr], orderBy, offset, limit, idx_names, kb_ids)
|
| 113 |
total=self.dataStore.getTotal(res)
|
|
|
|
| 106 |
# If result is empty, try again with lower min_match
|
| 107 |
if total == 0:
|
| 108 |
matchText, _ = self.qryr.question(qst, min_match=0.1)
|
| 109 |
+
filters.pop("doc_ids", None)
|
|
|
|
| 110 |
matchDense.extra_options["similarity"] = 0.17
|
| 111 |
res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr], orderBy, offset, limit, idx_names, kb_ids)
|
| 112 |
total=self.dataStore.getTotal(res)
|
rag/utils/es_conn.py
CHANGED
|
@@ -5,7 +5,7 @@ import time
|
|
| 5 |
import os
|
| 6 |
|
| 7 |
import copy
|
| 8 |
-
from elasticsearch import Elasticsearch
|
| 9 |
from elasticsearch_dsl import UpdateByQuery, Q, Search, Index
|
| 10 |
from elastic_transport import ConnectionTimeout
|
| 11 |
from rag import settings
|
|
@@ -82,7 +82,9 @@ class ESConnection(DocStoreConnection):
|
|
| 82 |
|
| 83 |
def deleteIdx(self, indexName: str, knowledgebaseId: str):
|
| 84 |
try:
|
| 85 |
-
|
|
|
|
|
|
|
| 86 |
except Exception:
|
| 87 |
logging.exception("ES delete index error %s" % (indexName))
|
| 88 |
|
|
@@ -146,6 +148,7 @@ class ESConnection(DocStoreConnection):
|
|
| 146 |
similarity=similarity,
|
| 147 |
)
|
| 148 |
|
|
|
|
| 149 |
if condition:
|
| 150 |
if not bqry:
|
| 151 |
bqry = Q("bool", must=[])
|
|
@@ -226,8 +229,7 @@ class ESConnection(DocStoreConnection):
|
|
| 226 |
assert "_id" not in d
|
| 227 |
assert "id" in d
|
| 228 |
d_copy = copy.deepcopy(d)
|
| 229 |
-
meta_id = d_copy
|
| 230 |
-
del d_copy["id"]
|
| 231 |
operations.append(
|
| 232 |
{"index": {"_index": indexName, "_id": meta_id}})
|
| 233 |
operations.append(d_copy)
|
|
@@ -254,7 +256,7 @@ class ESConnection(DocStoreConnection):
|
|
| 254 |
|
| 255 |
def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
|
| 256 |
doc = copy.deepcopy(newValue)
|
| 257 |
-
|
| 258 |
if "id" in condition and isinstance(condition["id"], str):
|
| 259 |
# update specific single document
|
| 260 |
chunkId = condition["id"]
|
|
|
|
| 5 |
import os
|
| 6 |
|
| 7 |
import copy
|
| 8 |
+
from elasticsearch import Elasticsearch, NotFoundError
|
| 9 |
from elasticsearch_dsl import UpdateByQuery, Q, Search, Index
|
| 10 |
from elastic_transport import ConnectionTimeout
|
| 11 |
from rag import settings
|
|
|
|
| 82 |
|
| 83 |
def deleteIdx(self, indexName: str, knowledgebaseId: str):
|
| 84 |
try:
|
| 85 |
+
self.es.indices.delete(index=indexName, allow_no_indices=True)
|
| 86 |
+
except NotFoundError:
|
| 87 |
+
pass
|
| 88 |
except Exception:
|
| 89 |
logging.exception("ES delete index error %s" % (indexName))
|
| 90 |
|
|
|
|
| 148 |
similarity=similarity,
|
| 149 |
)
|
| 150 |
|
| 151 |
+
condition["kb_id"] = knowledgebaseIds
|
| 152 |
if condition:
|
| 153 |
if not bqry:
|
| 154 |
bqry = Q("bool", must=[])
|
|
|
|
| 229 |
assert "_id" not in d
|
| 230 |
assert "id" in d
|
| 231 |
d_copy = copy.deepcopy(d)
|
| 232 |
+
meta_id = d_copy.pop("id", "")
|
|
|
|
| 233 |
operations.append(
|
| 234 |
{"index": {"_index": indexName, "_id": meta_id}})
|
| 235 |
operations.append(d_copy)
|
|
|
|
| 256 |
|
| 257 |
def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
|
| 258 |
doc = copy.deepcopy(newValue)
|
| 259 |
+
doc.pop("id", None)
|
| 260 |
if "id" in condition and isinstance(condition["id"], str):
|
| 261 |
# update specific single document
|
| 262 |
chunkId = condition["id"]
|
rag/utils/minio_conn.py
CHANGED
|
@@ -78,10 +78,13 @@ class RAGFlowMinio(object):
|
|
| 78 |
|
| 79 |
def obj_exist(self, bucket, fnm):
|
| 80 |
try:
|
| 81 |
-
if self.conn.
|
|
|
|
|
|
|
|
|
|
| 82 |
return False
|
| 83 |
except Exception:
|
| 84 |
-
logging.exception(f"
|
| 85 |
return False
|
| 86 |
|
| 87 |
|
|
|
|
| 78 |
|
| 79 |
def obj_exist(self, bucket, fnm):
|
| 80 |
try:
|
| 81 |
+
if not self.conn.bucket_exists(bucket):
|
| 82 |
+
return False
|
| 83 |
+
if self.conn.stat_object(bucket, fnm):
|
| 84 |
+
return True
|
| 85 |
return False
|
| 86 |
except Exception:
|
| 87 |
+
logging.exception(f"RAGFlowMinio.obj_exist {bucket}/{fnm} got exception:")
|
| 88 |
return False
|
| 89 |
|
| 90 |
|
rag/utils/redis_conn.py
CHANGED
|
@@ -12,7 +12,7 @@ class Payload:
|
|
| 12 |
self.__queue_name = queue_name
|
| 13 |
self.__group_name = group_name
|
| 14 |
self.__msg_id = msg_id
|
| 15 |
-
self.__message = json.loads(message[
|
| 16 |
|
| 17 |
def ack(self):
|
| 18 |
try:
|
|
@@ -35,19 +35,20 @@ class RedisDB:
|
|
| 35 |
|
| 36 |
def __open__(self):
|
| 37 |
try:
|
| 38 |
-
self.REDIS = redis.StrictRedis(
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
|
|
|
|
|
|
| 43 |
except Exception:
|
| 44 |
logging.warning("Redis can't be connected.")
|
| 45 |
return self.REDIS
|
| 46 |
|
| 47 |
def health(self):
|
| 48 |
-
|
| 49 |
self.REDIS.ping()
|
| 50 |
-
a, b =
|
| 51 |
self.REDIS.set(a, b, 3)
|
| 52 |
|
| 53 |
if self.REDIS.get(a) == b:
|
|
@@ -57,19 +58,21 @@ class RedisDB:
|
|
| 57 |
return self.REDIS is not None
|
| 58 |
|
| 59 |
def exist(self, k):
|
| 60 |
-
if not self.REDIS:
|
|
|
|
| 61 |
try:
|
| 62 |
return self.REDIS.exists(k)
|
| 63 |
except Exception as e:
|
| 64 |
-
logging.warning("
|
| 65 |
self.__open__()
|
| 66 |
|
| 67 |
def get(self, k):
|
| 68 |
-
if not self.REDIS:
|
|
|
|
| 69 |
try:
|
| 70 |
return self.REDIS.get(k)
|
| 71 |
except Exception as e:
|
| 72 |
-
logging.warning("
|
| 73 |
self.__open__()
|
| 74 |
|
| 75 |
def set_obj(self, k, obj, exp=3600):
|
|
@@ -77,7 +80,7 @@ class RedisDB:
|
|
| 77 |
self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
|
| 78 |
return True
|
| 79 |
except Exception as e:
|
| 80 |
-
logging.warning("
|
| 81 |
self.__open__()
|
| 82 |
return False
|
| 83 |
|
|
@@ -86,7 +89,7 @@ class RedisDB:
|
|
| 86 |
self.REDIS.set(k, v, exp)
|
| 87 |
return True
|
| 88 |
except Exception as e:
|
| 89 |
-
logging.warning("
|
| 90 |
self.__open__()
|
| 91 |
return False
|
| 92 |
|
|
@@ -95,7 +98,7 @@ class RedisDB:
|
|
| 95 |
self.REDIS.sadd(key, member)
|
| 96 |
return True
|
| 97 |
except Exception as e:
|
| 98 |
-
logging.warning("
|
| 99 |
self.__open__()
|
| 100 |
return False
|
| 101 |
|
|
@@ -104,7 +107,7 @@ class RedisDB:
|
|
| 104 |
self.REDIS.srem(key, member)
|
| 105 |
return True
|
| 106 |
except Exception as e:
|
| 107 |
-
logging.warning("
|
| 108 |
self.__open__()
|
| 109 |
return False
|
| 110 |
|
|
@@ -113,7 +116,9 @@ class RedisDB:
|
|
| 113 |
res = self.REDIS.smembers(key)
|
| 114 |
return res
|
| 115 |
except Exception as e:
|
| 116 |
-
logging.warning(
|
|
|
|
|
|
|
| 117 |
self.__open__()
|
| 118 |
return None
|
| 119 |
|
|
@@ -122,7 +127,7 @@ class RedisDB:
|
|
| 122 |
self.REDIS.zadd(key, {member: score})
|
| 123 |
return True
|
| 124 |
except Exception as e:
|
| 125 |
-
logging.warning("
|
| 126 |
self.__open__()
|
| 127 |
return False
|
| 128 |
|
|
@@ -131,7 +136,7 @@ class RedisDB:
|
|
| 131 |
res = self.REDIS.zcount(key, min, max)
|
| 132 |
return res
|
| 133 |
except Exception as e:
|
| 134 |
-
logging.warning("
|
| 135 |
self.__open__()
|
| 136 |
return 0
|
| 137 |
|
|
@@ -140,7 +145,7 @@ class RedisDB:
|
|
| 140 |
res = self.REDIS.zpopmin(key, count)
|
| 141 |
return res
|
| 142 |
except Exception as e:
|
| 143 |
-
logging.warning("
|
| 144 |
self.__open__()
|
| 145 |
return None
|
| 146 |
|
|
@@ -149,7 +154,9 @@ class RedisDB:
|
|
| 149 |
res = self.REDIS.zrangebyscore(key, min, max)
|
| 150 |
return res
|
| 151 |
except Exception as e:
|
| 152 |
-
logging.warning(
|
|
|
|
|
|
|
| 153 |
self.__open__()
|
| 154 |
return None
|
| 155 |
|
|
@@ -160,7 +167,9 @@ class RedisDB:
|
|
| 160 |
pipeline.execute()
|
| 161 |
return True
|
| 162 |
except Exception as e:
|
| 163 |
-
logging.warning(
|
|
|
|
|
|
|
| 164 |
self.__open__()
|
| 165 |
return False
|
| 166 |
|
|
@@ -170,23 +179,22 @@ class RedisDB:
|
|
| 170 |
payload = {"message": json.dumps(message)}
|
| 171 |
pipeline = self.REDIS.pipeline()
|
| 172 |
pipeline.xadd(queue, payload)
|
| 173 |
-
#pipeline.expire(queue, exp)
|
| 174 |
pipeline.execute()
|
| 175 |
return True
|
| 176 |
-
except Exception:
|
| 177 |
-
logging.exception(
|
|
|
|
|
|
|
| 178 |
return False
|
| 179 |
|
| 180 |
-
def queue_consumer(
|
|
|
|
|
|
|
| 181 |
try:
|
| 182 |
group_info = self.REDIS.xinfo_groups(queue_name)
|
| 183 |
if not any(e["name"] == group_name for e in group_info):
|
| 184 |
-
self.REDIS.xgroup_create(
|
| 185 |
-
queue_name,
|
| 186 |
-
group_name,
|
| 187 |
-
id="0",
|
| 188 |
-
mkstream=True
|
| 189 |
-
)
|
| 190 |
args = {
|
| 191 |
"groupname": group_name,
|
| 192 |
"consumername": consumer_name,
|
|
@@ -202,10 +210,15 @@ class RedisDB:
|
|
| 202 |
res = Payload(self.REDIS, queue_name, group_name, msg_id, payload)
|
| 203 |
return res
|
| 204 |
except Exception as e:
|
| 205 |
-
if
|
| 206 |
pass
|
| 207 |
else:
|
| 208 |
-
logging.exception(
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 209 |
return None
|
| 210 |
|
| 211 |
def get_unacked_for(self, consumer_name, queue_name, group_name):
|
|
@@ -213,36 +226,39 @@ class RedisDB:
|
|
| 213 |
group_info = self.REDIS.xinfo_groups(queue_name)
|
| 214 |
if not any(e["name"] == group_name for e in group_info):
|
| 215 |
return
|
| 216 |
-
pendings = self.REDIS.xpending_range(
|
| 217 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 218 |
msg_id = pendings[0]["message_id"]
|
| 219 |
msg = self.REDIS.xrange(queue_name, min=msg_id, count=1)
|
| 220 |
_, payload = msg[0]
|
| 221 |
return Payload(self.REDIS, queue_name, group_name, msg_id, payload)
|
| 222 |
except Exception as e:
|
| 223 |
-
if
|
| 224 |
return
|
| 225 |
-
logging.exception(
|
|
|
|
|
|
|
| 226 |
self.__open__()
|
| 227 |
|
| 228 |
-
def queue_info(self, queue, group_name) -> dict:
|
| 229 |
-
|
| 230 |
-
|
| 231 |
-
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
|
| 235 |
-
|
| 236 |
-
|
|
|
|
| 237 |
return None
|
| 238 |
|
| 239 |
-
def queue_head(self, queue) -> int:
|
| 240 |
-
for _ in range(3):
|
| 241 |
-
try:
|
| 242 |
-
ent = self.REDIS.xrange(queue, count=1)
|
| 243 |
-
return ent[0]
|
| 244 |
-
except Exception:
|
| 245 |
-
logging.exception("queue_head" + str(queue) + " got exception")
|
| 246 |
-
return 0
|
| 247 |
|
| 248 |
REDIS_CONN = RedisDB()
|
|
|
|
| 12 |
self.__queue_name = queue_name
|
| 13 |
self.__group_name = group_name
|
| 14 |
self.__msg_id = msg_id
|
| 15 |
+
self.__message = json.loads(message["message"])
|
| 16 |
|
| 17 |
def ack(self):
|
| 18 |
try:
|
|
|
|
| 35 |
|
| 36 |
def __open__(self):
|
| 37 |
try:
|
| 38 |
+
self.REDIS = redis.StrictRedis(
|
| 39 |
+
host=self.config["host"].split(":")[0],
|
| 40 |
+
port=int(self.config.get("host", ":6379").split(":")[1]),
|
| 41 |
+
db=int(self.config.get("db", 1)),
|
| 42 |
+
password=self.config.get("password"),
|
| 43 |
+
decode_responses=True,
|
| 44 |
+
)
|
| 45 |
except Exception:
|
| 46 |
logging.warning("Redis can't be connected.")
|
| 47 |
return self.REDIS
|
| 48 |
|
| 49 |
def health(self):
|
|
|
|
| 50 |
self.REDIS.ping()
|
| 51 |
+
a, b = "xx", "yy"
|
| 52 |
self.REDIS.set(a, b, 3)
|
| 53 |
|
| 54 |
if self.REDIS.get(a) == b:
|
|
|
|
| 58 |
return self.REDIS is not None
|
| 59 |
|
| 60 |
def exist(self, k):
|
| 61 |
+
if not self.REDIS:
|
| 62 |
+
return
|
| 63 |
try:
|
| 64 |
return self.REDIS.exists(k)
|
| 65 |
except Exception as e:
|
| 66 |
+
logging.warning("RedisDB.exist " + str(k) + " got exception: " + str(e))
|
| 67 |
self.__open__()
|
| 68 |
|
| 69 |
def get(self, k):
|
| 70 |
+
if not self.REDIS:
|
| 71 |
+
return
|
| 72 |
try:
|
| 73 |
return self.REDIS.get(k)
|
| 74 |
except Exception as e:
|
| 75 |
+
logging.warning("RedisDB.get " + str(k) + " got exception: " + str(e))
|
| 76 |
self.__open__()
|
| 77 |
|
| 78 |
def set_obj(self, k, obj, exp=3600):
|
|
|
|
| 80 |
self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
|
| 81 |
return True
|
| 82 |
except Exception as e:
|
| 83 |
+
logging.warning("RedisDB.set_obj " + str(k) + " got exception: " + str(e))
|
| 84 |
self.__open__()
|
| 85 |
return False
|
| 86 |
|
|
|
|
| 89 |
self.REDIS.set(k, v, exp)
|
| 90 |
return True
|
| 91 |
except Exception as e:
|
| 92 |
+
logging.warning("RedisDB.set " + str(k) + " got exception: " + str(e))
|
| 93 |
self.__open__()
|
| 94 |
return False
|
| 95 |
|
|
|
|
| 98 |
self.REDIS.sadd(key, member)
|
| 99 |
return True
|
| 100 |
except Exception as e:
|
| 101 |
+
logging.warning("RedisDB.sadd " + str(key) + " got exception: " + str(e))
|
| 102 |
self.__open__()
|
| 103 |
return False
|
| 104 |
|
|
|
|
| 107 |
self.REDIS.srem(key, member)
|
| 108 |
return True
|
| 109 |
except Exception as e:
|
| 110 |
+
logging.warning("RedisDB.srem " + str(key) + " got exception: " + str(e))
|
| 111 |
self.__open__()
|
| 112 |
return False
|
| 113 |
|
|
|
|
| 116 |
res = self.REDIS.smembers(key)
|
| 117 |
return res
|
| 118 |
except Exception as e:
|
| 119 |
+
logging.warning(
|
| 120 |
+
"RedisDB.smembers " + str(key) + " got exception: " + str(e)
|
| 121 |
+
)
|
| 122 |
self.__open__()
|
| 123 |
return None
|
| 124 |
|
|
|
|
| 127 |
self.REDIS.zadd(key, {member: score})
|
| 128 |
return True
|
| 129 |
except Exception as e:
|
| 130 |
+
logging.warning("RedisDB.zadd " + str(key) + " got exception: " + str(e))
|
| 131 |
self.__open__()
|
| 132 |
return False
|
| 133 |
|
|
|
|
| 136 |
res = self.REDIS.zcount(key, min, max)
|
| 137 |
return res
|
| 138 |
except Exception as e:
|
| 139 |
+
logging.warning("RedisDB.zcount " + str(key) + " got exception: " + str(e))
|
| 140 |
self.__open__()
|
| 141 |
return 0
|
| 142 |
|
|
|
|
| 145 |
res = self.REDIS.zpopmin(key, count)
|
| 146 |
return res
|
| 147 |
except Exception as e:
|
| 148 |
+
logging.warning("RedisDB.zpopmin " + str(key) + " got exception: " + str(e))
|
| 149 |
self.__open__()
|
| 150 |
return None
|
| 151 |
|
|
|
|
| 154 |
res = self.REDIS.zrangebyscore(key, min, max)
|
| 155 |
return res
|
| 156 |
except Exception as e:
|
| 157 |
+
logging.warning(
|
| 158 |
+
"RedisDB.zrangebyscore " + str(key) + " got exception: " + str(e)
|
| 159 |
+
)
|
| 160 |
self.__open__()
|
| 161 |
return None
|
| 162 |
|
|
|
|
| 167 |
pipeline.execute()
|
| 168 |
return True
|
| 169 |
except Exception as e:
|
| 170 |
+
logging.warning(
|
| 171 |
+
"RedisDB.transaction " + str(key) + " got exception: " + str(e)
|
| 172 |
+
)
|
| 173 |
self.__open__()
|
| 174 |
return False
|
| 175 |
|
|
|
|
| 179 |
payload = {"message": json.dumps(message)}
|
| 180 |
pipeline = self.REDIS.pipeline()
|
| 181 |
pipeline.xadd(queue, payload)
|
| 182 |
+
# pipeline.expire(queue, exp)
|
| 183 |
pipeline.execute()
|
| 184 |
return True
|
| 185 |
+
except Exception as e:
|
| 186 |
+
logging.exception(
|
| 187 |
+
"RedisDB.queue_product " + str(queue) + " got exception: " + str(e)
|
| 188 |
+
)
|
| 189 |
return False
|
| 190 |
|
| 191 |
+
def queue_consumer(
|
| 192 |
+
self, queue_name, group_name, consumer_name, msg_id=b">"
|
| 193 |
+
) -> Payload:
|
| 194 |
try:
|
| 195 |
group_info = self.REDIS.xinfo_groups(queue_name)
|
| 196 |
if not any(e["name"] == group_name for e in group_info):
|
| 197 |
+
self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 198 |
args = {
|
| 199 |
"groupname": group_name,
|
| 200 |
"consumername": consumer_name,
|
|
|
|
| 210 |
res = Payload(self.REDIS, queue_name, group_name, msg_id, payload)
|
| 211 |
return res
|
| 212 |
except Exception as e:
|
| 213 |
+
if "key" in str(e):
|
| 214 |
pass
|
| 215 |
else:
|
| 216 |
+
logging.exception(
|
| 217 |
+
"RedisDB.queue_consumer "
|
| 218 |
+
+ str(queue_name)
|
| 219 |
+
+ " got exception: "
|
| 220 |
+
+ str(e)
|
| 221 |
+
)
|
| 222 |
return None
|
| 223 |
|
| 224 |
def get_unacked_for(self, consumer_name, queue_name, group_name):
|
|
|
|
| 226 |
group_info = self.REDIS.xinfo_groups(queue_name)
|
| 227 |
if not any(e["name"] == group_name for e in group_info):
|
| 228 |
return
|
| 229 |
+
pendings = self.REDIS.xpending_range(
|
| 230 |
+
queue_name,
|
| 231 |
+
group_name,
|
| 232 |
+
min=0,
|
| 233 |
+
max=10000000000000,
|
| 234 |
+
count=1,
|
| 235 |
+
consumername=consumer_name,
|
| 236 |
+
)
|
| 237 |
+
if not pendings:
|
| 238 |
+
return
|
| 239 |
msg_id = pendings[0]["message_id"]
|
| 240 |
msg = self.REDIS.xrange(queue_name, min=msg_id, count=1)
|
| 241 |
_, payload = msg[0]
|
| 242 |
return Payload(self.REDIS, queue_name, group_name, msg_id, payload)
|
| 243 |
except Exception as e:
|
| 244 |
+
if "key" in str(e):
|
| 245 |
return
|
| 246 |
+
logging.exception(
|
| 247 |
+
"RedisDB.get_unacked_for " + consumer_name + " got exception: " + str(e)
|
| 248 |
+
)
|
| 249 |
self.__open__()
|
| 250 |
|
| 251 |
+
def queue_info(self, queue, group_name) -> dict | None:
|
| 252 |
+
try:
|
| 253 |
+
groups = self.REDIS.xinfo_groups(queue)
|
| 254 |
+
for group in groups:
|
| 255 |
+
if group["name"] == group_name:
|
| 256 |
+
return group
|
| 257 |
+
except Exception as e:
|
| 258 |
+
logging.warning(
|
| 259 |
+
"RedisDB.queue_info " + str(queue) + " got exception: " + str(e)
|
| 260 |
+
)
|
| 261 |
return None
|
| 262 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 263 |
|
| 264 |
REDIS_CONN = RedisDB()
|