|
|
|
@@ -42,7 +42,6 @@ from multiprocessing.context import TimeoutError |
|
|
|
from timeit import default_timer as timer |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
from api.db import LLMType, ParserType |
|
|
|
from api.db.services.dialog_service import keyword_extraction, question_proposal |
|
|
|
@@ -85,10 +84,9 @@ CONSUMER_NAME = "task_consumer_" + CONSUMER_NO |
|
|
|
PAYLOAD: Payload | None = None  # Redis message currently being processed; ack'ed and cleared after each task



BOOT_AT = datetime.now().isoformat()  # executor start time (ISO 8601), reported in the heartbeat



DONE_TASKS = 0  # count of tasks finished (including canceled/empty ones)



RETRY_TASKS = 0  # count of tasks retried; reported in the heartbeat



FAILED_TASKS = 0  # count of tasks that raised during processing



PENDING_TASKS = 0  # consumer-group pending count sampled by report_status()



HEAD_CREATED_AT = ""  # creation time of the queue-head message, if sampled



HEAD_DETAIL = ""  # payload of the queue-head message, if sampled



LAG_TASKS = 0  # consumer-group lag sampled by report_status()
|
|
|
|
|
|
|
|
|
|
|
def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing..."): |
|
|
|
@@ -120,34 +118,35 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing... |
|
|
|
|
|
|
|
|
|
|
|
def collect():
    """Fetch the next task from the Redis task queue.

    Prefers a message previously delivered to this consumer but never
    ack'ed (e.g. after a crash), then falls back to consuming a fresh one.

    Returns:
        dict | None: the task row (with ``task_type`` set to ``"raptor"``
        for raptor-type messages), or ``None`` when there is nothing to do
        (empty queue, broker error, canceled task, or empty task).
    """
    global CONSUMER_NAME, PAYLOAD, DONE_TASKS, FAILED_TASKS
    try:
        # Re-deliver anything this consumer left unacked before consuming new work.
        PAYLOAD = REDIS_CONN.get_unacked_for(CONSUMER_NAME, SVR_QUEUE_NAME, "rag_flow_svr_task_broker")
        if not PAYLOAD:
            PAYLOAD = REDIS_CONN.queue_consumer(SVR_QUEUE_NAME, "rag_flow_svr_task_broker", CONSUMER_NAME)
        if not PAYLOAD:
            time.sleep(1)
            return None
    except Exception:
        logging.exception("Get task event from queue exception")
        return None

    msg = PAYLOAD.get_message()
    if not msg:
        return None

    if TaskService.do_cancel(msg["id"]):
        # A canceled task still counts as "done" for heartbeat accounting.
        DONE_TASKS += 1
        logging.info("Task {} has been canceled.".format(msg["id"]))
        return None
    task = TaskService.get_task(msg["id"])
    if not task:
        DONE_TASKS += 1
        logging.warning("{} empty task!".format(msg["id"]))
        return None

    if msg.get("type", "") == "raptor":
        task["task_type"] = "raptor"
    return task
