Commit 403ecb1 · 1 parent: 5e4c165
Fix 'SCORE' not found bug (#4178)
### What problem does this PR solve?

As title.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: jinhai <[email protected]>
    	
api/db/services/dialog_service.py  CHANGED

```diff
@@ -311,7 +311,7 @@ def chat(dialog, messages, stream=True, **kwargs):
         retrieval_time_cost = (retrieval_ts - generate_keyword_ts) * 1000
         generate_result_time_cost = (finish_chat_ts - retrieval_ts) * 1000
 
-        prompt = f"{prompt}
+        prompt = f"{prompt}\n\n - Total: {total_time_cost:.1f}ms\n  - Check LLM: {check_llm_time_cost:.1f}ms\n  - Create retriever: {create_retriever_time_cost:.1f}ms\n  - Bind embedding: {bind_embedding_time_cost:.1f}ms\n  - Bind LLM: {bind_llm_time_cost:.1f}ms\n  - Tune question: {refine_question_time_cost:.1f}ms\n  - Bind reranker: {bind_reranker_time_cost:.1f}ms\n  - Generate keyword: {generate_keyword_time_cost:.1f}ms\n  - Retrieval: {retrieval_time_cost:.1f}ms\n  - Generate answer: {generate_result_time_cost:.1f}ms"
         return {"answer": answer, "reference": refs, "prompt": prompt}
 
     if stream:
```
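
In the hunk above, chat() stamps time.time() after each stage, converts the gaps to milliseconds, and appends the breakdown to the prompt returned with the answer. Below is a minimal, simplified sketch of that pattern; it covers only a few of the stages, and any name not visible in the diff (start_ts, timed_chat_sketch) is a stand-in, not ragflow code.

```python
import time

def timed_chat_sketch(answer: str, prompt: str) -> dict:
    start_ts = time.time()
    # ... keyword generation and retrieval would run here ...
    retrieval_ts = time.time()
    # ... answer generation would run here ...
    finish_chat_ts = time.time()

    # Each *_time_cost is the gap between two timestamps, in milliseconds.
    retrieval_time_cost = (retrieval_ts - start_ts) * 1000
    generate_result_time_cost = (finish_chat_ts - retrieval_ts) * 1000
    total_time_cost = (finish_chat_ts - start_ts) * 1000

    # Append the timing breakdown to the prompt, as the added line in the hunk does.
    prompt = (
        f"{prompt}\n\n"
        f" - Total: {total_time_cost:.1f}ms\n"
        f"  - Retrieval: {retrieval_time_cost:.1f}ms\n"
        f"  - Generate answer: {generate_result_time_cost:.1f}ms"
    )
    return {"answer": answer, "reference": [], "prompt": prompt}
```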
    	
api/db/services/task_service.py  CHANGED

```diff
@@ -204,7 +204,7 @@ def queue_tasks(doc: dict, bucket: str, name: str):
     def new_task():
         return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}
 
-
+    parse_task_array = []
 
     if doc["type"] == FileType.PDF.value:
         file_bin = STORAGE_IMPL.get(bucket, name)
@@ -224,7 +224,7 @@ def queue_tasks(doc: dict, bucket: str, name: str):
             task = new_task()
             task["from_page"] = p
             task["to_page"] = min(p + page_size, e)
-
+            parse_task_array.append(task)
 
     elif doc["parser_id"] == "table":
         file_bin = STORAGE_IMPL.get(bucket, name)
@@ -233,12 +233,12 @@ def queue_tasks(doc: dict, bucket: str, name: str):
         task = new_task()
         task["from_page"] = i
         task["to_page"] = min(i + 3000, rn)
-
+        parse_task_array.append(task)
     else:
-
+        parse_task_array.append(new_task())
 
     chunking_config = DocumentService.get_chunking_config(doc["id"])
-    for task in
+    for task in parse_task_array:
         hasher = xxhash.xxh64()
         for field in sorted(chunking_config.keys()):
             hasher.update(str(chunking_config[field]).encode("utf-8"))
@@ -251,7 +251,7 @@ def queue_tasks(doc: dict, bucket: str, name: str):
     prev_tasks = TaskService.get_tasks(doc["id"])
     ck_num = 0
     if prev_tasks:
-        for task in
+        for task in parse_task_array:
             ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
         TaskService.filter_delete([Task.doc_id == doc["id"]])
         chunk_ids = []
@@ -263,13 +263,13 @@ def queue_tasks(doc: dict, bucket: str, name: str):
                                          chunking_config["kb_id"])
     DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})
 
-    bulk_insert_into_db(Task,
+    bulk_insert_into_db(Task, parse_task_array, True)
     DocumentService.begin2parse(doc["id"])
 
-
-    for
+    unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]
+    for unfinished_task in unfinished_task_array:
         assert REDIS_CONN.queue_product(
-            SVR_QUEUE_NAME, message=
+            SVR_QUEUE_NAME, message=unfinished_task
         ), "Can't access Redis. Please check the Redis' status."
 
 
```
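
In this file, every branch of queue_tasks() now appends its tasks to a single parse_task_array, the whole array is written with one bulk_insert_into_db call, and only the tasks whose progress is still below 1.0 are pushed onto the Redis queue. A self-contained sketch of that flow follows; the DB and Redis helpers are stand-ins, not ragflow APIs.

```python
import uuid

def new_task(doc_id: str, from_page: int = 0, to_page: int = 10 ** 8) -> dict:
    return {"id": uuid.uuid4().hex, "doc_id": doc_id,
            "progress": 0.0, "from_page": from_page, "to_page": to_page}

def bulk_insert(rows: list) -> None:        # stand-in for bulk_insert_into_db(Task, ..., True)
    print(f"inserted {len(rows)} task rows in one statement")

def enqueue(message: dict) -> bool:         # stand-in for REDIS_CONN.queue_product
    print(f"queued task {message['id']} (pages {message['from_page']}-{message['to_page']})")
    return True

def queue_tasks_sketch(doc_id: str, page_count: int, page_size: int = 12) -> None:
    # Collect one task per page range, much like the PDF branch above.
    parse_task_array = [new_task(doc_id, p, min(p + page_size, page_count))
                        for p in range(0, page_count, page_size)]
    bulk_insert(parse_task_array)           # one DB round trip for the whole batch
    # In ragflow, reuse_prev_task_chunks can mark a task as already complete;
    # only tasks still below progress 1.0 are sent to the message queue.
    unfinished = [t for t in parse_task_array if t["progress"] < 1.0]
    for task in unfinished:
        assert enqueue(task), "Can't access Redis. Please check the Redis' status."

queue_tasks_sketch("doc-42", page_count=30)
```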
    	
rag/utils/infinity_conn.py  CHANGED

```diff
@@ -342,7 +342,7 @@ class InfinityConnection(DocStoreConnection):
             self.connPool.release_conn(inf_conn)
             res = concat_dataframes(df_list, selectFields)
             if matchExprs:
-                res = res.sort(pl.col("
+                res = res.sort(pl.col("score()") + pl.col("pagerank_fea"), descending=True, maintain_order=True)
             res = res.limit(limit)
             logger.debug(f"INFINITY search final result: {str(res)}")
             return res, total_hits_count
```
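
This hunk is the fix the PR title points at: the polars DataFrame assembled from Infinity's results exposes the relevance score in a column literally named "score()", and the sort now references that column plus "pagerank_fea". Asking polars to sort by a column the frame does not contain (the title suggests the old code used "SCORE") raises a "not found" error. A small standalone illustration with toy data, assuming a recent polars release:

```python
import polars as pl

# Toy stand-in for the frame concat_dataframes() would return.
res = pl.DataFrame({
    "id": ["a", "b", "c"],
    "score()": [0.42, 0.90, 0.77],
    "pagerank_fea": [0.10, 0.00, 0.30],
})

# Sort by the combined relevance + pagerank expression, highest first;
# maintain_order keeps the original order of tied rows.
ranked = res.sort(pl.col("score()") + pl.col("pagerank_fea"),
                  descending=True, maintain_order=True)
print(ranked)

# Referencing a column the frame does not have fails, which matches the
# "'SCORE' not found" symptom this commit fixes.
try:
    res.sort(pl.col("SCORE"), descending=True)
except pl.exceptions.ColumnNotFoundError as err:
    print(f"'SCORE' not found: {err}")
```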