Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.


  1. #
  2. # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License
  15. #
  16. import datetime
  17. import hashlib
  18. import json
  19. import os
  20. import pathlib
  21. import re
  22. import traceback
  23. from concurrent.futures import ThreadPoolExecutor
  24. from copy import deepcopy
  25. from io import BytesIO
  26. import flask
  27. from elasticsearch_dsl import Q
  28. from flask import request
  29. from flask_login import login_required, current_user
  30. from api.db.db_models import Task, File
  31. from api.db.services.dialog_service import DialogService, ConversationService
  32. from api.db.services.file2document_service import File2DocumentService
  33. from api.db.services.file_service import FileService
  34. from api.db.services.llm_service import LLMBundle
  35. from api.db.services.task_service import TaskService, queue_tasks
  36. from api.db.services.user_service import TenantService
  37. from graphrag.mind_map_extractor import MindMapExtractor
  38. from rag.app import naive
  39. from rag.nlp import search
  40. from rag.utils.es_conn import ELASTICSEARCH
  41. from api.db.services import duplicate_name
  42. from api.db.services.knowledgebase_service import KnowledgebaseService
  43. from api.utils.api_utils import server_error_response, get_data_error_result, validate_request
  44. from api.utils import get_uuid
  45. from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
  46. from api.db.services.document_service import DocumentService
  47. from api.settings import RetCode, stat_logger
  48. from api.utils.api_utils import get_json_result
  49. from rag.utils.minio_conn import MINIO
  50. from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
  51. from api.utils.web_utils import html2pdf, is_valid_url
  @manager.route('/upload', methods=['POST'])
  @login_required
  @validate_request("kb_id")
  def upload():
      """Upload one or more files into a knowledge base.

      Expects a multipart form with a ``kb_id`` field and one or more ``file``
      parts. All files are handed to ``FileService.upload_document``; any error
      messages it reports are joined with newlines into a SERVER_ERROR reply.

      Raises:
          LookupError: if ``kb_id`` does not match an existing knowledge base.
      """
      knowledgebase_id = request.form.get("kb_id")
      if not knowledgebase_id:
          return get_json_result(data=False, retmsg='Lack of "KB ID"',
                                 retcode=RetCode.ARGUMENT_ERROR)

      if 'file' not in request.files:
          return get_json_result(data=False, retmsg='No file part!',
                                 retcode=RetCode.ARGUMENT_ERROR)

      uploads = request.files.getlist('file')
      # Reject the whole request as soon as one part arrived without a filename.
      if any(upload_part.filename == '' for upload_part in uploads):
          return get_json_result(data=False, retmsg='No file selected!',
                                 retcode=RetCode.ARGUMENT_ERROR)

      found, kb = KnowledgebaseService.get_by_id(knowledgebase_id)
      if not found:
          raise LookupError("Can't find this knowledgebase!")

      upload_errors, _ = FileService.upload_document(kb, uploads)
      if upload_errors:
          return get_json_result(data=False, retmsg="\n".join(upload_errors),
                                 retcode=RetCode.SERVER_ERROR)
      return get_json_result(data=True)
  @manager.route('/web_crawl', methods=['POST'])
  @login_required
  @validate_request("kb_id", "name", "url")
  def web_crawl():
      """Render a web page to PDF and store it as a new document of a knowledge base.

      Form fields: ``kb_id`` (target knowledge base), ``name`` (document name;
      ``.pdf`` is appended and de-duplicated within the KB) and ``url``.
      The PDF bytes are written to MinIO under the KB id, a Document row is
      inserted, and a matching File entry is added to the KB's folder.

      Raises:
          LookupError: if ``kb_id`` does not match an existing knowledge base.
      """
      form = request.form
      kb_id = form.get("kb_id")
      if not kb_id:
          return get_json_result(data=False, retmsg='Lack of "KB ID"',
                                 retcode=RetCode.ARGUMENT_ERROR)

      doc_name = form.get("name")
      page_url = form.get("url")
      if not is_valid_url(page_url):
          return get_json_result(data=False, retmsg='The URL format is invalid',
                                 retcode=RetCode.ARGUMENT_ERROR)

      kb_found, kb = KnowledgebaseService.get_by_id(kb_id)
      if not kb_found:
          raise LookupError("Can't find this knowledgebase!")

      pdf_bytes = html2pdf(page_url)
      if not pdf_bytes:
          return server_error_response(ValueError("Download failure."))

      # Ensure the user's folder tree exists, then get/create the folder for this KB.
      user_id = current_user.id
      FileService.init_knowledgebase_docs(FileService.get_root_folder(user_id)["id"], user_id)
      kb_root_folder = FileService.get_kb_folder(user_id)
      kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])

      try:
          filename = duplicate_name(DocumentService.query, name=doc_name + ".pdf", kb_id=kb.id)
          filetype = filename_type(filename)
          if filetype == FileType.OTHER.value:
              raise RuntimeError("This type of file has not been supported yet!")

          # Append underscores until the object key is free in the KB bucket.
          location = filename
          while MINIO.obj_exist(kb_id, location):
              location += "_"
          MINIO.put(kb_id, location, pdf_bytes)

          doc = {
              "id": get_uuid(),
              "kb_id": kb.id,
              "parser_id": kb.parser_id,
              "parser_config": kb.parser_config,
              "created_by": user_id,
              "type": filetype,
              "name": filename,
              "location": location,
              "size": len(pdf_bytes),
              "thumbnail": thumbnail(filename, pdf_bytes),
          }
          # Later rules win: a presentation extension overrides the type-based choice.
          if filetype == FileType.VISUAL:
              doc["parser_id"] = ParserType.PICTURE.value
          if filetype == FileType.AURAL:
              doc["parser_id"] = ParserType.AUDIO.value
          if re.search(r"\.(ppt|pptx|pages)$", filename):
              doc["parser_id"] = ParserType.PRESENTATION.value

          DocumentService.insert(doc)
          FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
      except Exception as exc:
          return server_error_response(exc)
      return get_json_result(data=True)
  @manager.route('/create', methods=['POST'])
  @login_required
  @validate_request("name", "kb_id")
  def create():
      """Create an empty virtual document (no stored file) in a knowledge base.

      JSON body: ``kb_id`` and ``name``. Fails when the knowledge base is unknown
      or already holds a document with that name. Returns the new document row.
      """
      payload = request.json
      target_kb = payload["kb_id"]
      if not target_kb:
          return get_json_result(data=False, retmsg='Lack of "KB ID"',
                                 retcode=RetCode.ARGUMENT_ERROR)

      try:
          found, kb = KnowledgebaseService.get_by_id(target_kb)
          if not found:
              return get_data_error_result(retmsg="Can't find this knowledgebase!")

          doc_name = payload["name"]
          if DocumentService.query(name=doc_name, kb_id=target_kb):
              return get_data_error_result(
                  retmsg="Duplicated document name in the same knowledgebase.")

          new_row = {
              "id": get_uuid(),
              "kb_id": kb.id,
              "parser_id": kb.parser_id,
              "parser_config": kb.parser_config,
              "created_by": current_user.id,
              "type": FileType.VIRTUAL,
              "name": doc_name,
              "location": "",
              "size": 0,
          }
          created = DocumentService.insert(new_row)
          return get_json_result(data=created.to_json())
      except Exception as err:
          return server_error_response(err)
  @manager.route('/list', methods=['GET'])
  @login_required
  def list_docs():
      """Return one page of the documents in a knowledge base.

      Query parameters:
          kb_id: knowledge base to list (required).
          keywords: optional filter forwarded to ``DocumentService.get_by_kb_id``.
          page / page_size: page number and page size (defaults 1 and 15).
          orderby: column to sort by (default ``create_time``).
          desc: sort descending unless given as ``false``/``0`` (any case) or
              empty; descending when omitted.

      Returns a JSON result ``{"total": <count>, "docs": [...]}``.
      """
      kb_id = request.args.get("kb_id")
      if not kb_id:
          return get_json_result(
              data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)
      keywords = request.args.get("keywords", "")
      try:
          page_number = int(request.args.get("page", 1))
          items_per_page = int(request.args.get("page_size", 15))
      except ValueError:
          return get_json_result(
              data=False, retmsg='"page" and "page_size" must be integers.',
              retcode=RetCode.ARGUMENT_ERROR)
      orderby = request.args.get("orderby", "create_time")
      # Query-string values are always str, so passing the raw value would make
      # "false" truthy. Parse it explicitly; an empty value stays falsy.
      desc_arg = request.args.get("desc")
      desc = True if desc_arg is None else desc_arg.lower() not in ("", "false", "0")
      try:
          docs, tol = DocumentService.get_by_kb_id(
              kb_id, page_number, items_per_page, orderby, desc, keywords)
          return get_json_result(data={"total": tol, "docs": docs})
      except Exception as e:
          return server_error_response(e)
  @manager.route('/infos', methods=['POST'])
  @login_required
  @validate_request("doc_ids")
  def docinfos():
      """Return the stored rows for the documents listed in ``doc_ids``.

      JSON body: ``{"doc_ids": [<document id>, ...]}``. Requires a logged-in
      user; previously the endpoint answered unauthenticated requests.
      """
      # NOTE(review): there is still no per-document ownership check here —
      # any logged-in user can read any document's metadata by id; confirm.
      req = request.json
      docs = DocumentService.get_by_ids(req["doc_ids"])
      return get_json_result(data=list(docs.dicts()))
  @manager.route('/thumbnails', methods=['GET'])
  @login_required
  def thumbnails():
      """Map each requested document id to its stored thumbnail.

      Query parameter ``doc_ids`` is a comma-separated list of document ids;
      empty entries are ignored. Returns ``{doc_id: thumbnail, ...}``.
      """
      # Default to "" so a missing parameter does not call .split on None, and
      # drop empty entries: "".split(",") is [""], which is truthy and would
      # otherwise slip past the emptiness check below.
      doc_ids = [doc_id for doc_id in request.args.get("doc_ids", "").split(",") if doc_id]
      if not doc_ids:
          return get_json_result(
              data=False, retmsg='Lack of "Document ID"', retcode=RetCode.ARGUMENT_ERROR)

      try:
          docs = DocumentService.get_thumbnails(doc_ids)
          return get_json_result(data={d["id"]: d["thumbnail"] for d in docs})
      except Exception as e:
          return server_error_response(e)
  @manager.route('/change_status', methods=['POST'])
  @login_required
  @validate_request("doc_id", "status")
  def change_status():
      """Enable (status 1) or disable (status 0) a document.

      JSON body: ``doc_id`` and ``status`` (0/1 as int or str). Updates the
      document row's ``status`` and sets ``available_int`` to the same value on
      every chunk of the document in the tenant's Elasticsearch index.
      """
      req = request.json
      status = str(req["status"])
      if status not in ("0", "1"):
          # Must return: falling through would write the invalid status to the DB.
          return get_json_result(
              data=False,
              retmsg='"Status" must be either 0 or 1!',
              retcode=RetCode.ARGUMENT_ERROR)

      try:
          e, doc = DocumentService.get_by_id(req["doc_id"])
          if not e:
              return get_data_error_result(retmsg="Document not found!")
          e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
          if not e:
              return get_data_error_result(
                  retmsg="Can't find this knowledgebase!")

          if not DocumentService.update_by_id(
                  req["doc_id"], {"status": status}):
              return get_data_error_result(
                  retmsg="Database error (Document update)!")

          # status was validated to be "0" or "1" above, so inlining it is safe.
          ELASTICSEARCH.updateScriptByQuery(
              Q("term", doc_id=req["doc_id"]),
              scripts="ctx._source.available_int=%s;" % status,
              idxnm=search.index_name(kb.tenant_id))
          return get_json_result(data=True)
      except Exception as e:
          return server_error_response(e)
  @manager.route('/rm', methods=['POST'])
  @login_required
  @validate_request("doc_id")
  def rm():
      """Delete one document (``doc_id`` as str) or several (``doc_id`` as list).

      For each document this calls ``DocumentService.remove_document``, deletes
      its knowledge-base File entry (when one is linked) and its File2Document
      links, and removes the stored object from MinIO. A missing document or
      tenant, or a failed removal, aborts immediately with an error result.
      Other exceptions are collected per document and reported together, one
      per line, after all ids have been processed.
      """
      req = request.json
      doc_ids = req["doc_id"]
      if isinstance(doc_ids, str):
          doc_ids = [doc_ids]
      root_folder = FileService.get_root_folder(current_user.id)
      pf_id = root_folder["id"]
      FileService.init_knowledgebase_docs(pf_id, current_user.id)
      errors = []
      for doc_id in doc_ids:
          try:
              e, doc = DocumentService.get_by_id(doc_id)
              if not e:
                  return get_data_error_result(retmsg="Document not found!")
              tenant_id = DocumentService.get_tenant_id(doc_id)
              if not tenant_id:
                  return get_data_error_result(retmsg="Tenant not found!")

              # Kept before remove_document: the address lookup may depend on the
              # document row being present (confirm in File2DocumentService).
              b, n = File2DocumentService.get_minio_address(doc_id=doc_id)

              if not DocumentService.remove_document(doc, tenant_id):
                  return get_data_error_result(
                      retmsg="Database error (Document removal)!")

              f2d = File2DocumentService.get_by_document_id(doc_id)
              # Documents created without a File (e.g. via /create) have no
              # mapping; indexing f2d[0] would raise and skip the cleanup below.
              if f2d:
                  FileService.filter_delete(
                      [File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
              File2DocumentService.delete_by_document_id(doc_id)

              MINIO.rm(b, n)
          except Exception as e:
              errors.append(str(e))

      if errors:
          return get_json_result(data=False, retmsg="\n".join(errors), retcode=RetCode.SERVER_ERROR)

      return get_json_result(data=True)
  @manager.route('/run', methods=['POST'])
  @login_required
  @validate_request("doc_ids", "run")
  def run():
      """Start or stop parsing for a batch of documents.

      JSON body: ``doc_ids`` (list of document ids) and ``run`` (a ``TaskStatus``
      value). For every document the run state is stored and progress reset, and
      its existing chunks are deleted from the tenant's Elasticsearch index. When
      ``run`` equals ``TaskStatus.RUNNING`` the document's old tasks are deleted
      and new parsing tasks are queued.
      """
      req = request.json
      try:
          run_status = str(req["run"])
          start_parsing = run_status == TaskStatus.RUNNING.value
          for doc_id in req["doc_ids"]:
              info = {"run": run_status, "progress": 0}
              if start_parsing:
                  # Starting a (re)parse: clear counters left by any previous run.
                  info["progress_msg"] = ""
                  info["chunk_num"] = 0
                  info["token_num"] = 0
              DocumentService.update_by_id(doc_id, info)

              tenant_id = DocumentService.get_tenant_id(doc_id)
              if not tenant_id:
                  return get_data_error_result(retmsg="Tenant not found!")
              ELASTICSEARCH.deleteByQuery(
                  Q("match", doc_id=doc_id), idxnm=search.index_name(tenant_id))

              if start_parsing:
                  TaskService.filter_delete([Task.doc_id == doc_id])
                  e, doc = DocumentService.get_by_id(doc_id)
                  if not e:
                      return get_data_error_result(retmsg="Document not found!")
                  doc = doc.to_dict()
                  doc["tenant_id"] = tenant_id
                  bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"])
                  queue_tasks(doc, bucket, name)

          return get_json_result(data=True)
      except Exception as e:
          return server_error_response(e)
  @manager.route('/rename', methods=['POST'])
  @login_required
  @validate_request("doc_id", "name")
  def rename():
      """Rename a document and, if one is linked, its File entry.

      JSON body: ``doc_id`` and ``name``. The file extension must stay the same
      (compared case-insensitively), and no other document in the same
      knowledge base may already carry exactly that name.
      """
      req = request.json
      try:
          e, doc = DocumentService.get_by_id(req["doc_id"])
          if not e:
              return get_data_error_result(retmsg="Document not found!")
          if pathlib.Path(req["name"].lower()).suffix != pathlib.Path(
                  doc.name.lower()).suffix:
              return get_json_result(
                  data=False,
                  retmsg="The extension of file can't be changed",
                  retcode=RetCode.ARGUMENT_ERROR)
          # Compare names exactly in Python — presumably because the DB lookup
          # may match case-insensitively (confirm).
          if any(d.name == req["name"]
                 for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id)):
              return get_data_error_result(
                  retmsg="Duplicated document name in the same knowledgebase.")

          if not DocumentService.update_by_id(
                  req["doc_id"], {"name": req["name"]}):
              return get_data_error_result(
                  retmsg="Database error (Document rename)!")

          informs = File2DocumentService.get_by_document_id(req["doc_id"])
          if informs:
              e, file = FileService.get_by_id(informs[0].file_id)
              # Guard the lookup: a dangling mapping would otherwise raise on
              # `file.id` after the document row has already been renamed.
              if e:
                  FileService.update_by_id(file.id, {"name": req["name"]})

          return get_json_result(data=True)
      except Exception as e:
          return server_error_response(e)
  @manager.route('/get/<doc_id>', methods=['GET'])
  # @login_required
  def get(doc_id):
      """Return a document's stored bytes from MinIO.

      The Content-Type is built from the file-name extension: ``image/<ext>``
      for visual documents, ``application/<ext>`` for everything else. When the
      name has no extension, the response's Content-Type is left untouched.
      """
      try:
          found, doc = DocumentService.get_by_id(doc_id)
          if not found:
              return get_data_error_result(retmsg="Document not found!")

          bucket, obj_name = File2DocumentService.get_minio_address(doc_id=doc_id)
          response = flask.make_response(MINIO.get(bucket, obj_name))

          match = re.search(r"\.([^.]+)$", doc.name)
          if match is None:
              return response
          major = 'image' if doc.type == FileType.VISUAL.value else 'application'
          response.headers.set('Content-Type', '%s/%s' % (major, match.group(1)))
          return response
      except Exception as e:
          return server_error_response(e)
  @manager.route('/change_parser', methods=['POST'])
  @login_required
  @validate_request("doc_id", "parser_id")
  def change_parser():
      """Switch a document to another parser and, optionally, parser config.

      JSON body: ``doc_id``, ``parser_id`` and optional ``parser_config``.
      Nothing happens when the parser id (case-insensitive) and the supplied
      config are unchanged. Visual documents and ppt/pptx/pages files are
      rejected. Otherwise the document is reset to the un-started state; if it
      had tokens (``token_num > 0``) its token/chunk/duration totals are
      subtracted and its chunks are deleted from the tenant's ES index.
      """
      req = request.json
      try:
          found, doc = DocumentService.get_by_id(req["doc_id"])
          if not found:
              return get_data_error_result(retmsg="Document not found!")

          unchanged = doc.parser_id.lower() == req["parser_id"].lower() and (
              "parser_config" not in req or req["parser_config"] == doc.parser_config)
          if unchanged:
              return get_json_result(data=True)

          if doc.type == FileType.VISUAL or re.search(r"\.(ppt|pptx|pages)$", doc.name):
              return get_data_error_result(retmsg="Not supported yet!")

          reset_fields = {"parser_id": req["parser_id"], "progress": 0, "progress_msg": "",
                          "run": TaskStatus.UNSTART.value}
          if not DocumentService.update_by_id(doc.id, reset_fields):
              return get_data_error_result(retmsg="Document not found!")
          if "parser_config" in req:
              DocumentService.update_parser_config(doc.id, req["parser_config"])

          if doc.token_num <= 0:
              return get_json_result(data=True)

          # The document had been parsed: back out its totals and drop its chunks.
          if not DocumentService.increment_chunk_num(
                  doc.id, doc.kb_id, -doc.token_num, -doc.chunk_num, -doc.process_duation):
              return get_data_error_result(retmsg="Document not found!")
          tenant_id = DocumentService.get_tenant_id(req["doc_id"])
          if not tenant_id:
              return get_data_error_result(retmsg="Tenant not found!")
          ELASTICSEARCH.deleteByQuery(
              Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id))
          return get_json_result(data=True)
      except Exception as e:
          return server_error_response(e)
  @manager.route('/image/<image_id>', methods=['GET'])
  # @login_required
  def get_image(image_id):
      """Serve an image stored in MinIO with a JPEG Content-Type.

      ``image_id`` must be ``<bucket>-<object name>`` with exactly one ``-``
      (the ``img_id`` format built in ``upload_and_parse``). Any other shape, or
      a storage failure, yields an error response.
      """
      try:
          bucket, object_name = image_id.split("-")
          image_bytes = MINIO.get(bucket, object_name)
          response = flask.make_response(image_bytes)
          response.headers.set('Content-Type', 'image/JPEG')
          return response
      except Exception as e:
          return server_error_response(e)
  @manager.route('/upload_and_parse', methods=['POST'])
  @login_required
  @validate_request("conversation_id")
  def upload_and_parse():
      """Upload files into a conversation's knowledge base and index them at once.

      Form fields: ``conversation_id`` plus one or more ``file`` parts. The steps are:
      - Store the files with ``FileService.upload_document`` in the first
        knowledge base of the conversation's dialog.
      - Chunk them in a thread pool. The chunker is picked by each document's
        ``parser_id``, falling back to ``naive``.
      - Embed the chunks and bulk-index them into the tenant's Elasticsearch index.
      - Store chunk images in MinIO under the KB id.

      Returns a JSON result whose data is the list of new document ids.

      Raises:
          LookupError: if the dialog's knowledge base does not exist.
      """
      from rag.app import presentation, picture, naive, audio, email

      if 'file' not in request.files:
          return get_json_result(
              data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)

      file_objs = request.files.getlist('file')
      for file_obj in file_objs:
          if file_obj.filename == '':
              return get_json_result(
                  data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)

      e, conv = ConversationService.get_by_id(request.form.get("conversation_id"))
      if not e:
          return get_data_error_result(retmsg="Conversation not found!")
      e, dia = DialogService.get_by_id(conv.dialog_id)
      if not e:
          return get_data_error_result(retmsg="Dialog not found!")
      if not dia.kb_ids:
          return get_data_error_result(retmsg="No knowledgebase is linked to this dialog!")
      kb_id = dia.kb_ids[0]
      e, kb = KnowledgebaseService.get_by_id(kb_id)
      if not e:
          raise LookupError("Can't find this knowledgebase!")

      idxnm = search.index_name(kb.tenant_id)
      if not ELASTICSEARCH.indexExist(idxnm):
          mapping_path = os.path.join(get_project_base_directory(), "conf", "mapping.json")
          # Close the mapping file instead of leaking the handle.
          with open(mapping_path, "r") as mapping_file:
              ELASTICSEARCH.createIdx(idxnm, json.load(mapping_file))

      embd_mdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language)

      err, files = FileService.upload_document(kb, file_objs)
      if err:
          return get_json_result(
              data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)

      def dummy(prog=None, msg=""):
          # No-op progress callback handed to the chunkers as `callback`.
          pass

      FACTORY = {
          ParserType.PRESENTATION.value: presentation,
          ParserType.PICTURE.value: picture,
          ParserType.AUDIO.value: audio,
          ParserType.EMAIL.value: email
      }
      parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?;。;!?", "layout_recognize": False}

      # Chunks of ALL uploaded files. Creating this list inside the per-file loop
      # would keep only the last file's chunks, silently dropping the others.
      docs = []
      # The context manager shuts the pool down (joining its workers) on exit.
      with ThreadPoolExecutor(max_workers=12) as exe:
          threads = []
          for d, blob in files:
              kwargs = {
                  "callback": dummy,
                  "parser_config": parser_config,
                  "from_page": 0,
                  "to_page": 100000,
                  "tenant_id": kb.tenant_id,
                  "lang": kb.language
              }
              threads.append(exe.submit(FACTORY.get(d["parser_id"], naive).chunk, d["name"], blob, **kwargs))

          for (docinfo, _), th in zip(files, threads):
              doc = {
                  "doc_id": docinfo["id"],
                  "kb_id": [kb.id]
              }
              for ck in th.result():
                  d = deepcopy(doc)
                  d.update(ck)
                  md5 = hashlib.md5()
                  md5.update((ck["content_with_weight"] +
                              str(d["doc_id"])).encode("utf-8"))
                  d["_id"] = md5.hexdigest()
                  # One clock reading so both timestamp fields agree.
                  now = datetime.datetime.now()
                  d["create_time"] = str(now).replace("T", " ")[:19]
                  d["create_timestamp_flt"] = now.timestamp()
                  if d.get("image"):
                      if isinstance(d["image"], bytes):
                          image_bytes = d["image"]
                      else:
                          output_buffer = BytesIO()
                          d["image"].save(output_buffer, format='JPEG')
                          image_bytes = output_buffer.getvalue()
                      MINIO.put(kb.id, d["_id"], image_bytes)
                      d["img_id"] = "{}-{}".format(kb.id, d["_id"])
                      del d["image"]
                  docs.append(d)

      parser_ids = {d["id"]: d["parser_id"] for d, _ in files}
      docids = [d["id"] for d, _ in files]
      chunk_counts = {doc_id: 0 for doc_id in docids}
      token_counts = {doc_id: 0 for doc_id in docids}
      es_bulk_size = 64

      def embedding(doc_id, cnts, batch_size=16):
          """Embed `cnts` in batches; add the chunk and token counts to `doc_id`'s totals."""
          vects = []
          for i in range(0, len(cnts), batch_size):
              batch = cnts[i: i + batch_size]
              vts, c = embd_mdl.encode(batch)
              vects.extend(vts.tolist())
              chunk_counts[doc_id] += len(batch)
              token_counts[doc_id] += c
          return vects

      _, tenant = TenantService.get_by_id(kb.tenant_id)
      llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, tenant.llm_id)
      for doc_id in docids:
          cks = [c for c in docs if c["doc_id"] == doc_id]

          # Mind-map generation is currently switched off by the leading `False`.
          if False and parser_ids[doc_id] != ParserType.PICTURE.value:
              mindmap = MindMapExtractor(llm_bdl)
              try:
                  mind_map = json.dumps(mindmap([c["content_with_weight"] for c in cks]).output,
                                        ensure_ascii=False, indent=2)
                  if len(mind_map) < 32:
                      raise Exception("Few content: " + mind_map)
                  cks.append({
                      "doc_id": doc_id,
                      "kb_id": [kb.id],
                      "content_with_weight": mind_map,
                      "knowledge_graph_kwd": "mind_map"
                  })
              except Exception:
                  stat_logger.error("Mind map generation error: %s", traceback.format_exc())

          vects = embedding(doc_id, [c["content_with_weight"] for c in cks])
          assert len(cks) == len(vects)
          for ck, vect in zip(cks, vects):
              ck["q_%d_vec" % len(vect)] = vect
          for b in range(0, len(cks), es_bulk_size):
              ELASTICSEARCH.bulk(cks[b:b + es_bulk_size], idxnm)

          DocumentService.increment_chunk_num(
              doc_id, kb.id, token_counts[doc_id], chunk_counts[doc_id], 0)

      return get_json_result(data=docids)