KevinHuSh
committed on
Commit
·
389dac4
1
Parent(s):
a505adc
Let tasks continue dispatching when encountering unexpected document formats (#199)
Browse files
### What problem does this PR solve?
_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._
Issue link: [#198](https://github.com/infiniflow/ragflow/issues/198)
### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
- [ ] New Feature (non-breaking change which adds functionality)
- [ ] Breaking Change (fix or feature that could cause existing
functionality not to work as expected)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Test cases
- [ ] Python SDK impacted, Need to update PyPI
- [ ] Other (please describe):
- rag/svr/task_broker.py +40 -36
rag/svr/task_broker.py
CHANGED
|
@@ -73,7 +73,7 @@ def dispatch():
|
|
| 73 |
for t in tsks:
|
| 74 |
TaskService.delete_by_id(t.id)
|
| 75 |
except Exception as e:
|
| 76 |
-
cron_logger.
|
| 77 |
|
| 78 |
def new_task():
|
| 79 |
nonlocal r
|
|
@@ -83,44 +83,48 @@ def dispatch():
|
|
| 83 |
}
|
| 84 |
|
| 85 |
tsks = []
|
| 86 |
-
|
| 87 |
-
|
| 88 |
-
|
| 89 |
-
|
| 90 |
-
|
| 91 |
-
|
| 92 |
-
|
| 93 |
-
|
| 94 |
-
|
| 95 |
-
|
| 96 |
-
|
| 97 |
-
|
| 98 |
-
|
| 99 |
-
|
| 100 |
-
|
| 101 |
-
s
|
| 102 |
-
|
| 103 |
-
|
| 104 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 105 |
task = new_task()
|
| 106 |
-
task["from_page"] =
|
| 107 |
-
task["to_page"] = min(
|
| 108 |
tsks.append(task)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 109 |
|
| 110 |
-
elif r["parser_id"] == "table":
|
| 111 |
-
rn = HuExcelParser.row_number(
|
| 112 |
-
r["name"], MINIO.get(
|
| 113 |
-
r["kb_id"], r["location"]))
|
| 114 |
-
for i in range(0, rn, 3000):
|
| 115 |
-
task = new_task()
|
| 116 |
-
task["from_page"] = i
|
| 117 |
-
task["to_page"] = min(i + 3000, rn)
|
| 118 |
-
tsks.append(task)
|
| 119 |
-
else:
|
| 120 |
-
tsks.append(new_task())
|
| 121 |
-
|
| 122 |
-
bulk_insert_into_db(Task, tsks, True)
|
| 123 |
-
set_dispatching(r["id"])
|
| 124 |
tmf.write(str(r["update_time"]) + "\n")
|
| 125 |
tmf.close()
|
| 126 |
|
|
|
|
| 73 |
for t in tsks:
|
| 74 |
TaskService.delete_by_id(t.id)
|
| 75 |
except Exception as e:
|
| 76 |
+
cron_logger.exception(e)
|
| 77 |
|
| 78 |
def new_task():
|
| 79 |
nonlocal r
|
|
|
|
| 83 |
}
|
| 84 |
|
| 85 |
tsks = []
|
| 86 |
+
try:
|
| 87 |
+
if r["type"] == FileType.PDF.value:
|
| 88 |
+
do_layout = r["parser_config"].get("layout_recognize", True)
|
| 89 |
+
pages = PdfParser.total_page_number(
|
| 90 |
+
r["name"], MINIO.get(r["kb_id"], r["location"]))
|
| 91 |
+
page_size = r["parser_config"].get("task_page_size", 12)
|
| 92 |
+
if r["parser_id"] == "paper":
|
| 93 |
+
page_size = r["parser_config"].get("task_page_size", 22)
|
| 94 |
+
if r["parser_id"] == "one":
|
| 95 |
+
page_size = 1000000000
|
| 96 |
+
if not do_layout:
|
| 97 |
+
page_size = 1000000000
|
| 98 |
+
page_ranges = r["parser_config"].get("pages")
|
| 99 |
+
if not page_ranges:
|
| 100 |
+
page_ranges = [(1, 100000)]
|
| 101 |
+
for s, e in page_ranges:
|
| 102 |
+
s -= 1
|
| 103 |
+
s = max(0, s)
|
| 104 |
+
e = min(e - 1, pages)
|
| 105 |
+
for p in range(s, e, page_size):
|
| 106 |
+
task = new_task()
|
| 107 |
+
task["from_page"] = p
|
| 108 |
+
task["to_page"] = min(p + page_size, e)
|
| 109 |
+
tsks.append(task)
|
| 110 |
+
|
| 111 |
+
elif r["parser_id"] == "table":
|
| 112 |
+
rn = HuExcelParser.row_number(
|
| 113 |
+
r["name"], MINIO.get(
|
| 114 |
+
r["kb_id"], r["location"]))
|
| 115 |
+
for i in range(0, rn, 3000):
|
| 116 |
task = new_task()
|
| 117 |
+
task["from_page"] = i
|
| 118 |
+
task["to_page"] = min(i + 3000, rn)
|
| 119 |
tsks.append(task)
|
| 120 |
+
else:
|
| 121 |
+
tsks.append(new_task())
|
| 122 |
+
|
| 123 |
+
bulk_insert_into_db(Task, tsks, True)
|
| 124 |
+
set_dispatching(r["id"])
|
| 125 |
+
except Exception as e:
|
| 126 |
+
cron_logger.exception(e)
|
| 127 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 128 |
tmf.write(str(r["update_time"]) + "\n")
|
| 129 |
tmf.close()
|
| 130 |
|