import xxhash

# Names such as FileType, STORAGE_IMPL, DocumentService, TaskService, Task,
# REDIS_CONN, SVR_QUEUE_NAME, get_uuid, reuse_prev_task_chunks and
# bulk_insert_into_db are assumed to be provided by the surrounding project.


# The enclosing signature is reconstructed from usage: `doc` is the document
# row as a dict; `bucket` and `name` locate the raw file in object storage.
def queue_tasks(doc: dict, bucket: str, name: str):
    def new_task():
        # Template covering the whole document; 100000000 is an "entire
        # document" sentinel for to_page, narrowed by the branches below.
        return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}
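
    # One task per parse unit: a page window for PDFs, a row window for
    # tables, a single whole-document task for everything else.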
    parse_task_array = []

    if doc["type"] == FileType.PDF.value:
        file_bin = STORAGE_IMPL.get(bucket, name)
        # The page-range setup is missing from this excerpt; `s`/`e` (the
        # page range, clamped to the PDF's page count) and `page_size`
        # (pages per task) are assumed to come from doc["parser_config"].
        for p in range(s, e, page_size):
            task = new_task()
            task["from_page"] = p
            task["to_page"] = min(p + page_size, e)
            parse_task_array.append(task)
    elif doc["parser_id"] == "table":
        file_bin = STORAGE_IMPL.get(bucket, name)
        # `rn` (the sheet's total row count, derived from `file_bin`) is
        # missing from this excerpt; rows are batched 3000 per task.
        for i in range(0, rn, 3000):
            task = new_task()
            task["from_page"] = i
            task["to_page"] = min(i + 3000, rn)
            parse_task_array.append(task)
    else:
        # Any other document type becomes a single whole-document task.
        parse_task_array.append(new_task())
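
    # Fingerprint each task with the chunking configuration so tasks whose
    # inputs are unchanged can be recognized across re-parses.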
    chunking_config = DocumentService.get_chunking_config(doc["id"])
    for task in parse_task_array:
        hasher = xxhash.xxh64()
        for field in sorted(chunking_config.keys()):
            hasher.update(str(chunking_config[field]).encode("utf-8"))
        # Reconstructed: persist the digest on the task for matching against
        # a previous run (the assignment itself is missing from this excerpt).
        task["digest"] = hasher.hexdigest()
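
    # Carry chunks over from the previous generation of tasks where digests
    # match; `ck_num` accumulates the number of reused chunks.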
    prev_tasks = TaskService.get_tasks(doc["id"])
    ck_num = 0
    if prev_tasks:
        for task in parse_task_array:
            ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
        TaskService.filter_delete([Task.doc_id == doc["id"]])
        chunk_ids = []
        # Reconstructed: collect the chunk ids left on the superseded tasks
        # and purge them from the document store. Only the call's trailing
        # argument survived in this excerpt; `settings.docStoreConn` and
        # `search.index_name` are assumed project helpers.
        for prev_task in prev_tasks:
            if prev_task.get("chunk_ids"):
                chunk_ids.extend(prev_task["chunk_ids"].split())
        if chunk_ids:
            settings.docStoreConn.delete({"id": chunk_ids},
                                         search.index_name(chunking_config["tenant_id"]),
                                         chunking_config["kb_id"])
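
    # Record the number of reused chunks as the document's current chunk
    # count, persist the new task set, and mark the document as parsing.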
    DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})

    bulk_insert_into_db(Task, parse_task_array, True)
    DocumentService.begin2parse(doc["id"])
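
    # Tasks fully satisfied by reused chunks already have progress 1.0 and
    # need no work; only the rest are queued. Note that `assert` is stripped
    # under `python -O`, so this Redis check relies on assertions being on.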
    unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]
    for unfinished_task in unfinished_task_array:
        assert REDIS_CONN.queue_product(
            SVR_QUEUE_NAME, message=unfinished_task
        ), "Can't access Redis. Please check Redis status."
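

# A minimal, hypothetical invocation (the field values and bucket/name
# addressing are assumptions, not part of this module); a non-PDF doc takes
# the single-task branch:
#
#     doc = {"id": "d1", "type": "doc", "parser_id": "naive"}
#     queue_tasks(doc, bucket="kb-bucket", name="report.docx")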