|
|
|
|
|
|
|
|
|
|
|
def get_storage_binary(bucket, name): |
|
|
|
@@ -176,14 +175,14 @@ def build(row): |
|
|
|
callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.") |
|
|
|
logging.exception( |
|
|
|
"Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"])) |
|
|
|
return |
|
|
|
raise |
|
|
|
except Exception as e: |
|
|
|
if re.search("(No such file|not found)", str(e)): |
|
|
|
callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"]) |
|
|
|
else: |
|
|
|
callback(-1, "Get file from minio: %s" % str(e).replace("'", "")) |
|
|
|
logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"])) |
|
|
|
return |
|
|
|
raise |
|
|
|
|
|
|
|
try: |
|
|
|
cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"], |
|
|
|
@@ -194,7 +193,7 @@ def build(row): |
|
|
|
callback(-1, "Internal server error while chunking: %s" % |
|
|
|
str(e).replace("'", "")) |
|
|
|
logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"])) |
|
|
|
return |
|
|
|
raise |
|
|
|
|
|
|
|
docs = [] |
|
|
|
doc = { |
|
|
|
@@ -212,6 +211,7 @@ def build(row): |
|
|
|
d["create_time"] = str(datetime.now()).replace("T", " ")[:19] |
|
|
|
d["create_timestamp_flt"] = datetime.now().timestamp() |
|
|
|
if not d.get("image"): |
|
|
|
_ = d.pop("image", None) |
|
|
|
d["img_id"] = "" |
|
|
|
d["page_num_list"] = json.dumps([]) |
|
|
|
d["position_list"] = json.dumps([]) |
|
|
|
@@ -232,6 +232,7 @@ def build(row): |
|
|
|
except Exception: |
|
|
|
logging.exception( |
|
|
|
"Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"])) |
|
|
|
raise |
|
|
|
|
|
|
|
d["img_id"] = "{}-{}".format(row["kb_id"], d["id"]) |
|
|
|
del d["image"] |
|
|
|
@@ -356,105 +357,111 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None): |
|
|
|
return res, tk_count, vector_size |
|
|
|
|
|
|
|
|
|
|
|
def do_handle_task(r):
    """Process a single task row end-to-end.

    Builds chunks (or runs RAPTOR for ``task_type == "raptor"``), embeds
    them, writes them to the doc store in small batches, and updates the
    document's chunk/token counters. Progress is reported through a
    ``set_progress`` callback bound to this task. Any failure is surfaced
    to the UI via ``callback(-1, ...)`` and then re-raised so the caller
    can count it as failed.
    """
    callback = partial(set_progress, r["id"], r["from_page"], r["to_page"])
    try:
        embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])
    except Exception as e:
        callback(-1, msg=str(e))
        raise
    if r.get("task_type", "") == "raptor":
        try:
            chat_mdl = LLMBundle(r["tenant_id"], LLMType.CHAT, llm_name=r["llm_id"], lang=r["language"])
            cks, tk_count, vector_size = run_raptor(r, chat_mdl, embd_mdl, callback)
        except Exception as e:
            callback(-1, msg=str(e))
            raise
    else:
        st = timer()
        cks = build(r)
        logging.info("Build chunks({}): {}".format(r["name"], timer() - st))
        if cks is None:
            # build() already reported the failure through its own callback.
            return
        if not cks:
            callback(1., "No chunk! Done!")
            return
        # TODO: exception handler
        ## set_progress(r["did"], -1, "ERROR: ")
        callback(
            msg="Finished slicing files ({} chunks in {:.2f}s). Start to embedding the content.".format(len(cks),
                                                                                                        timer() - st)
        )
        st = timer()
        try:
            tk_count, vector_size = embedding(cks, embd_mdl, r["parser_config"], callback)
        except Exception as e:
            callback(-1, "Embedding error:{}".format(str(e)))
            logging.exception("run_rembedding got exception")
            tk_count = 0
            raise
        logging.info("Embedding elapsed({}): {:.2f}".format(r["name"], timer() - st))
        callback(msg="Finished embedding (in {:.2f}s)! Start to build index!".format(timer() - st))

    # logging.info(f"task_executor init_kb index {search.index_name(r["tenant_id"])} embd_mdl {embd_mdl.llm_name} vector length {vector_size}")
    init_kb(r, vector_size)
    chunk_count = len(set([c["id"] for c in cks]))
    st = timer()
    es_r = ""
    es_bulk_size = 4
    for b in range(0, len(cks), es_bulk_size):
        es_r = settings.docStoreConn.insert(cks[b:b + es_bulk_size], search.index_name(r["tenant_id"]), r["kb_id"])
        if b % 128 == 0:
            callback(prog=0.8 + 0.1 * (b + 1) / len(cks), msg="")
    logging.info("Indexing elapsed({}): {:.2f}".format(r["name"], timer() - st))
    if es_r:
        callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
        # Roll back the partial insert so the document is not left half-indexed.
        settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
        logging.error('Insert chunk error: ' + str(es_r))
        raise Exception('Insert chunk error: ' + str(es_r))

    if TaskService.do_cancel(r["id"]):
        # Task was canceled mid-flight: drop everything we just indexed.
        settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
        return

    callback(msg="Indexing elapsed in {:.2f}s.".format(timer() - st))
    callback(1., "Done!")
    DocumentService.increment_chunk_num(
        r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
    logging.info(
        "Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(
            r["id"], tk_count, len(cks), timer() - st))
