#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import random
import xxhash
from datetime import datetime

from api.db.db_utils import bulk_insert_into_db
from deepdoc.parser import PdfParser
from peewee import JOIN
from api.db.db_models import DB, File2Document, File
from api.db import StatusEnum, FileType, TaskStatus
from api.db.db_models import Task, Document, Knowledgebase, Tenant
from api.db.services.common_service import CommonService
from api.db.services.document_service import DocumentService
from api.utils import current_timestamp, get_uuid
from deepdoc.parser.excel_parser import RAGFlowExcelParser
from rag.settings import get_svr_queue_name
from rag.utils.storage_factory import STORAGE_IMPL
from rag.utils.redis_conn import REDIS_CONN
from api import settings
from rag.nlp import search


def trim_header_by_lines(text: str, max_length) -> str:
    # Trim header text to maximum length while preserving line breaks
    # Args:
    #     text: Input text to trim
    #     max_length: Maximum allowed length
    # Returns:
    #     Trimmed text
    len_text = len(text)
    if len_text <= max_length:
        return text
    for i in range(len_text):
        if text[i] == '\n' and len_text - i <= max_length:
            return text[i + 1:]
    return text
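
# A minimal doctest-style sketch of trim_header_by_lines (illustrative values,
# not from the original module): the oldest lines are dropped from the head
# until the remainder fits within max_length.
#
#   >>> trim_header_by_lines("line1\nline2\nline3", 12)
#   'line2\nline3'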


class TaskService(CommonService):
    """Service class for managing document processing tasks.

    This class extends CommonService to provide specialized functionality for document
    processing task management, including task creation, progress tracking, and chunk
    management. It handles various document types (PDF, Excel, etc.) and manages their
    processing lifecycle.

    The class implements a robust task queue system with retry mechanisms and progress
    tracking, supporting both synchronous and asynchronous task execution.

    Attributes:
        model: The Task model class for database operations.
    """

    model = Task

    @classmethod
    @DB.connection_context()
    def get_task(cls, task_id):
        """Retrieve detailed task information by task ID.

        This method fetches comprehensive task details including associated document,
        knowledge base, and tenant information. It also handles task retry logic and
        progress updates.

        Args:
            task_id (str): The unique identifier of the task to retrieve.

        Returns:
            dict: Task details dictionary containing all task information and related
                metadata. Returns None if the task is not found or has exceeded the
                retry limit.
        """
        fields = [
            cls.model.id,
            cls.model.doc_id,
            cls.model.from_page,
            cls.model.to_page,
            cls.model.retry_count,
            Document.kb_id,
            Document.parser_id,
            Document.parser_config,
            Document.name,
            Document.type,
            Document.location,
            Document.size,
            Knowledgebase.tenant_id,
            Knowledgebase.language,
            Knowledgebase.embd_id,
            Knowledgebase.pagerank,
            Knowledgebase.parser_config.alias("kb_parser_config"),
            Tenant.img2txt_id,
            Tenant.asr_id,
            Tenant.llm_id,
            cls.model.update_time,
        ]
        docs = (
            cls.model.select(*fields)
            .join(Document, on=(cls.model.doc_id == Document.id))
            .join(Knowledgebase, on=(Document.kb_id == Knowledgebase.id))
            .join(Tenant, on=(Knowledgebase.tenant_id == Tenant.id))
            .where(cls.model.id == task_id)
        )
        docs = list(docs.dicts())
        if not docs:
            return None

        msg = f"\n{datetime.now().strftime('%H:%M:%S')} Task has been received."
        prog = random.random() / 10.0
        if docs[0]["retry_count"] >= 3:
            msg = "\nERROR: Task is abandoned after 3 failed attempts."
            prog = -1

        cls.model.update(
            progress_msg=cls.model.progress_msg + msg,
            progress=prog,
            retry_count=docs[0]["retry_count"] + 1,
        ).where(cls.model.id == docs[0]["id"]).execute()

        if docs[0]["retry_count"] >= 3:
            return None

        return docs[0]
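
    # Hedged usage sketch (not part of the original module): a worker that has
    # popped a task id from the queue would typically bail out when get_task()
    # returns None, i.e. the task is unknown or has exhausted its 3 retries.
    #
    #   task = TaskService.get_task(task_id)
    #   if task is None:
    #       return  # missing task or retry limit reached
    #   # ...dispatch to the parser selected by task["parser_id"]...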

    @classmethod
    @DB.connection_context()
    def get_tasks(cls, doc_id: str):
        """Retrieve all tasks associated with a document.

        This method fetches all processing tasks for a given document, ordered by page
        number and creation time. It includes task progress and chunk information.

        Args:
            doc_id (str): The unique identifier of the document.

        Returns:
            list[dict]: List of task dictionaries containing task details.
                Returns None if no tasks are found.
        """
        fields = [
            cls.model.id,
            cls.model.from_page,
            cls.model.progress,
            cls.model.digest,
            cls.model.chunk_ids,
        ]
        tasks = (
            cls.model.select(*fields)
            .order_by(cls.model.from_page.asc(), cls.model.create_time.desc())
            .where(cls.model.doc_id == doc_id)
        )
        tasks = list(tasks.dicts())
        if not tasks:
            return None
        return tasks

    @classmethod
    @DB.connection_context()
    def update_chunk_ids(cls, id: str, chunk_ids: str):
        """Update the chunk IDs associated with a task.

        This method updates the chunk_ids field of a task, which stores the IDs of
        processed document chunks in a space-separated string format.

        Args:
            id (str): The unique identifier of the task.
            chunk_ids (str): Space-separated string of chunk identifiers.
        """
        cls.model.update(chunk_ids=chunk_ids).where(cls.model.id == id).execute()
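
    # Illustrative call (hypothetical chunk IDs, shown only to make the
    # space-separated format concrete):
    #
    #   TaskService.update_chunk_ids(task_id, "9f3c2a41 41b7d0c8 c88e19aa")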

    @classmethod
    @DB.connection_context()
    def get_ongoing_doc_name(cls):
        """Get names of documents that are currently being processed.

        This method retrieves information about documents that are in the processing state,
        including their locations and associated IDs. It uses database locking to ensure
        thread safety when accessing the task information.

        Returns:
            list[tuple]: A list of tuples, each containing (parent_id/kb_id, location)
                for documents currently being processed. Returns an empty list if
                no documents are being processed.
        """
        with DB.lock("get_task", -1):
            docs = (
                cls.model.select(
                    *[Document.id, Document.kb_id, Document.location, File.parent_id]
                )
                .join(Document, on=(cls.model.doc_id == Document.id))
                .join(
                    File2Document,
                    on=(File2Document.document_id == Document.id),
                    join_type=JOIN.LEFT_OUTER,
                )
                .join(
                    File,
                    on=(File2Document.file_id == File.id),
                    join_type=JOIN.LEFT_OUTER,
                )
                .where(
                    Document.status == StatusEnum.VALID.value,
                    Document.run == TaskStatus.RUNNING.value,
                    ~(Document.type == FileType.VIRTUAL.value),
                    cls.model.progress < 1,
                    cls.model.create_time >= current_timestamp() - 1000 * 600,
                )
            )
            docs = list(docs.dicts())
            if not docs:
                return []

            return list(
                set(
                    [
                        (
                            d["parent_id"] if d["parent_id"] else d["kb_id"],
                            d["location"],
                        )
                        for d in docs
                    ]
                )
            )
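
    # The create_time filter above keeps only tasks started within the last
    # ten minutes (timestamps are in milliseconds: 1000 * 600 ms = 600 s).
    # Illustrative return shape, with made-up values:
    #
    #   [("folder_or_kb_id", "report.pdf"), ("folder_or_kb_id", "sheet.xlsx")]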

    @classmethod
    @DB.connection_context()
    def do_cancel(cls, id):
        """Check if a task should be cancelled based on its document status.

        This method determines whether a task should be cancelled by checking the
        associated document's run status and progress. A task should be cancelled
        if its document is marked for cancellation or has negative progress.

        Args:
            id (str): The unique identifier of the task to check.

        Returns:
            bool: True if the task should be cancelled, False otherwise.
        """
        task = cls.model.get_by_id(id)
        _, doc = DocumentService.get_by_id(task.doc_id)
        return doc.run == TaskStatus.CANCEL.value or doc.progress < 0
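
    # Hedged usage sketch (not in the original module): a long-running parser
    # loop can poll do_cancel() between steps and stop early:
    #
    #   for part in parts:  # 'parts' and 'process' are placeholders
    #       if TaskService.do_cancel(task_id):
    #           break  # the user cancelled the document run
    #       process(part)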

    @classmethod
    @DB.connection_context()
    def update_progress(cls, id, info):
        """Update the progress information for a task.

        This method updates both the progress message and completion percentage of a task.
        It handles platform-specific behavior (macOS vs. others) and uses database locking
        when necessary to ensure thread safety.

        Args:
            id (str): The unique identifier of the task to update.
            info (dict): Dictionary containing progress information with keys:
                - progress_msg (str, optional): Progress message to append.
                - progress (float, optional): Progress percentage (0.0 to 1.0).
        """
        if os.environ.get("MACOS"):
            # progress_msg is optional per the docstring, so guard with .get()
            if info.get("progress_msg"):
                task = cls.model.get_by_id(id)
                progress_msg = trim_header_by_lines(task.progress_msg + "\n" + info["progress_msg"], 3000)
                cls.model.update(progress_msg=progress_msg).where(cls.model.id == id).execute()
            if "progress" in info:
                cls.model.update(progress=info["progress"]).where(
                    cls.model.id == id
                ).execute()
            return

        with DB.lock("update_progress", -1):
            if info.get("progress_msg"):
                task = cls.model.get_by_id(id)
                progress_msg = trim_header_by_lines(task.progress_msg + "\n" + info["progress_msg"], 3000)
                cls.model.update(progress_msg=progress_msg).where(cls.model.id == id).execute()
            if "progress" in info:
                cls.model.update(progress=info["progress"]).where(
                    cls.model.id == id
                ).execute()
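
    # Example payloads (illustrative only), matching the keys documented above:
    #
    #   TaskService.update_progress(task_id, {"progress_msg": "Layout done."})
    #   TaskService.update_progress(task_id,
    #                               {"progress_msg": "", "progress": 0.5})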


def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
    """Create and queue document processing tasks.

    This function creates processing tasks for a document based on its type and
    configuration. It handles different document types (PDF, Excel, etc.) differently
    and manages task chunking and configuration. It also implements task reuse
    optimization by checking for previously completed tasks.

    Args:
        doc (dict): Document dictionary containing metadata and configuration.
        bucket (str): Storage bucket name where the document is stored.
        name (str): File name of the document.
        priority (int): Priority level for task queueing.

    Note:
        - For PDF documents, tasks are created per page range based on configuration.
        - For Excel documents, tasks are created per row range.
        - Task digests are calculated for optimization and reuse.
        - Previous task chunks may be reused if available.
    """
    def new_task():
        return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}

    parse_task_array = []

    if doc["type"] == FileType.PDF.value:
        file_bin = STORAGE_IMPL.get(bucket, name)
        do_layout = doc["parser_config"].get("layout_recognize", "DeepDOC")
        pages = PdfParser.total_page_number(doc["name"], file_bin)
        page_size = doc["parser_config"].get("task_page_size", 12)
        if doc["parser_id"] == "paper":
            page_size = doc["parser_config"].get("task_page_size", 22)
        if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC":
            page_size = 10 ** 9
        page_ranges = doc["parser_config"].get("pages") or [(1, 10 ** 5)]
        for s, e in page_ranges:
            s -= 1
            s = max(0, s)
            e = min(e - 1, pages)
            for p in range(s, e, page_size):
                task = new_task()
                task["from_page"] = p
                task["to_page"] = min(p + page_size, e)
                parse_task_array.append(task)

    elif doc["parser_id"] == "table":
        file_bin = STORAGE_IMPL.get(bucket, name)
        rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)
        for i in range(0, rn, 3000):
            task = new_task()
            task["from_page"] = i
            task["to_page"] = min(i + 3000, rn)
            parse_task_array.append(task)

    else:
        parse_task_array.append(new_task())

    chunking_config = DocumentService.get_chunking_config(doc["id"])
    for task in parse_task_array:
        hasher = xxhash.xxh64()
        for field in sorted(chunking_config.keys()):
            if field == "parser_config":
                for k in ["raptor", "graphrag"]:
                    if k in chunking_config[field]:
                        del chunking_config[field][k]
            hasher.update(str(chunking_config[field]).encode("utf-8"))
        for field in ["doc_id", "from_page", "to_page"]:
            hasher.update(str(task.get(field, "")).encode("utf-8"))
        task_digest = hasher.hexdigest()
        task["digest"] = task_digest
        task["progress"] = 0.0
        task["priority"] = priority

    prev_tasks = TaskService.get_tasks(doc["id"])
    ck_num = 0
    if prev_tasks:
        for task in parse_task_array:
            ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
        TaskService.filter_delete([Task.doc_id == doc["id"]])
        chunk_ids = []
        for task in prev_tasks:
            if task["chunk_ids"]:
                chunk_ids.extend(task["chunk_ids"].split())
        if chunk_ids:
            settings.docStoreConn.delete({"id": chunk_ids},
                                         search.index_name(chunking_config["tenant_id"]),
                                         chunking_config["kb_id"])
    DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})

    bulk_insert_into_db(Task, parse_task_array, True)
    DocumentService.begin2parse(doc["id"])

    unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]
    for unfinished_task in unfinished_task_array:
        assert REDIS_CONN.queue_product(
            get_svr_queue_name(priority), message=unfinished_task
        ), "Can't access Redis. Please check the Redis status."


def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict):
    """Attempt to reuse chunks from previous tasks for optimization.

    This function checks if chunks from previously completed tasks can be reused for
    the current task, which can significantly improve processing efficiency. It matches
    tasks based on page ranges and configuration digests.

    Args:
        task (dict): Current task dictionary to potentially reuse chunks for.
        prev_tasks (list[dict]): List of previous task dictionaries to check for reuse.
        chunking_config (dict): Configuration dictionary for chunk processing.

    Returns:
        int: Number of chunks successfully reused. Returns 0 if no chunks could be
            reused.

    Note:
        Chunks can only be reused if:
        - A previous task exists with matching page range and configuration digest.
        - The previous task was completed successfully (progress = 1.0).
        - The previous task has valid chunk IDs.
    """
    idx = 0
    while idx < len(prev_tasks):
        prev_task = prev_tasks[idx]
        if prev_task.get("from_page", 0) == task.get("from_page", 0) \
                and prev_task.get("digest", "") == task.get("digest", ""):
            break
        idx += 1

    if idx >= len(prev_tasks):
        return 0

    prev_task = prev_tasks[idx]
    if prev_task["progress"] < 1.0 or not prev_task["chunk_ids"]:
        return 0

    task["chunk_ids"] = prev_task["chunk_ids"]
    task["progress"] = 1.0
    # Show the page window only for page-bounded tasks; whole-document tasks
    # carry the sentinel range (to_page - from_page >= 10 ** 6) and get no
    # page prefix.
    if "from_page" in task and "to_page" in task and int(task['to_page']) - int(task['from_page']) < 10 ** 6:
        task["progress_msg"] = f"Page({task['from_page']}~{task['to_page']}): "
    else:
        task["progress_msg"] = ""
    task["progress_msg"] = " ".join(
        [datetime.now().strftime("%H:%M:%S"), task["progress_msg"], "Reused previous task's chunks."])
    prev_task["chunk_ids"] = ""
    return len(task["chunk_ids"].split())
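
# A hedged, self-contained sketch (toy dicts, not real Task rows) of the reuse
# contract implemented above: a matching from_page and digest, progress of
# 1.0, and non-empty chunk_ids let the chunks carry over to the new task.
#
#   prev = [{"from_page": 0, "digest": "abc", "progress": 1.0,
#            "chunk_ids": "c1 c2 c3"}]
#   cur = {"from_page": 0, "to_page": 12, "digest": "abc"}
#   reuse_prev_task_chunks(cur, prev, chunking_config={})
#   # -> 3; cur now holds "c1 c2 c3" with progress 1.0, and
#   #    prev[0]["chunk_ids"] is cleared so queue_tasks won't delete the
#   #    reused chunks from the doc store.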