|
|
|
@@ -99,8 +99,10 @@ CURRENT_TASKS = {} |
|
|
|
|
|
|
|
MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) |
|
|
|
MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1")) |
|
|
|
MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) |
|
|
|
task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) |
|
|
|
chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) |
|
|
|
minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) |
|
|
|
WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get('WORKER_HEARTBEAT_TIMEOUT', '120')) |
|
|
|
stop_event = threading.Event() |
|
|
|
|
|
|
|
@@ -270,38 +272,43 @@ async def build_chunks(task, progress_callback): |
|
|
|
} |
|
|
|
if task["pagerank"]: |
|
|
|
doc[PAGERANK_FLD] = int(task["pagerank"]) |
|
|
|
el = 0 |
|
|
|
for ck in cks: |
|
|
|
d = copy.deepcopy(doc) |
|
|
|
d.update(ck) |
|
|
|
d["id"] = xxhash.xxh64((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest() |
|
|
|
d["create_time"] = str(datetime.now()).replace("T", " ")[:19] |
|
|
|
d["create_timestamp_flt"] = datetime.now().timestamp() |
|
|
|
if not d.get("image"): |
|
|
|
_ = d.pop("image", None) |
|
|
|
d["img_id"] = "" |
|
|
|
docs.append(d) |
|
|
|
continue |
|
|
|
st = timer() |
|
|
|
|
|
|
|
async def upload_to_minio(document, chunk): |
|
|
|
try: |
|
|
|
output_buffer = BytesIO() |
|
|
|
if isinstance(d["image"], bytes): |
|
|
|
output_buffer = BytesIO(d["image"]) |
|
|
|
else: |
|
|
|
d["image"].save(output_buffer, format='JPEG') |
|
|
|
|
|
|
|
st = timer() |
|
|
|
await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())) |
|
|
|
el += timer() - st |
|
|
|
async with minio_limiter: |
|
|
|
d = copy.deepcopy(document) |
|
|
|
d.update(chunk) |
|
|
|
d["id"] = xxhash.xxh64((chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest() |
|
|
|
d["create_time"] = str(datetime.now()).replace("T", " ")[:19] |
|
|
|
d["create_timestamp_flt"] = datetime.now().timestamp() |
|
|
|
if not d.get("image"): |
|
|
|
_ = d.pop("image", None) |
|
|
|
d["img_id"] = "" |
|
|
|
docs.append(d) |
|
|
|
return |
|
|
|
|
|
|
|
output_buffer = BytesIO() |
|
|
|
if isinstance(d["image"], bytes): |
|
|
|
output_buffer = BytesIO(d["image"]) |
|
|
|
else: |
|
|
|
d["image"].save(output_buffer, format='JPEG') |
|
|
|
await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())) |
|
|
|
|
|
|
|
d["img_id"] = "{}-{}".format(task["kb_id"], d["id"]) |
|
|
|
del d["image"] |
|
|
|
docs.append(d) |
|
|
|
except Exception: |
|
|
|
logging.exception( |
|
|
|
"Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"])) |
|
|
|
raise |
|
|
|
|
|
|
|
d["img_id"] = "{}-{}".format(task["kb_id"], d["id"]) |
|
|
|
del d["image"] |
|
|
|
docs.append(d) |
|
|
|
logging.info("MINIO PUT({}):{}".format(task["name"], el)) |
|
|
|
async with trio.open_nursery() as nursery: |
|
|
|
for ck in cks: |
|
|
|
nursery.start_soon(upload_to_minio, doc, ck) |
|
|
|
|
|
|
|
el = timer() - st |
|
|
|
logging.info("MINIO PUT({}) cost {:.3f} s".format(task["name"], el)) |
|
|
|
|
|
|
|
if task["parser_config"].get("auto_keywords", 0): |
|
|
|
st = timer() |