
indexing_runner.py 32KB

import concurrent.futures
import json
import logging
import re
import threading
import time
import uuid
from typing import Any, Optional, cast

from flask import current_app
from sqlalchemy import select
from sqlalchemy.orm.exc import ObjectDeletedError

from configs import dify_config
from core.entities.knowledge_entities import IndexingEstimate, PreviewDetail, QAPreviewDetail
from core.errors.error import ProviderTokenNotInitError
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.docstore.dataset_docstore import DatasetDocumentStore
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.index_processor.constant.index_type import IndexType
from core.rag.index_processor.index_processor_base import BaseIndexProcessor
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import ChildDocument, Document
from core.rag.splitter.fixed_text_splitter import (
    EnhanceRecursiveCharacterTextSplitter,
    FixedRecursiveCharacterTextSplitter,
)
from core.rag.splitter.text_splitter import TextSplitter
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from libs import helper
from libs.datetime_utils import naive_utc_now
from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import UploadFile
from services.feature_service import FeatureService

logger = logging.getLogger(__name__)


class IndexingRunner:
    def __init__(self):
        self.storage = storage
        self.model_manager = ModelManager()

    def run(self, dataset_documents: list[DatasetDocument]):
        """Run the indexing process."""
        for dataset_document in dataset_documents:
            try:
                # get dataset
                dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
                if not dataset:
                    raise ValueError("no dataset found")
                # get the process rule
                stmt = select(DatasetProcessRule).where(
                    DatasetProcessRule.id == dataset_document.dataset_process_rule_id
                )
                processing_rule = db.session.scalar(stmt)
                if not processing_rule:
                    raise ValueError("no process rule found")
                index_type = dataset_document.doc_form
                index_processor = IndexProcessorFactory(index_type).init_index_processor()
                # extract
                text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())
                # transform
                documents = self._transform(
                    index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()
                )
                # save segment
                self._load_segments(dataset, dataset_document, documents)
                # load
                self._load(
                    index_processor=index_processor,
                    dataset=dataset,
                    dataset_document=dataset_document,
                    documents=documents,
                )
            except DocumentIsPausedError:
                raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
            except ProviderTokenNotInitError as e:
                dataset_document.indexing_status = "error"
                dataset_document.error = str(e.description)
                dataset_document.stopped_at = naive_utc_now()
                db.session.commit()
            except ObjectDeletedError:
                logger.warning("Document deleted, document id: %s", dataset_document.id)
            except Exception as e:
                logger.exception("consume document failed")
                dataset_document.indexing_status = "error"
                dataset_document.error = str(e)
                dataset_document.stopped_at = naive_utc_now()
                db.session.commit()

    def run_in_splitting_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is splitting."""
        try:
            # get dataset
            dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
            if not dataset:
                raise ValueError("no dataset found")
            # get exist document_segment list and delete
            document_segments = (
                db.session.query(DocumentSegment)
                .filter_by(dataset_id=dataset.id, document_id=dataset_document.id)
                .all()
            )
            for document_segment in document_segments:
                db.session.delete(document_segment)
                if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
                    # delete child chunks
                    db.session.query(ChildChunk).where(ChildChunk.segment_id == document_segment.id).delete()
            db.session.commit()
            # get the process rule
            stmt = select(DatasetProcessRule).where(DatasetProcessRule.id == dataset_document.dataset_process_rule_id)
            processing_rule = db.session.scalar(stmt)
            if not processing_rule:
                raise ValueError("no process rule found")
            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()
            # extract
            text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())
            # transform
            documents = self._transform(
                index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()
            )
            # save segment
            self._load_segments(dataset, dataset_document, documents)
            # load
            self._load(
                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
            )
        except DocumentIsPausedError:
            raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = "error"
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = naive_utc_now()
            db.session.commit()
        except Exception as e:
            logger.exception("consume document failed")
            dataset_document.indexing_status = "error"
            dataset_document.error = str(e)
            dataset_document.stopped_at = naive_utc_now()
            db.session.commit()

    def run_in_indexing_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is indexing."""
        try:
            # get dataset
            dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
            if not dataset:
                raise ValueError("no dataset found")
            # get exist document_segment list and delete
            document_segments = (
                db.session.query(DocumentSegment)
                .filter_by(dataset_id=dataset.id, document_id=dataset_document.id)
                .all()
            )
            documents = []
            if document_segments:
                for document_segment in document_segments:
                    # transform segment to node
                    if document_segment.status != "completed":
                        document = Document(
                            page_content=document_segment.content,
                            metadata={
                                "doc_id": document_segment.index_node_id,
                                "doc_hash": document_segment.index_node_hash,
                                "document_id": document_segment.document_id,
                                "dataset_id": document_segment.dataset_id,
                            },
                        )
                        if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
                            child_chunks = document_segment.get_child_chunks()
                            if child_chunks:
                                child_documents = []
                                for child_chunk in child_chunks:
                                    child_document = ChildDocument(
                                        page_content=child_chunk.content,
                                        metadata={
                                            "doc_id": child_chunk.index_node_id,
                                            "doc_hash": child_chunk.index_node_hash,
                                            "document_id": document_segment.document_id,
                                            "dataset_id": document_segment.dataset_id,
                                        },
                                    )
                                    child_documents.append(child_document)
                                document.children = child_documents
                        documents.append(document)
            # build index
            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()
            self._load(
                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
            )
        except DocumentIsPausedError:
            raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = "error"
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = naive_utc_now()
            db.session.commit()
        except Exception as e:
            logger.exception("consume document failed")
            dataset_document.indexing_status = "error"
            dataset_document.error = str(e)
            dataset_document.stopped_at = naive_utc_now()
            db.session.commit()

    def indexing_estimate(
        self,
        tenant_id: str,
        extract_settings: list[ExtractSetting],
        tmp_processing_rule: dict,
        doc_form: Optional[str] = None,
        doc_language: str = "English",
        dataset_id: Optional[str] = None,
        indexing_technique: str = "economy",
    ) -> IndexingEstimate:
        """
        Estimate the indexing for the document.
        """
        # check document limit
        features = FeatureService.get_features(tenant_id)
        if features.billing.enabled:
            count = len(extract_settings)
            batch_upload_limit = dify_config.BATCH_UPLOAD_LIMIT
            if count > batch_upload_limit:
                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")

        embedding_model_instance = None
        if dataset_id:
            dataset = db.session.query(Dataset).filter_by(id=dataset_id).first()
            if not dataset:
                raise ValueError("Dataset not found.")
            if dataset.indexing_technique == "high_quality" or indexing_technique == "high_quality":
                if dataset.embedding_model_provider:
                    embedding_model_instance = self.model_manager.get_model_instance(
                        tenant_id=tenant_id,
                        provider=dataset.embedding_model_provider,
                        model_type=ModelType.TEXT_EMBEDDING,
                        model=dataset.embedding_model,
                    )
                else:
                    embedding_model_instance = self.model_manager.get_default_model_instance(
                        tenant_id=tenant_id,
                        model_type=ModelType.TEXT_EMBEDDING,
                    )
        else:
            if indexing_technique == "high_quality":
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        preview_texts = []  # type: ignore
        total_segments = 0
        index_type = doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        for extract_setting in extract_settings:
            # extract
            processing_rule = DatasetProcessRule(
                mode=tmp_processing_rule["mode"], rules=json.dumps(tmp_processing_rule["rules"])
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])
            documents = index_processor.transform(
                text_docs,
                embedding_model_instance=embedding_model_instance,
                process_rule=processing_rule.to_dict(),
                tenant_id=tenant_id,
                doc_language=doc_language,
                preview=True,
            )
            total_segments += len(documents)
            for document in documents:
                if len(preview_texts) < 10:
                    if doc_form and doc_form == "qa_model":
                        preview_detail = QAPreviewDetail(
                            question=document.page_content, answer=document.metadata.get("answer") or ""
                        )
                        preview_texts.append(preview_detail)
                    else:
                        preview_detail = PreviewDetail(content=document.page_content)  # type: ignore
                        if document.children:
                            preview_detail.child_chunks = [child.page_content for child in document.children]  # type: ignore
                        preview_texts.append(preview_detail)

                # delete image files and related db records
                image_upload_file_ids = get_image_upload_file_ids(document.page_content)
                for upload_file_id in image_upload_file_ids:
                    stmt = select(UploadFile).where(UploadFile.id == upload_file_id)
                    image_file = db.session.scalar(stmt)
                    if image_file is None:
                        continue
                    try:
                        storage.delete(image_file.key)
                    except Exception:
                        logger.exception(
                            "Delete image_files failed while indexing_estimate, image_upload_file_id: %s",
                            upload_file_id,
                        )
                    db.session.delete(image_file)

        if doc_form and doc_form == "qa_model":
            return IndexingEstimate(total_segments=total_segments * 20, qa_preview=preview_texts, preview=[])
        return IndexingEstimate(total_segments=total_segments, preview=preview_texts)  # type: ignore

    def _extract(
        self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict
    ) -> list[Document]:
        # load file
        if dataset_document.data_source_type not in {"upload_file", "notion_import", "website_crawl"}:
            return []

        data_source_info = dataset_document.data_source_info_dict
        text_docs = []
        if dataset_document.data_source_type == "upload_file":
            if not data_source_info or "upload_file_id" not in data_source_info:
                raise ValueError("no upload file found")

            stmt = select(UploadFile).where(UploadFile.id == data_source_info["upload_file_id"])
            file_detail = db.session.scalars(stmt).one_or_none()

            if file_detail:
                extract_setting = ExtractSetting(
                    datasource_type="upload_file", upload_file=file_detail, document_model=dataset_document.doc_form
                )
                text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])
        elif dataset_document.data_source_type == "notion_import":
            if (
                not data_source_info
                or "notion_workspace_id" not in data_source_info
                or "notion_page_id" not in data_source_info
            ):
                raise ValueError("no notion import info found")
            extract_setting = ExtractSetting(
                datasource_type="notion_import",
                notion_info={
                    "notion_workspace_id": data_source_info["notion_workspace_id"],
                    "notion_obj_id": data_source_info["notion_page_id"],
                    "notion_page_type": data_source_info["type"],
                    "document": dataset_document,
                    "tenant_id": dataset_document.tenant_id,
                },
                document_model=dataset_document.doc_form,
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])
        elif dataset_document.data_source_type == "website_crawl":
            if (
                not data_source_info
                or "provider" not in data_source_info
                or "url" not in data_source_info
                or "job_id" not in data_source_info
            ):
                raise ValueError("no website import info found")
            extract_setting = ExtractSetting(
                datasource_type="website_crawl",
                website_info={
                    "provider": data_source_info["provider"],
                    "job_id": data_source_info["job_id"],
                    "tenant_id": dataset_document.tenant_id,
                    "url": data_source_info["url"],
                    "mode": data_source_info["mode"],
                    "only_main_content": data_source_info["only_main_content"],
                },
                document_model=dataset_document.doc_form,
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])

        # update document status to splitting
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="splitting",
            extra_update_params={
                DatasetDocument.word_count: sum(len(text_doc.page_content) for text_doc in text_docs),
                DatasetDocument.parsing_completed_at: naive_utc_now(),
            },
        )

        # replace doc id to document model id
        text_docs = cast(list[Document], text_docs)
        for text_doc in text_docs:
            if text_doc.metadata is not None:
                text_doc.metadata["document_id"] = dataset_document.id
                text_doc.metadata["dataset_id"] = dataset_document.dataset_id

        return text_docs

    @staticmethod
    def filter_string(text):
        text = re.sub(r"<\|", "<", text)
        text = re.sub(r"\|>", ">", text)
        text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\xEF\xBF\xBE]", "", text)
        # Unicode U+FFFE
        text = re.sub("\ufffe", "", text)
        return text

    @staticmethod
    def _get_splitter(
        processing_rule_mode: str,
        max_tokens: int,
        chunk_overlap: int,
        separator: str,
        embedding_model_instance: Optional[ModelInstance],
    ) -> TextSplitter:
        """
        Get the NodeParser object according to the processing rule.
        """
        if processing_rule_mode in ["custom", "hierarchical"]:
            # The user-defined segmentation rule
            max_segmentation_tokens_length = dify_config.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH
            if max_tokens < 50 or max_tokens > max_segmentation_tokens_length:
                raise ValueError(f"Custom segment length should be between 50 and {max_segmentation_tokens_length}.")

            if separator:
                separator = separator.replace("\\n", "\n")

            character_splitter = FixedRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=max_tokens,
                chunk_overlap=chunk_overlap,
                fixed_separator=separator,
                separators=["\n\n", "。", ". ", " ", ""],
                embedding_model_instance=embedding_model_instance,
            )
        else:
            # Automatic segmentation
            automatic_rules: dict[str, Any] = dict(DatasetProcessRule.AUTOMATIC_RULES["segmentation"])
            character_splitter = EnhanceRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=automatic_rules["max_tokens"],
                chunk_overlap=automatic_rules["chunk_overlap"],
                separators=["\n\n", "。", ". ", " ", ""],
                embedding_model_instance=embedding_model_instance,
            )
        return character_splitter  # type: ignore

    def _split_to_documents_for_estimate(
        self, text_docs: list[Document], splitter: TextSplitter, processing_rule: DatasetProcessRule
    ) -> list[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents: list[Document] = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])

            split_documents = []
            for document in documents:
                if document.page_content is None or not document.page_content.strip():
                    continue
                if document.metadata is not None:
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(document.page_content)
                    document.metadata["doc_id"] = doc_id
                    document.metadata["doc_hash"] = hash

                split_documents.append(document)

            all_documents.extend(split_documents)

        return all_documents

    @staticmethod
    def _document_clean(text: str, processing_rule: DatasetProcessRule) -> str:
        """
        Clean the document text according to the processing rules.
        """
        if processing_rule.mode == "automatic":
            rules = DatasetProcessRule.AUTOMATIC_RULES
        else:
            rules = json.loads(processing_rule.rules) if processing_rule.rules else {}
        document_text = CleanProcessor.clean(text, {"rules": rules})

        return document_text

    @staticmethod
    def format_split_text(text: str) -> list[QAPreviewDetail]:
        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q\d+:|$)"
        matches = re.findall(regex, text, re.UNICODE)
        return [QAPreviewDetail(question=q, answer=re.sub(r"\n\s*", "\n", a.strip())) for q, a in matches if q and a]
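
    # Illustrative note (not part of the original source): as the regex above
    # implies, format_split_text expects numbered Q/A text such as
    #
    #     Q1: What does the indexing runner do?
    #     A1: It extracts, splits, and indexes documents.
    #     Q2: ...
    #
    # and yields one QAPreviewDetail per question/answer pair, with redundant
    # leading whitespace stripped from each answer line.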

    def _load(
        self,
        index_processor: BaseIndexProcessor,
        dataset: Dataset,
        dataset_document: DatasetDocument,
        documents: list[Document],
    ) -> None:
        """
        insert index and update document/segment status to completed
        """
        embedding_model_instance = None
        if dataset.indexing_technique == "high_quality":
            embedding_model_instance = self.model_manager.get_model_instance(
                tenant_id=dataset.tenant_id,
                provider=dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=dataset.embedding_model,
            )

        # chunk nodes by chunk size
        indexing_start_at = time.perf_counter()
        tokens = 0
        if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
            # create keyword index
            create_keyword_thread = threading.Thread(
                target=self._process_keyword_index,
                args=(current_app._get_current_object(), dataset.id, dataset_document.id, documents),  # type: ignore
            )
            create_keyword_thread.start()

        max_workers = 10
        if dataset.indexing_technique == "high_quality":
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = []

                # Distribute documents into multiple groups based on the hash values of page_content
                # This is done to prevent multiple threads from processing the same document,
                # thereby avoiding potential database insertion deadlocks
                document_groups: list[list[Document]] = [[] for _ in range(max_workers)]
                for document in documents:
                    hash = helper.generate_text_hash(document.page_content)
                    group_index = int(hash, 16) % max_workers
                    document_groups[group_index].append(document)
                for chunk_documents in document_groups:
                    if len(chunk_documents) == 0:
                        continue
                    futures.append(
                        executor.submit(
                            self._process_chunk,
                            current_app._get_current_object(),  # type: ignore
                            index_processor,
                            chunk_documents,
                            dataset,
                            dataset_document,
                            embedding_model_instance,
                        )
                    )

                for future in futures:
                    tokens += future.result()

        if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
            create_keyword_thread.join()

        indexing_end_at = time.perf_counter()

        # update document status to completed
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="completed",
            extra_update_params={
                DatasetDocument.tokens: tokens,
                DatasetDocument.completed_at: naive_utc_now(),
                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
                DatasetDocument.error: None,
            },
        )

    @staticmethod
    def _process_keyword_index(flask_app, dataset_id, document_id, documents):
        with flask_app.app_context():
            dataset = db.session.query(Dataset).filter_by(id=dataset_id).first()
            if not dataset:
                raise ValueError("no dataset found")
            keyword = Keyword(dataset)
            keyword.create(documents)
            if dataset.indexing_technique != "high_quality":
                document_ids = [document.metadata["doc_id"] for document in documents]
                db.session.query(DocumentSegment).where(
                    DocumentSegment.document_id == document_id,
                    DocumentSegment.dataset_id == dataset_id,
                    DocumentSegment.index_node_id.in_(document_ids),
                    DocumentSegment.status == "indexing",
                ).update(
                    {
                        DocumentSegment.status: "completed",
                        DocumentSegment.enabled: True,
                        DocumentSegment.completed_at: naive_utc_now(),
                    }
                )
                db.session.commit()

    def _process_chunk(
        self, flask_app, index_processor, chunk_documents, dataset, dataset_document, embedding_model_instance
    ):
        with flask_app.app_context():
            # check document is paused
            self._check_document_paused_status(dataset_document.id)

            tokens = 0
            if embedding_model_instance:
                page_content_list = [document.page_content for document in chunk_documents]
                tokens += sum(embedding_model_instance.get_text_embedding_num_tokens(page_content_list))

            # load index
            index_processor.load(dataset, chunk_documents, with_keywords=False)

            document_ids = [document.metadata["doc_id"] for document in chunk_documents]
            db.session.query(DocumentSegment).where(
                DocumentSegment.document_id == dataset_document.id,
                DocumentSegment.dataset_id == dataset.id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing",
            ).update(
                {
                    DocumentSegment.status: "completed",
                    DocumentSegment.enabled: True,
                    DocumentSegment.completed_at: naive_utc_now(),
                }
            )
            db.session.commit()

            return tokens

    @staticmethod
    def _check_document_paused_status(document_id: str):
        indexing_cache_key = f"document_{document_id}_is_paused"
        result = redis_client.get(indexing_cache_key)
        if result:
            raise DocumentIsPausedError()

    @staticmethod
    def _update_document_index_status(
        document_id: str, after_indexing_status: str, extra_update_params: Optional[dict] = None
    ) -> None:
        """
        Update the document indexing status.
        """
        count = db.session.query(DatasetDocument).filter_by(id=document_id, is_paused=True).count()
        if count > 0:
            raise DocumentIsPausedError()
        document = db.session.query(DatasetDocument).filter_by(id=document_id).first()
        if not document:
            raise DocumentIsDeletedPausedError()

        update_params = {DatasetDocument.indexing_status: after_indexing_status}
        if extra_update_params:
            update_params.update(extra_update_params)

        db.session.query(DatasetDocument).filter_by(id=document_id).update(update_params)  # type: ignore
        db.session.commit()

    @staticmethod
    def _update_segments_by_document(dataset_document_id: str, update_params: dict) -> None:
        """
        Update the document segment by document id.
        """
        db.session.query(DocumentSegment).filter_by(document_id=dataset_document_id).update(update_params)
        db.session.commit()

    def _transform(
        self,
        index_processor: BaseIndexProcessor,
        dataset: Dataset,
        text_docs: list[Document],
        doc_language: str,
        process_rule: dict,
    ) -> list[Document]:
        # get embedding model instance
        embedding_model_instance = None
        if dataset.indexing_technique == "high_quality":
            if dataset.embedding_model_provider:
                embedding_model_instance = self.model_manager.get_model_instance(
                    tenant_id=dataset.tenant_id,
                    provider=dataset.embedding_model_provider,
                    model_type=ModelType.TEXT_EMBEDDING,
                    model=dataset.embedding_model,
                )
            else:
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=dataset.tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        documents = index_processor.transform(
            text_docs,
            embedding_model_instance=embedding_model_instance,
            process_rule=process_rule,
            tenant_id=dataset.tenant_id,
            doc_language=doc_language,
        )

        return documents

    def _load_segments(self, dataset, dataset_document, documents):
        # save node to document segment
        doc_store = DatasetDocumentStore(
            dataset=dataset, user_id=dataset_document.created_by, document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(docs=documents, save_child=dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX)

        # update document status to indexing
        cur_time = naive_utc_now()
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            },
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: naive_utc_now(),
            },
        )


class DocumentIsPausedError(Exception):
    pass


class DocumentIsDeletedPausedError(Exception):
    pass
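

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). The
# runner is intended to be driven from a background task that loads the
# documents awaiting indexing and hands them to IndexingRunner.run(); the
# query below is an assumption for the example, not the actual task code.
#
#     dataset_documents = (
#         db.session.query(DatasetDocument)
#         .filter_by(indexing_status="waiting")
#         .all()
#     )
#     try:
#         IndexingRunner().run(dataset_documents)
#     except DocumentIsPausedError:
#         # pausing is a normal control signal raised mid-run, not a failure
#         pass
# ---------------------------------------------------------------------------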