optimize srv broker and executor logic (#630)
### What problem does this PR solve?
Optimize the task broker and executor to reduce memory usage and deployment complexity.
### Type of change
- [x] Performance Improvement
- [x] Refactoring
### Change Log
- Enhance the Redis utilities with a message queue built on Redis Streams.
- Rework the task broker logic around the message queue (1. parse events are fetched from the message queue; 2. a `ThreadPoolExecutor` runs the asynchronous executor).
- Rename the `process_duation` column of the document and task tables to `process_duration` (probably just a spelling mistake).
- Reformat some code style issues (just the ones I came across).
- Add `requirements_dev.txt` for developers.
- Add a Redis container to the Docker Compose setup.
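
At a high level, the separate broker process goes away: the web server now splits documents into tasks and pushes them onto a Redis Stream, and each executor consumes from that stream through a consumer group. A minimal sketch of the new flow, using the helpers introduced in this diff (the snippet itself is illustrative, not code from the PR; the group and consumer names are placeholders):

```python
# Illustrative producer/consumer round trip over the new Redis Stream queue.
# Assumes a reachable Redis configured as in service_conf.yaml.
from rag.settings import SVR_QUEUE_NAME
from rag.utils.redis_conn import REDIS_CONN

# Producer side (runs in the API process when a document is switched to RUNNING):
# each task is a small dict that becomes one stream entry.
REDIS_CONN.queue_product(SVR_QUEUE_NAME, message={"id": "<task-uuid>", "doc_id": "<doc-uuid>"})

# Consumer side (runs in each task_executor process): block up to 10 seconds for one
# entry, read the task dict back, then acknowledge it on the consumer group.
payload = REDIS_CONN.queue_consumer(SVR_QUEUE_NAME, "example_group", "example_consumer")
if payload:
    task = payload.get_message()
    payload.ack()
```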
---------
Co-authored-by: Kevin Hu <[email protected]>
- .gitignore +2 -0
- api/apps/document_app.py +12 -1
- api/db/services/document_service.py +58 -5
- api/db/services/task_service.py +71 -26
- api/ragflow_server.py +17 -0
- conf/service_conf.yaml +4 -0
- docker/.env +2 -0
- docker/README.md +1 -1
- docker/docker-compose-base.yml +13 -21
- docker/entrypoint.sh +4 -19
- docker/service_conf.yaml +5 -1
- docs/conversation_api.md +1 -1
- rag/llm/embedding_model.py +1 -2
- rag/nlp/query.py +1 -2
- rag/settings.py +6 -0
- rag/svr/task_broker.py +0 -189
- rag/svr/task_executor.py +22 -29
- rag/utils/redis_conn.py +67 -2
- requirements.txt +1 -1
- requirements_dev.txt +126 -0
`.gitignore` (CHANGED)

```diff
@@ -27,3 +27,5 @@ Cargo.lock

 # Exclude the log folder
 docker/ragflow-logs/
+/flask_session
+/logs
```
`api/apps/document_app.py` (CHANGED)

```diff
@@ -14,7 +14,6 @@
 # limitations under the License
 #

-import base64
 import os
 import pathlib
 import re
@@ -24,8 +23,10 @@ from elasticsearch_dsl import Q
 from flask import request
 from flask_login import login_required, current_user

+from api.db.db_models import Task
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
+from api.db.services.task_service import TaskService, queue_tasks
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
 from api.db.services import duplicate_name
@@ -37,7 +38,9 @@ from api.db.services.document_service import DocumentService
 from api.settings import RetCode
 from api.utils.api_utils import get_json_result
 from rag.utils.minio_conn import MINIO
+from rag.utils.redis_conn import REDIS_CONN
 from api.utils.file_utils import filename_type, thumbnail
+from rag.settings import SVR_QUEUE_NAME


 @manager.route('/upload', methods=['POST'])
@@ -277,6 +280,14 @@ def run():
                 return get_data_error_result(retmsg="Tenant not found!")
             ELASTICSEARCH.deleteByQuery(
                 Q("match", doc_id=id), idxnm=search.index_name(tenant_id))
+
+            if str(req["run"]) == TaskStatus.RUNNING.value:
+                TaskService.filter_delete([Task.doc_id == id])
+                e, doc = DocumentService.get_by_id(id)
+                doc = doc.to_dict()
+                doc["tenant_id"] = tenant_id
+                bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"])
+                queue_tasks(doc, bucket, name)

         return get_json_result(data=True)
     except Exception as e:
```
`api/db/services/document_service.py` (CHANGED)

```diff
@@ -13,17 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-
+import random
+from datetime import datetime
 from elasticsearch_dsl import Q

-from api. ...
+from api.settings import stat_logger
+from api.utils import current_timestamp, get_format_time
 from rag.utils.es_conn import ELASTICSEARCH
 from rag.utils.minio_conn import MINIO
 from rag.nlp import search

 from api.db import FileType, TaskStatus
-from api.db.db_models import DB, Knowledgebase, Tenant
+from api.db.db_models import DB, Knowledgebase, Tenant, Task
 from api.db.db_models import Document
 from api.db.services.common_service import CommonService
 from api.db.services.knowledgebase_service import KnowledgebaseService
@@ -92,7 +93,7 @@ class DocumentService(CommonService):

     @classmethod
     @DB.connection_context()
-    def get_newly_uploaded(cls, ...):
+    def get_newly_uploaded(cls):
         fields = [
             cls.model.id,
             cls.model.kb_id,
@@ -196,3 +197,55 @@ class DocumentService(CommonService):
             on=(Knowledgebase.id == cls.model.kb_id)).where(
                 Knowledgebase.tenant_id == tenant_id)
         return len(docs)
+
+    @classmethod
+    @DB.connection_context()
+    def begin2parse(cls, docid):
+        cls.update_by_id(
+            docid, {"progress": random.random() * 1 / 100.,
+                    "progress_msg": "Task dispatched...",
+                    "process_begin_at": get_format_time()
+                    })
+
+    @classmethod
+    @DB.connection_context()
+    def update_progress(cls):
+        docs = cls.get_unfinished_docs()
+        for d in docs:
+            try:
+                tsks = Task.query(doc_id=d["id"], order_by=Task.create_time)
+                if not tsks:
+                    continue
+                msg = []
+                prg = 0
+                finished = True
+                bad = 0
+                status = TaskStatus.RUNNING.value
+                for t in tsks:
+                    if 0 <= t.progress < 1:
+                        finished = False
+                    prg += t.progress if t.progress >= 0 else 0
+                    msg.append(t.progress_msg)
+                    if t.progress == -1:
+                        bad += 1
+                prg /= len(tsks)
+                if finished and bad:
+                    prg = -1
+                    status = TaskStatus.FAIL.value
+                elif finished:
+                    status = TaskStatus.DONE.value
+
+                msg = "\n".join(msg)
+                info = {
+                    "process_duation": datetime.timestamp(datetime.now()) - d["process_begin_at"].timestamp(),
+                    "run": status}
+                if prg != 0:
+                    info["progress"] = prg
+                if msg:
+                    info["progress_msg"] = msg
+                cls.update_by_id(d["id"], info)
+            except Exception as e:
+                stat_logger.error("fetch task exception:" + str(e))
```
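
To make the aggregation in `update_progress` concrete, here is a small worked example with made-up task progress values (not data from the PR): a document whose tasks report 1.0, 0.5 and -1 stays in RUNNING with progress 0.5; only once no task is left in the [0, 1) band does the document flip to DONE, or to FAIL if any task reported -1.

```python
# Worked example of the per-document aggregation done by DocumentService.update_progress.
# The progress values below are hypothetical.
task_progress = [1.0, 0.5, -1]                                        # one done, one half-way, one failed
prg = sum(p for p in task_progress if p >= 0) / len(task_progress)   # 0.5
finished = all(not (0 <= p < 1) for p in task_progress)              # False, one task is still running
bad = sum(1 for p in task_progress if p == -1)                       # 1
run = "RUNNING" if not finished else ("FAIL" if bad else "DONE")     # stays RUNNING for now
```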
`api/db/services/task_service.py` (CHANGED)

```diff
@@ -15,21 +15,24 @@
 #
 import random

-from ...
+from api.db.db_utils import bulk_insert_into_db
+from deepdoc.parser import PdfParser
+from peewee import JOIN
 from api.db.db_models import DB, File2Document, File
 from api.db import StatusEnum, FileType, TaskStatus
 from api.db.db_models import Task, Document, Knowledgebase, Tenant
 from api.db.services.common_service import CommonService
 from api.db.services.document_service import DocumentService
-from api.utils import current_timestamp
+from api.utils import current_timestamp, get_uuid
+from deepdoc.parser.excel_parser import RAGFlowExcelParser
+from rag.settings import MINIO, SVR_QUEUE_NAME
+from rag.utils.redis_conn import REDIS_CONN


 class TaskService(CommonService):
     model = Task

-
-    @DB.connection_context()
-    def get_tasks(cls, tm, mod=0, comm=1, items_per_page=1, takeit=True):
+    def get_tasks(cls, task_id):
         fields = [
             cls.model.id,
             cls.model.doc_id,
@@ -48,28 +51,18 @@ class TaskService(CommonService):
             Tenant.img2txt_id,
             Tenant.asr_id,
             cls.model.update_time]
-        ...
-            Document.run == TaskStatus.RUNNING.value,
-            ~(Document.type == FileType.VIRTUAL.value),
-            cls.model.progress == 0,
-            #cls.model.update_time >= tm,
-            #(Expression(cls.model.create_time, "%%", comm) == mod)
-            )\
-            .order_by(cls.model.update_time.asc())\
-            .paginate(0, items_per_page)
-        docs = list(docs.dicts())
-        if not docs: return []
-        if not takeit: return docs
+        docs = cls.model.select(*fields) \
+            .join(Document, on=(cls.model.doc_id == Document.id)) \
+            .join(Knowledgebase, on=(Document.kb_id == Knowledgebase.id)) \
+            .join(Tenant, on=(Knowledgebase.tenant_id == Tenant.id)) \
+            .where(cls.model.id == task_id)
+        docs = list(docs.dicts())
+        if not docs: return []

-        ...
+        cls.model.update(progress_msg=cls.model.progress_msg + "\n" + "Task has been received.",
+                         progress=random.random() / 10.).where(
+            cls.model.id == docs[0]["id"]).execute()
+        return docs

     @classmethod
     @DB.connection_context()
@@ -112,3 +105,55 @@ class TaskService(CommonService):
         if "progress" in info:
             cls.model.update(progress=info["progress"]).where(
                 cls.model.id == id).execute()
+
+
+def queue_tasks(doc, bucket, name):
+    def new_task():
+        nonlocal doc
+        return {
+            "id": get_uuid(),
+            "doc_id": doc["id"]
+        }
+    tsks = []
+
+    if doc["type"] == FileType.PDF.value:
+        file_bin = MINIO.get(bucket, name)
+        do_layout = doc["parser_config"].get("layout_recognize", True)
+        pages = PdfParser.total_page_number(doc["name"], file_bin)
+        page_size = doc["parser_config"].get("task_page_size", 12)
+        if doc["parser_id"] == "paper":
+            page_size = doc["parser_config"].get("task_page_size", 22)
+        if doc["parser_id"] == "one":
+            page_size = 1000000000
+        if not do_layout:
+            page_size = 1000000000
+        page_ranges = doc["parser_config"].get("pages")
+        if not page_ranges:
+            page_ranges = [(1, 100000)]
+        for s, e in page_ranges:
+            s -= 1
+            s = max(0, s)
+            e = min(e - 1, pages)
+            for p in range(s, e, page_size):
+                task = new_task()
+                task["from_page"] = p
+                task["to_page"] = min(p + page_size, e)
+                tsks.append(task)
+
+    elif doc["parser_id"] == "table":
+        file_bin = MINIO.get(bucket, name)
+        rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)
+        for i in range(0, rn, 3000):
+            task = new_task()
+            task["from_page"] = i
+            task["to_page"] = min(i + 3000, rn)
+            tsks.append(task)
+    else:
+        tsks.append(new_task())
+
+    for t in tsks:
+        REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=t)
+
+    bulk_insert_into_db(Task, tsks, True)
+    DocumentService.begin2parse(doc["id"])
```
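
The page-splitting in `queue_tasks` is easiest to see with numbers. A hypothetical 30-page PDF with the default `task_page_size` of 12 and no explicit `pages` range yields three tasks:

```python
# Hypothetical illustration of the page-range splitting done by queue_tasks (values made up).
pages, page_size = 30, 12
page_ranges = [(1, 100000)]          # default when parser_config has no "pages" entry
tasks = []
for s, e in page_ranges:
    s = max(0, s - 1)                # switch to 0-based page indices
    e = min(e - 1, pages)
    for p in range(s, e, page_size):
        tasks.append((p, min(p + page_size, e)))
print(tasks)                         # [(0, 12), (12, 24), (24, 30)]
```

Each tuple becomes one `Task` row and one stream entry, so a large PDF is processed by several executor pulls instead of a single long-running job.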
`api/ragflow_server.py` (CHANGED)

```diff
@@ -18,10 +18,14 @@ import logging
 import os
 import signal
 import sys
+import time
 import traceback
+from concurrent.futures import ThreadPoolExecutor
+
 from werkzeug.serving import run_simple
 from api.apps import app
 from api.db.runtime_config import RuntimeConfig
+from api.db.services.document_service import DocumentService
 from api.settings import (
     HOST, HTTP_PORT, access_logger, database_logger, stat_logger,
 )
@@ -31,6 +35,16 @@ from api.db.db_models import init_database_tables as init_web_db
 from api.db.init_data import init_web_data
 from api.versions import get_versions

+
+def update_progress():
+    while True:
+        time.sleep(1)
+        try:
+            DocumentService.update_progress()
+        except Exception as e:
+            stat_logger.error("update_progress exception:" + str(e))
+
+
 if __name__ == '__main__':
     print("""
     ____ ______ __
@@ -71,6 +85,9 @@ if __name__ == '__main__':
     peewee_logger.addHandler(database_logger.handlers[0])
     peewee_logger.setLevel(database_logger.level)

+    thr = ThreadPoolExecutor(max_workers=1)
+    thr.submit(update_progress)
+
     # start http server
     try:
         stat_logger.info("RAG Flow http server start...")
```
`conf/service_conf.yaml` (CHANGED)

```diff
@@ -15,6 +15,10 @@ minio:
   host: 'minio:9000'
 es:
   hosts: 'http://es01:9200'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 user_default_llm:
   factory: 'Tongyi-Qianwen'
   api_key: 'sk-xxxxxxxxxxxxx'
```
`docker/.env` (CHANGED)

```diff
@@ -25,6 +25,8 @@ MINIO_PORT=9000
 MINIO_USER=rag_flow
 MINIO_PASSWORD=infini_rag_flow

+REDIS_PASSWORD=infini_rag_flow
+
 SVR_HTTP_PORT=9380

 RAGFLOW_VERSION=latest
```
`docker/README.md` (CHANGED)

```diff
@@ -50,7 +50,7 @@ The serving port of mysql inside the container. The modification should be synch
 The max database connection.

 ### stale_timeout
-The timeout ...
+The timeout duration in seconds.

 ## minio

```
`docker/docker-compose-base.yml` (CHANGED)

```diff
@@ -29,24 +29,6 @@ services:
       - ragflow
     restart: always

-  #kibana:
-  #  depends_on:
-  #    es01:
-  #      condition: service_healthy
-  #  image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
-  #  container_name: ragflow-kibana
-  #  volumes:
-  #    - kibanadata:/usr/share/kibana/data
-  #  ports:
-  #    - ${KIBANA_PORT}:5601
-  #  environment:
-  #    - SERVERNAME=kibana
-  #    - ELASTICSEARCH_HOSTS=http://es01:9200
-  #    - TZ=${TIMEZONE}
-  #  mem_limit: ${MEM_LIMIT}
-  #  networks:
-  #    - ragflow
-
   mysql:
     image: mysql:5.7.18
     container_name: ragflow-mysql
@@ -74,7 +56,6 @@ services:
       retries: 3
     restart: always

-
   minio:
     image: quay.io/minio/minio:RELEASE.2023-12-20T01-00-02Z
     container_name: ragflow-minio
@@ -92,16 +73,27 @@ services:
       - ragflow
     restart: always

+  redis:
+    image: redis:7.2.4
+    container_name: ragflow-redis
+    command: redis-server --requirepass ${REDIS_PASSWORD} --maxmemory 128mb --maxmemory-policy allkeys-lru
+    volumes:
+      - redis_data:/data
+    networks:
+      - ragflow
+    restart: always
+
+

 volumes:
   esdata01:
     driver: local
-  # kibanadata:
-  #   driver: local
   mysql_data:
     driver: local
   minio_data:
     driver: local
+  redis_data:
+    driver: local

 networks:
   ragflow:
```
`docker/entrypoint.sh` (CHANGED)

```diff
@@ -12,29 +12,14 @@ function task_exe(){
     done
 }

-function watch_broker(){
-    while [ 1 -eq 1 ];do
-        C=`ps aux|grep "task_broker.py"|grep -v grep|wc -l`;
-        if [ $C -lt 1 ];then
-            $PY rag/svr/task_broker.py &
-        fi
-        sleep 5;
-    done
-}
-
-function task_bro(){
-    watch_broker;
-}
-
-task_bro &
-
 WS=1
 for ((i=0;i<WS;i++))
 do
   task_exe $i $WS &
 done

-while [ 1 -eq 1 ];do
-    $PY api/ragflow_server.py
-done
-
+while [ 1 -eq 1 ];do
+    $PY api/ragflow_server.py
+done
+
+wait;
```
`docker/service_conf.yaml` (CHANGED)

```diff
@@ -15,6 +15,10 @@ minio:
   host: 'minio:9000'
 es:
   hosts: 'http://es01:9200'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 user_default_llm:
   factory: 'Tongyi-Qianwen'
   api_key: 'sk-xxxxxxxxxxxxx'
@@ -34,4 +38,4 @@ authentication:
 permission:
   switch: false
   component: false
-  dataset: false
+  dataset: false
```
`docs/conversation_api.md` (CHANGED)

The only visible change is at the end of the file; the closing code fence is unchanged in content, so this appears to be a trailing-newline fix:

````diff
@@ -361,4 +361,4 @@ This is usually used when upload a file to.
     "retmsg": "success"
 }

-```
+```
````
`rag/llm/embedding_model.py` (CHANGED)

```diff
@@ -95,8 +95,7 @@ class OpenAIEmbed(Base):
     def encode(self, texts: list, batch_size=32):
         res = self.client.embeddings.create(input=texts,
                                             model=self.model_name)
-        return np.array([d.embedding for d in res.data]
-                        ), res.usage.total_tokens
+        return np.array([d.embedding for d in res.data]), res.usage.total_tokens

     def encode_queries(self, text):
         res = self.client.embeddings.create(input=[text],
```
`rag/nlp/query.py` (CHANGED)

```diff
@@ -9,12 +9,11 @@ from elasticsearch_dsl import Q

 from rag.nlp import rag_tokenizer, term_weight, synonym

-
 class EsQueryer:
     def __init__(self, es):
         self.tw = term_weight.Dealer()
         self.es = es
-        self.syn = synonym.Dealer(...
+        self.syn = synonym.Dealer()
         self.flds = ["ask_tks^10", "ask_small_tks"]

     @staticmethod
```
`rag/settings.py` (CHANGED)

```diff
@@ -47,3 +47,9 @@ cron_logger = getLogger("cron_logger")
 cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")
 database_logger = getLogger("database")
+
+SVR_QUEUE_NAME = "rag_flow_svr_queue"
+SVR_QUEUE_RETENTION = 60*60
+SVR_QUEUE_MAX_LEN = 1024
+SVR_CONSUMER_NAME = "rag_flow_svr_consumer"
+SVR_CONSUMER_GROUP_NAME = "rag_flow_svr_consumer_group"
```
`rag/svr/task_broker.py` (DELETED)

The standalone broker process (189 lines) is removed entirely. Its `collect()`/`dispatch()` logic, which polled `DocumentService.get_newly_uploaded`, split each document into page-range or row-range tasks, and bulk-inserted them, now lives in `queue_tasks()` in `api/db/services/task_service.py`; `set_dispatching()` becomes `DocumentService.begin2parse`; and its `update_progress()` loop moves into `DocumentService.update_progress`, driven by the background thread added in `api/ragflow_server.py`. The timestamp-file bookkeeping (`findMaxTm`, `rag/res/broker.tm`) disappears along with the polling loop.
`rag/svr/task_executor.py` (CHANGED)

```diff
@@ -28,7 +28,7 @@ from functools import partial
 from api.db.services.file2document_service import File2DocumentService
 from rag.utils.minio_conn import MINIO
 from api.db.db_models import close_connection
-from rag.settings import database_logger
+from rag.settings import database_logger, SVR_QUEUE_NAME
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
 from multiprocessing import Pool
 import numpy as np
@@ -93,20 +93,29 @@ def set_progress(task_id, from_page=0, to_page=-1,
     sys.exit()


-def collect(...):
-    ...
-        return pd.DataFrame()
+def collect():
+    try:
+        payload = REDIS_CONN.queue_consumer(SVR_QUEUE_NAME, "rag_flow_svr_task_broker", "rag_flow_svr_task_consumer")
+        if not payload:
+            time.sleep(1)
+            return pd.DataFrame()
+    except Exception as e:
+        cron_logger.error("Get task event from queue exception:" + str(e))
+        return pd.DataFrame()
+
+    msg = payload.get_message()
+    payload.ack()
+    if not msg: return pd.DataFrame()
+
+    if TaskService.do_cancel(msg["id"]):
+        return pd.DataFrame()
+    tasks = TaskService.get_tasks(msg["id"])
+    assert tasks, "{} empty task!".format(msg["id"])
     tasks = pd.DataFrame(tasks)
-    mtm = tasks["update_time"].max()
-    cron_logger.info("TOTAL:{}, To:{}".format(len(tasks), mtm))
     return tasks


 def get_minio_binary(bucket, name):
-    global MINIO
     return MINIO.get(bucket, name)

@@ -122,13 +131,10 @@ def build(row):
               row["from_page"],
               row["to_page"])
     chunker = FACTORY[row["parser_id"].lower()]
-    pool = Pool(processes=1)
     try:
         st = timer()
         bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"])
-        ...
-        binary = thr.get(timeout=90)
-        pool.terminate()
+        binary = get_minio_binary(bucket, name)
         cron_logger.info(
             "From minio({}) {}/{}".format(timer()-st, row["location"], row["name"]))
         cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
@@ -147,7 +153,6 @@ def build(row):
         else:
             callback(-1, f"Internal server error: %s" %
                      str(e).replace("'", ""))
-        pool.terminate()
         traceback.print_exc()

         cron_logger.error(
@@ -238,20 +243,13 @@ def embedding(docs, mdl, parser_config={}, callback=None):
     return tk_count


-def main(...):
-    ...
-        get_project_base_directory(),
-        "rag/res",
-        f"{comm}-{mod}.tm")
-    tm = findMaxTm(tm_fnm)
-    rows = collect(comm, mod, tm)
+def main():
+    rows = collect()
     if len(rows) == 0:
         return

-    tmf = open(tm_fnm, "a+")
     for _, r in rows.iterrows():
         callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
-        #callback(random.random()/10., "Task has been received.")
         try:
             embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])
         except Exception as e:
@@ -265,7 +263,6 @@ def main(comm, mod):
         if cks is None:
             continue
         if not cks:
-            tmf.write(str(r["update_time"]) + "\n")
             callback(1., "No chunk! Done!")
             continue
         # TODO: exception handler
@@ -305,8 +302,6 @@ def main(comm, mod):
             "Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
                 r["id"], tk_count, len(cks), timer()-st))

-        tmf.write(str(r["update_time"]) + "\n")
-    tmf.close()


 if __name__ == "__main__":
@@ -315,8 +310,6 @@ if __name__ == "__main__":
     peewee_logger.addHandler(database_logger.handlers[0])
     peewee_logger.setLevel(database_logger.level)

-    #from mpi4py import MPI
-    #comm = MPI.COMM_WORLD
     while True:
-        main(...)
+        main()
         close_connection()
```
`rag/utils/redis_conn.py` (CHANGED)

```diff
@@ -5,6 +5,27 @@ import logging
 from rag import settings
 from rag.utils import singleton

+
+class Payload:
+    def __init__(self, consumer, queue_name, group_name, msg_id, message):
+        self.__consumer = consumer
+        self.__queue_name = queue_name
+        self.__group_name = group_name
+        self.__msg_id = msg_id
+        self.__message = json.loads(message['message'])
+
+    def ack(self):
+        try:
+            self.__consumer.xack(self.__queue_name, self.__group_name, self.__msg_id)
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]ack" + str(self.__queue_name) + "||" + str(e))
+        return False
+
+    def get_message(self):
+        return self.__message
+
+
 @singleton
 class RedisDB:
     def __init__(self):
@@ -17,7 +38,8 @@ class RedisDB:
             self.REDIS = redis.StrictRedis(host=self.config["host"].split(":")[0],
                                            port=int(self.config.get("host", ":6379").split(":")[1]),
                                            db=int(self.config.get("db", 1)),
-                                           password=self.config...
+                                           password=self.config.get("password"),
+                                           decode_responses=True)
         except Exception as e:
             logging.warning("Redis can't be connected.")
         return self.REDIS
@@ -70,5 +92,48 @@ class RedisDB:
             self.__open__()
         return False

-REDIS_CONN = RedisDB()
+    def queue_product(self, queue, message, exp=settings.SVR_QUEUE_RETENTION) -> bool:
+        try:
+            payload = {"message": json.dumps(message)}
+            pipeline = self.REDIS.pipeline()
+            pipeline.xadd(queue, payload)
+            pipeline.expire(queue, exp)
+            pipeline.execute()
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]producer" + str(queue) + "||" + str(e))
+        return False
+
+    def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> Payload:
+        try:
+            group_info = self.REDIS.xinfo_groups(queue_name)
+            if not any(e["name"] == group_name for e in group_info):
+                self.REDIS.xgroup_create(
+                    queue_name,
+                    group_name,
+                    id="$",
+                    mkstream=True
+                )
+            args = {
+                "groupname": group_name,
+                "consumername": consumer_name,
+                "count": 1,
+                "block": 10000,
+                "streams": {queue_name: msg_id},
+            }
+            messages = self.REDIS.xreadgroup(**args)
+            if not messages:
+                return None
+            stream, element_list = messages[0]
+            msg_id, payload = element_list[0]
+            res = Payload(self.REDIS, queue_name, group_name, msg_id, payload)
+            return res
+        except Exception as e:
+            if 'key' in str(e):
+                pass
+            else:
+                logging.warning("[EXCEPTION]consumer" + str(queue_name) + "||" + str(e))
+        return None
+
+
+REDIS_CONN = RedisDB()
```
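
`queue_product` and `queue_consumer` are thin wrappers over the standard Redis Stream commands. A standalone sketch with `redis-py`, assuming a local password-less Redis and hypothetical stream/group names, showing the same XADD / XREADGROUP / XACK cycle that these helpers perform:

```python
# Minimal redis-py sketch of the stream operations wrapped by RedisDB (names are placeholders).
import json
import redis

r = redis.StrictRedis(host="localhost", port=6379, db=1, decode_responses=True)

# Produce: XADD appends one entry; EXPIRE bounds the stream's lifetime like queue_product does.
r.xadd("demo_queue", {"message": json.dumps({"id": "t1", "doc_id": "d1"})})
r.expire("demo_queue", 60 * 60)

# Consume: create the consumer group once (from the start of the stream for this demo)...
try:
    r.xgroup_create("demo_queue", "demo_group", id="0", mkstream=True)
except redis.exceptions.ResponseError:
    pass  # group already exists

# ...then read one new entry for this consumer, blocking up to 10 seconds.
entries = r.xreadgroup("demo_group", "consumer-1", {"demo_queue": ">"}, count=1, block=10000)
if entries:
    _stream, elements = entries[0]
    msg_id, fields = elements[0]
    task = json.loads(fields["message"])
    r.xack("demo_queue", "demo_group", msg_id)   # acknowledge, mirroring Payload.ack()
```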
`requirements.txt` (CHANGED)

The last line is unchanged in content; this appears to be a trailing-newline fix:

```diff
@@ -134,4 +134,4 @@ xxhash==3.4.1
 yarl==1.9.4
 zhipuai==2.0.1
 BCEmbedding
-loguru==0.7.2
+loguru==0.7.2
```
`requirements_dev.txt` (ADDED)

```text
accelerate==0.27.2
aiohttp==3.9.3
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.3.0
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
Aspose.Slides==24.2.0
attrs==23.2.0
blinker==1.7.0
cachelib==0.12.0
cachetools==5.3.3
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
coloredlogs==15.0.1
cryptography==42.0.5
dashscope==1.14.1
datasets==2.17.1
datrie==0.8.2
demjson3==3.0.6
dill==0.3.8
distro==1.9.0
elastic-transport==8.12.0
elasticsearch==8.12.1
elasticsearch-dsl==8.12.0
et-xmlfile==1.1.0
filelock==3.13.1
fastembed==0.2.6
FlagEmbedding==1.2.5
Flask==3.0.2
Flask-Cors==4.0.0
Flask-Login==0.6.3
Flask-Session==0.6.0
flatbuffers==23.5.26
frozenlist==1.4.1
fsspec==2023.10.0
h11==0.14.0
hanziconv==0.3.2
httpcore==1.0.4
httpx==0.27.0
huggingface-hub==0.20.3
humanfriendly==10.0
idna==3.6
install==1.3.5
itsdangerous==2.1.2
Jinja2==3.1.3
joblib==1.3.2
lxml==5.1.0
MarkupSafe==2.1.5
minio==7.2.4
mpi4py==3.1.5
mpmath==1.3.0
multidict==6.0.5
multiprocess==0.70.16
networkx==3.2.1
nltk==3.8.1
numpy==1.26.4
openai==1.12.0
opencv-python==4.9.0.80
openpyxl==3.1.2
packaging==23.2
pandas==2.2.1
pdfminer.six==20221105
pdfplumber==0.10.4
peewee==3.17.1
pillow==10.2.0
protobuf==4.25.3
psutil==5.9.8
pyarrow==15.0.0
pyarrow-hotfix==0.6
pyclipper==1.3.0.post5
pycparser==2.21
pycryptodome==3.20.0
pycryptodome-test-vectors==1.0.14
pycryptodomex==3.20.0
pydantic==2.6.2
pydantic_core==2.16.3
PyJWT==2.8.0
PyMuPDF==1.23.25
PyMuPDFb==1.23.22
PyMySQL==1.1.0
PyPDF2==3.0.1
pypdfium2==4.27.0
python-dateutil==2.8.2
python-docx==1.1.0
python-dotenv==1.0.1
python-pptx==0.6.23
pytz==2024.1
PyYAML==6.0.1
regex==2023.12.25
requests==2.31.0
ruamel.yaml==0.18.6
ruamel.yaml.clib==0.2.8
safetensors==0.4.2
scikit-learn==1.4.1.post1
scipy==1.12.0
sentence-transformers==2.4.0
shapely==2.0.3
six==1.16.0
sniffio==1.3.1
StrEnum==0.4.15
sympy==1.12
threadpoolctl==3.3.0
tika==2.6.0
tiktoken==0.6.0
tokenizers==0.15.2
torch==2.2.1
tqdm==4.66.2
transformers==4.38.1
triton==2.2.0
typing_extensions==4.10.0
tzdata==2024.1
urllib3==2.2.1
Werkzeug==3.0.1
xgboost==2.0.3
XlsxWriter==3.2.0
xpinyin==0.7.6
xxhash==3.4.1
yarl==1.9.4
zhipuai==2.0.1
BCEmbedding
loguru==0.7.2
ollama==0.1.8
redis==5.0.4
```