KevinHuSh
committed on
Commit
·
14174de
1
Parent(s):
7c07000
fix #258 task_executor occupy cpu too much (#288)
Browse files

### What problem does this PR solve?
Issue link: #285
### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
- rag/nlp/query.py +1 -1
- rag/svr/task_executor.py +20 -17
- rag/utils/minio_conn.py +1 -1
rag/nlp/query.py
CHANGED
|
@@ -62,7 +62,7 @@ class EsQueryer:
|
|
| 62 |
return Q("bool",
|
| 63 |
must=Q("query_string", fields=self.flds,
|
| 64 |
type="best_fields", query=" ".join(q),
|
| 65 |
-
boost=1
|
| 66 |
), tks
|
| 67 |
|
| 68 |
def needQieqie(tk):
|
|
|
|
| 62 |
return Q("bool",
|
| 63 |
must=Q("query_string", fields=self.flds,
|
| 64 |
type="best_fields", query=" ".join(q),
|
| 65 |
+
boost=1)#, minimum_should_match=min_match)
|
| 66 |
), tks
|
| 67 |
|
| 68 |
def needQieqie(tk):
|
rag/svr/task_executor.py
CHANGED
|
@@ -21,16 +21,15 @@ import hashlib
|
|
| 21 |
import copy
|
| 22 |
import re
|
| 23 |
import sys
|
|
|
|
| 24 |
import traceback
|
| 25 |
from functools import partial
|
| 26 |
-
import signal
|
| 27 |
-
from contextlib import contextmanager
|
| 28 |
from rag.settings import database_logger
|
| 29 |
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
|
| 30 |
-
|
| 31 |
import numpy as np
|
| 32 |
from elasticsearch_dsl import Q
|
| 33 |
-
|
| 34 |
from api.db.services.task_service import TaskService
|
| 35 |
from rag.utils import ELASTICSEARCH
|
| 36 |
from rag.utils import MINIO
|
|
@@ -92,23 +91,17 @@ def set_progress(task_id, from_page=0, to_page=-1,
|
|
| 92 |
def collect(comm, mod, tm):
|
| 93 |
tasks = TaskService.get_tasks(tm, mod, comm)
|
| 94 |
if len(tasks) == 0:
|
|
|
|
| 95 |
return pd.DataFrame()
|
| 96 |
tasks = pd.DataFrame(tasks)
|
| 97 |
mtm = tasks["update_time"].max()
|
| 98 |
cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
|
| 99 |
return tasks
|
| 100 |
|
| 101 |
-
@contextmanager
|
| 102 |
-
def timeout(time):
|
| 103 |
-
# Register a function to raise a TimeoutError on the signal.
|
| 104 |
-
signal.signal(signal.SIGALRM, raise_timeout)
|
| 105 |
-
# Schedule the signal to be sent after ``time``.
|
| 106 |
-
signal.alarm(time)
|
| 107 |
-
yield
|
| 108 |
-
|
| 109 |
|
| 110 |
-
def
|
| 111 |
-
|
|
|
|
| 112 |
|
| 113 |
|
| 114 |
def build(row):
|
|
@@ -124,24 +117,34 @@ def build(row):
|
|
| 124 |
row["from_page"],
|
| 125 |
row["to_page"])
|
| 126 |
chunker = FACTORY[row["parser_id"].lower()]
|
|
|
|
| 127 |
try:
|
| 128 |
st = timer()
|
| 129 |
-
|
| 130 |
-
|
|
|
|
|
|
|
|
|
|
| 131 |
cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
|
| 132 |
to_page=row["to_page"], lang=row["language"], callback=callback,
|
| 133 |
kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
|
| 134 |
cron_logger.info(
|
| 135 |
"Chunkking({}) {}/{}".format(timer()-st, row["location"], row["name"]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 136 |
except Exception as e:
|
| 137 |
if re.search("(No such file|not found)", str(e)):
|
| 138 |
callback(-1, "Can not find file <%s>" % row["name"])
|
| 139 |
else:
|
| 140 |
callback(-1, f"Internal server error: %s" %
|
| 141 |
str(e).replace("'", ""))
|
|
|
|
| 142 |
traceback.print_exc()
|
| 143 |
|
| 144 |
-
cron_logger.
|
| 145 |
"Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
|
| 146 |
|
| 147 |
return
|
|
|
|
| 21 |
import copy
|
| 22 |
import re
|
| 23 |
import sys
|
| 24 |
+
import time
|
| 25 |
import traceback
|
| 26 |
from functools import partial
|
|
|
|
|
|
|
| 27 |
from rag.settings import database_logger
|
| 28 |
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
|
| 29 |
+
from multiprocessing import Pool
|
| 30 |
import numpy as np
|
| 31 |
from elasticsearch_dsl import Q
|
| 32 |
+
from multiprocessing.context import TimeoutError
|
| 33 |
from api.db.services.task_service import TaskService
|
| 34 |
from rag.utils import ELASTICSEARCH
|
| 35 |
from rag.utils import MINIO
|
|
|
|
| 91 |
def collect(comm, mod, tm):
|
| 92 |
tasks = TaskService.get_tasks(tm, mod, comm)
|
| 93 |
if len(tasks) == 0:
|
| 94 |
+
time.sleep(1)
|
| 95 |
return pd.DataFrame()
|
| 96 |
tasks = pd.DataFrame(tasks)
|
| 97 |
mtm = tasks["update_time"].max()
|
| 98 |
cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
|
| 99 |
return tasks
|
| 100 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 101 |
|
| 102 |
+
def get_minio_binary(bucket, name):
|
| 103 |
+
global MINIO
|
| 104 |
+
return MINIO.get(bucket, name)
|
| 105 |
|
| 106 |
|
| 107 |
def build(row):
|
|
|
|
| 117 |
row["from_page"],
|
| 118 |
row["to_page"])
|
| 119 |
chunker = FACTORY[row["parser_id"].lower()]
|
| 120 |
+
pool = Pool(processes=1)
|
| 121 |
try:
|
| 122 |
st = timer()
|
| 123 |
+
thr = pool.apply_async(get_minio_binary, args=(row["kb_id"], row["location"]))
|
| 124 |
+
binary = thr.get(timeout=90)
|
| 125 |
+
pool.terminate()
|
| 126 |
+
cron_logger.info(
|
| 127 |
+
"From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
|
| 128 |
cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
|
| 129 |
to_page=row["to_page"], lang=row["language"], callback=callback,
|
| 130 |
kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
|
| 131 |
cron_logger.info(
|
| 132 |
"Chunkking({}) {}/{}".format(timer()-st, row["location"], row["name"]))
|
| 133 |
+
except TimeoutError as e:
|
| 134 |
+
callback(-1, f"Internal server error: Fetch file timeout. Could you try it again.")
|
| 135 |
+
cron_logger.error(
|
| 136 |
+
"Chunkking {}/{}: Fetch file timeout.".format(row["location"], row["name"]))
|
| 137 |
+
return
|
| 138 |
except Exception as e:
|
| 139 |
if re.search("(No such file|not found)", str(e)):
|
| 140 |
callback(-1, "Can not find file <%s>" % row["name"])
|
| 141 |
else:
|
| 142 |
callback(-1, f"Internal server error: %s" %
|
| 143 |
str(e).replace("'", ""))
|
| 144 |
+
pool.terminate()
|
| 145 |
traceback.print_exc()
|
| 146 |
|
| 147 |
+
cron_logger.error(
|
| 148 |
"Chunkking {}/{}: {}".format(row["location"], row["name"], str(e)))
|
| 149 |
|
| 150 |
return
|
rag/utils/minio_conn.py
CHANGED
|
@@ -58,7 +58,7 @@ class HuMinio(object):
|
|
| 58 |
|
| 59 |
|
| 60 |
def get(self, bucket, fnm):
|
| 61 |
-
for _ in range(
|
| 62 |
try:
|
| 63 |
r = self.conn.get_object(bucket, fnm)
|
| 64 |
return r.read()
|
|
|
|
| 58 |
|
| 59 |
|
| 60 |
def get(self, bucket, fnm):
|
| 61 |
+
for _ in range(1):
|
| 62 |
try:
|
| 63 |
r = self.conn.get_object(bucket, fnm)
|
| 64 |
return r.read()
|