Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.

document_app.py 18KB


  1. #
  2. # Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License
  15. #
  16. import datetime
  17. import hashlib
  18. import json
  19. import os
  20. import pathlib
  21. import re
  22. import traceback
  23. from concurrent.futures import ThreadPoolExecutor
  24. from copy import deepcopy
  25. from io import BytesIO
  26. import flask
  27. from elasticsearch_dsl import Q
  28. from flask import request
  29. from flask_login import login_required, current_user
  30. from api.db.db_models import Task, File
  31. from api.db.services.dialog_service import DialogService, ConversationService
  32. from api.db.services.file2document_service import File2DocumentService
  33. from api.db.services.file_service import FileService
  34. from api.db.services.llm_service import LLMBundle
  35. from api.db.services.task_service import TaskService, queue_tasks
  36. from api.db.services.user_service import TenantService, UserTenantService
  37. from graphrag.mind_map_extractor import MindMapExtractor
  38. from rag.app import naive
  39. from rag.nlp import search
  40. from rag.utils.es_conn import ELASTICSEARCH
  41. from api.db.services import duplicate_name
  42. from api.db.services.knowledgebase_service import KnowledgebaseService
  43. from api.utils.api_utils import server_error_response, get_data_error_result, validate_request
  44. from api.utils import get_uuid
  45. from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
  46. from api.db.services.document_service import DocumentService, doc_upload_and_parse
  47. from api.settings import RetCode, stat_logger
  48. from api.utils.api_utils import get_json_result
  49. from rag.utils.minio_conn import MINIO
  50. from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
  51. from api.utils.web_utils import html2pdf, is_valid_url
  52. @manager.route('/upload', methods=['POST'])
  53. @login_required
  54. @validate_request("kb_id")
  55. def upload():
  56. kb_id = request.form.get("kb_id")
  57. if not kb_id:
  58. return get_json_result(
  59. data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)
  60. if 'file' not in request.files:
  61. return get_json_result(
  62. data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)
  63. file_objs = request.files.getlist('file')
  64. for file_obj in file_objs:
  65. if file_obj.filename == '':
  66. return get_json_result(
  67. data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)
  68. e, kb = KnowledgebaseService.get_by_id(kb_id)
  69. if not e:
  70. raise LookupError("Can't find this knowledgebase!")
  71. err, _ = FileService.upload_document(kb, file_objs, current_user.id)
  72. if err:
  73. return get_json_result(
  74. data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
  75. return get_json_result(data=True)
  76. @manager.route('/web_crawl', methods=['POST'])
  77. @login_required
  78. @validate_request("kb_id", "name", "url")
  79. def web_crawl():
  80. kb_id = request.form.get("kb_id")
  81. if not kb_id:
  82. return get_json_result(
  83. data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)
  84. name = request.form.get("name")
  85. url = request.form.get("url")
  86. if not is_valid_url(url):
  87. return get_json_result(
  88. data=False, retmsg='The URL format is invalid', retcode=RetCode.ARGUMENT_ERROR)
  89. e, kb = KnowledgebaseService.get_by_id(kb_id)
  90. if not e:
  91. raise LookupError("Can't find this knowledgebase!")
  92. blob = html2pdf(url)
  93. if not blob: return server_error_response(ValueError("Download failure."))
  94. root_folder = FileService.get_root_folder(current_user.id)
  95. pf_id = root_folder["id"]
  96. FileService.init_knowledgebase_docs(pf_id, current_user.id)
  97. kb_root_folder = FileService.get_kb_folder(current_user.id)
  98. kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])
  99. try:
  100. filename = duplicate_name(
  101. DocumentService.query,
  102. name=name + ".pdf",
  103. kb_id=kb.id)
  104. filetype = filename_type(filename)
  105. if filetype == FileType.OTHER.value:
  106. raise RuntimeError("This type of file has not been supported yet!")
  107. location = filename
  108. while MINIO.obj_exist(kb_id, location):
  109. location += "_"
  110. MINIO.put(kb_id, location, blob)
  111. doc = {
  112. "id": get_uuid(),
  113. "kb_id": kb.id,
  114. "parser_id": kb.parser_id,
  115. "parser_config": kb.parser_config,
  116. "created_by": current_user.id,
  117. "type": filetype,
  118. "name": filename,
  119. "location": location,
  120. "size": len(blob),
  121. "thumbnail": thumbnail(filename, blob)
  122. }
  123. if doc["type"] == FileType.VISUAL:
  124. doc["parser_id"] = ParserType.PICTURE.value
  125. if doc["type"] == FileType.AURAL:
  126. doc["parser_id"] = ParserType.AUDIO.value
  127. if re.search(r"\.(ppt|pptx|pages)$", filename):
  128. doc["parser_id"] = ParserType.PRESENTATION.value
  129. DocumentService.insert(doc)
  130. FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
  131. except Exception as e:
  132. return server_error_response(e)
  133. return get_json_result(data=True)
  134. @manager.route('/create', methods=['POST'])
  135. @login_required
  136. @validate_request("name", "kb_id")
  137. def create():
  138. req = request.json
  139. kb_id = req["kb_id"]
  140. if not kb_id:
  141. return get_json_result(
  142. data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)
  143. try:
  144. e, kb = KnowledgebaseService.get_by_id(kb_id)
  145. if not e:
  146. return get_data_error_result(
  147. retmsg="Can't find this knowledgebase!")
  148. if DocumentService.query(name=req["name"], kb_id=kb_id):
  149. return get_data_error_result(
  150. retmsg="Duplicated document name in the same knowledgebase.")
  151. doc = DocumentService.insert({
  152. "id": get_uuid(),
  153. "kb_id": kb.id,
  154. "parser_id": kb.parser_id,
  155. "parser_config": kb.parser_config,
  156. "created_by": current_user.id,
  157. "type": FileType.VIRTUAL,
  158. "name": req["name"],
  159. "location": "",
  160. "size": 0
  161. })
  162. return get_json_result(data=doc.to_json())
  163. except Exception as e:
  164. return server_error_response(e)
  165. @manager.route('/list', methods=['GET'])
  166. @login_required
  167. def list_docs():
  168. kb_id = request.args.get("kb_id")
  169. if not kb_id:
  170. return get_json_result(
  171. data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)
  172. tenants = UserTenantService.query(user_id=current_user.id)
  173. for tenant in tenants:
  174. if KnowledgebaseService.query(
  175. tenant_id=tenant.tenant_id, id=kb_id):
  176. break
  177. else:
  178. return get_json_result(
  179. data=False, retmsg=f'Only owner of knowledgebase authorized for this operation.',
  180. retcode=RetCode.OPERATING_ERROR)
  181. keywords = request.args.get("keywords", "")
  182. page_number = int(request.args.get("page", 1))
  183. items_per_page = int(request.args.get("page_size", 15))
  184. orderby = request.args.get("orderby", "create_time")
  185. desc = request.args.get("desc", True)
  186. try:
  187. docs, tol = DocumentService.get_by_kb_id(
  188. kb_id, page_number, items_per_page, orderby, desc, keywords)
  189. return get_json_result(data={"total": tol, "docs": docs})
  190. except Exception as e:
  191. return server_error_response(e)
  192. @manager.route('/infos', methods=['POST'])
  193. def docinfos():
  194. req = request.json
  195. doc_ids = req["doc_ids"]
  196. docs = DocumentService.get_by_ids(doc_ids)
  197. return get_json_result(data=list(docs.dicts()))
  198. @manager.route('/thumbnails', methods=['GET'])
  199. #@login_required
  200. def thumbnails():
  201. doc_ids = request.args.get("doc_ids").split(",")
  202. if not doc_ids:
  203. return get_json_result(
  204. data=False, retmsg='Lack of "Document ID"', retcode=RetCode.ARGUMENT_ERROR)
  205. try:
  206. docs = DocumentService.get_thumbnails(doc_ids)
  207. return get_json_result(data={d["id"]: d["thumbnail"] for d in docs})
  208. except Exception as e:
  209. return server_error_response(e)
  210. @manager.route('/change_status', methods=['POST'])
  211. @login_required
  212. @validate_request("doc_id", "status")
  213. def change_status():
  214. req = request.json
  215. if str(req["status"]) not in ["0", "1"]:
  216. get_json_result(
  217. data=False,
  218. retmsg='"Status" must be either 0 or 1!',
  219. retcode=RetCode.ARGUMENT_ERROR)
  220. try:
  221. e, doc = DocumentService.get_by_id(req["doc_id"])
  222. if not e:
  223. return get_data_error_result(retmsg="Document not found!")
  224. e, kb = KnowledgebaseService.get_by_id(doc.kb_id)
  225. if not e:
  226. return get_data_error_result(
  227. retmsg="Can't find this knowledgebase!")
  228. if not DocumentService.update_by_id(
  229. req["doc_id"], {"status": str(req["status"])}):
  230. return get_data_error_result(
  231. retmsg="Database error (Document update)!")
  232. if str(req["status"]) == "0":
  233. ELASTICSEARCH.updateScriptByQuery(Q("term", doc_id=req["doc_id"]),
  234. scripts="ctx._source.available_int=0;",
  235. idxnm=search.index_name(
  236. kb.tenant_id)
  237. )
  238. else:
  239. ELASTICSEARCH.updateScriptByQuery(Q("term", doc_id=req["doc_id"]),
  240. scripts="ctx._source.available_int=1;",
  241. idxnm=search.index_name(
  242. kb.tenant_id)
  243. )
  244. return get_json_result(data=True)
  245. except Exception as e:
  246. return server_error_response(e)
  247. @manager.route('/rm', methods=['POST'])
  248. @login_required
  249. @validate_request("doc_id")
  250. def rm():
  251. req = request.json
  252. doc_ids = req["doc_id"]
  253. if isinstance(doc_ids, str): doc_ids = [doc_ids]
  254. root_folder = FileService.get_root_folder(current_user.id)
  255. pf_id = root_folder["id"]
  256. FileService.init_knowledgebase_docs(pf_id, current_user.id)
  257. errors = ""
  258. for doc_id in doc_ids:
  259. try:
  260. e, doc = DocumentService.get_by_id(doc_id)
  261. if not e:
  262. return get_data_error_result(retmsg="Document not found!")
  263. tenant_id = DocumentService.get_tenant_id(doc_id)
  264. if not tenant_id:
  265. return get_data_error_result(retmsg="Tenant not found!")
  266. b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
  267. if not DocumentService.remove_document(doc, tenant_id):
  268. return get_data_error_result(
  269. retmsg="Database error (Document removal)!")
  270. f2d = File2DocumentService.get_by_document_id(doc_id)
  271. FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
  272. File2DocumentService.delete_by_document_id(doc_id)
  273. MINIO.rm(b, n)
  274. except Exception as e:
  275. errors += str(e)
  276. if errors:
  277. return get_json_result(data=False, retmsg=errors, retcode=RetCode.SERVER_ERROR)
  278. return get_json_result(data=True)
  279. @manager.route('/run', methods=['POST'])
  280. @login_required
  281. @validate_request("doc_ids", "run")
  282. def run():
  283. req = request.json
  284. try:
  285. for id in req["doc_ids"]:
  286. info = {"run": str(req["run"]), "progress": 0}
  287. if str(req["run"]) == TaskStatus.RUNNING.value:
  288. info["progress_msg"] = ""
  289. info["chunk_num"] = 0
  290. info["token_num"] = 0
  291. DocumentService.update_by_id(id, info)
  292. # if str(req["run"]) == TaskStatus.CANCEL.value:
  293. tenant_id = DocumentService.get_tenant_id(id)
  294. if not tenant_id:
  295. return get_data_error_result(retmsg="Tenant not found!")
  296. ELASTICSEARCH.deleteByQuery(
  297. Q("match", doc_id=id), idxnm=search.index_name(tenant_id))
  298. if str(req["run"]) == TaskStatus.RUNNING.value:
  299. TaskService.filter_delete([Task.doc_id == id])
  300. e, doc = DocumentService.get_by_id(id)
  301. doc = doc.to_dict()
  302. doc["tenant_id"] = tenant_id
  303. bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"])
  304. queue_tasks(doc, bucket, name)
  305. return get_json_result(data=True)
  306. except Exception as e:
  307. return server_error_response(e)
  308. @manager.route('/rename', methods=['POST'])
  309. @login_required
  310. @validate_request("doc_id", "name")
  311. def rename():
  312. req = request.json
  313. try:
  314. e, doc = DocumentService.get_by_id(req["doc_id"])
  315. if not e:
  316. return get_data_error_result(retmsg="Document not found!")
  317. if pathlib.Path(req["name"].lower()).suffix != pathlib.Path(
  318. doc.name.lower()).suffix:
  319. return get_json_result(
  320. data=False,
  321. retmsg="The extension of file can't be changed",
  322. retcode=RetCode.ARGUMENT_ERROR)
  323. for d in DocumentService.query(name=req["name"], kb_id=doc.kb_id):
  324. if d.name == req["name"]:
  325. return get_data_error_result(
  326. retmsg="Duplicated document name in the same knowledgebase.")
  327. if not DocumentService.update_by_id(
  328. req["doc_id"], {"name": req["name"]}):
  329. return get_data_error_result(
  330. retmsg="Database error (Document rename)!")
  331. informs = File2DocumentService.get_by_document_id(req["doc_id"])
  332. if informs:
  333. e, file = FileService.get_by_id(informs[0].file_id)
  334. FileService.update_by_id(file.id, {"name": req["name"]})
  335. return get_json_result(data=True)
  336. except Exception as e:
  337. return server_error_response(e)
  338. @manager.route('/get/<doc_id>', methods=['GET'])
  339. # @login_required
  340. def get(doc_id):
  341. try:
  342. e, doc = DocumentService.get_by_id(doc_id)
  343. if not e:
  344. return get_data_error_result(retmsg="Document not found!")
  345. b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
  346. response = flask.make_response(MINIO.get(b, n))
  347. ext = re.search(r"\.([^.]+)$", doc.name)
  348. if ext:
  349. if doc.type == FileType.VISUAL.value:
  350. response.headers.set('Content-Type', 'image/%s' % ext.group(1))
  351. else:
  352. response.headers.set(
  353. 'Content-Type',
  354. 'application/%s' %
  355. ext.group(1))
  356. return response
  357. except Exception as e:
  358. return server_error_response(e)
  359. @manager.route('/change_parser', methods=['POST'])
  360. @login_required
  361. @validate_request("doc_id", "parser_id")
  362. def change_parser():
  363. req = request.json
  364. try:
  365. e, doc = DocumentService.get_by_id(req["doc_id"])
  366. if not e:
  367. return get_data_error_result(retmsg="Document not found!")
  368. if doc.parser_id.lower() == req["parser_id"].lower():
  369. if "parser_config" in req:
  370. if req["parser_config"] == doc.parser_config:
  371. return get_json_result(data=True)
  372. else:
  373. return get_json_result(data=True)
  374. if doc.type == FileType.VISUAL or re.search(
  375. r"\.(ppt|pptx|pages)$", doc.name):
  376. return get_data_error_result(retmsg="Not supported yet!")
  377. e = DocumentService.update_by_id(doc.id,
  378. {"parser_id": req["parser_id"], "progress": 0, "progress_msg": "",
  379. "run": TaskStatus.UNSTART.value})
  380. if not e:
  381. return get_data_error_result(retmsg="Document not found!")
  382. if "parser_config" in req:
  383. DocumentService.update_parser_config(doc.id, req["parser_config"])
  384. if doc.token_num > 0:
  385. e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num * -1, doc.chunk_num * -1,
  386. doc.process_duation * -1)
  387. if not e:
  388. return get_data_error_result(retmsg="Document not found!")
  389. tenant_id = DocumentService.get_tenant_id(req["doc_id"])
  390. if not tenant_id:
  391. return get_data_error_result(retmsg="Tenant not found!")
  392. ELASTICSEARCH.deleteByQuery(
  393. Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id))
  394. return get_json_result(data=True)
  395. except Exception as e:
  396. return server_error_response(e)
  397. @manager.route('/image/<image_id>', methods=['GET'])
  398. # @login_required
  399. def get_image(image_id):
  400. try:
  401. bkt, nm = image_id.split("-")
  402. response = flask.make_response(MINIO.get(bkt, nm))
  403. response.headers.set('Content-Type', 'image/JPEG')
  404. return response
  405. except Exception as e:
  406. return server_error_response(e)
  407. @manager.route('/upload_and_parse', methods=['POST'])
  408. @login_required
  409. @validate_request("conversation_id")
  410. def upload_and_parse():
  411. if 'file' not in request.files:
  412. return get_json_result(
  413. data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)
  414. file_objs = request.files.getlist('file')
  415. for file_obj in file_objs:
  416. if file_obj.filename == '':
  417. return get_json_result(
  418. data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)
  419. doc_ids = doc_upload_and_parse(request.form.get("conversation_id"), file_objs, current_user.id)
  420. return get_json_result(data=doc_ids)