|
|
|
|
|
|
|
|
|
|
|
def handle_task():
    """Collect one task from the queue and process it.

    Updates the DONE/FAILED heartbeat counters and always acks the Redis
    payload afterwards so the message is not re-delivered, whether the
    task succeeded or failed.
    """
    global PAYLOAD, DONE_TASKS, FAILED_TASKS
    task = collect()
    if task:
        try:
            logging.info(f"handle_task begin for task {json.dumps(task)}")
            do_handle_task(task)
            DONE_TASKS += 1
            # Fix: the success path used logging.exception(), which is meant
            # for except blocks and logs a spurious traceback; use info.
            logging.info(f"handle_task done for task {json.dumps(task)}")
        except Exception:
            FAILED_TASKS += 1
            logging.exception(f"handle_task got exception for task {json.dumps(task)}")
    if PAYLOAD:
        PAYLOAD.ack()
        PAYLOAD = None
|
|
|
|
|
|
|
|
|
|
|
def report_status():
    """Background heartbeat loop (run in a daemon thread).

    Registers this consumer in the ``TASKEXE`` set, then every 30 seconds
    samples the consumer-group statistics and pushes a JSON heartbeat into
    a per-consumer sorted set scored by timestamp.
    """
    global CONSUMER_NAME, BOOT_AT, DONE_TASKS, FAILED_TASKS, PENDING_TASKS, LAG_TASKS
    REDIS_CONN.sadd("TASKEXE", CONSUMER_NAME)
    while True:
        try:
            now = datetime.now()
            group_info = REDIS_CONN.queue_info(SVR_QUEUE_NAME, "rag_flow_svr_task_broker")
            if group_info is not None:
                PENDING_TASKS = int(group_info["pending"])
                LAG_TASKS = int(group_info["lag"])

            heartbeat = json.dumps({
                "name": CONSUMER_NAME,
                "now": now.isoformat(),
                "boot_at": BOOT_AT,
                "done": DONE_TASKS,
                "retry": RETRY_TASKS,
                "failed": FAILED_TASKS,
                "pending": PENDING_TASKS,
                "head_created_at": HEAD_CREATED_AT,
                "head_detail": HEAD_DETAIL,
                "lag": LAG_TASKS,
            })
            REDIS_CONN.zadd(CONSUMER_NAME, heartbeat, now.timestamp())
            logging.info(f"{CONSUMER_NAME} reported heartbeat: {heartbeat}")

            # NOTE(review): the span between the heartbeat log and the except
            # clause is elided by a hunk header in the reviewed text; this
            # expiry of heartbeats older than 30 minutes is reconstructed from
            # the surrounding code — TODO confirm against the full file.
            expired = REDIS_CONN.zcount(CONSUMER_NAME, 0, now.timestamp() - 60 * 30)
            if expired > 0:
                REDIS_CONN.zpopmin(CONSUMER_NAME, expired)
        except Exception:
            logging.exception("report_status got exception")
        time.sleep(30)
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Entry point: start the heartbeat thread, then process tasks forever.

    The heartbeat thread is a daemon so it dies with the process. Payload
    acknowledgement is handled inside handle_task(), not here.
    """
    background_thread = threading.Thread(target=report_status)
    background_thread.daemon = True
    background_thread.start()

    while True:
        handle_task()


if __name__ == "__main__":
    main()