You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

metadata_service.py 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. import copy
  2. import datetime
  3. import logging
  4. from typing import Optional
  5. from flask_login import current_user
  6. from core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSource
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding
  10. from services.dataset_service import DocumentService
  11. from services.entities.knowledge_entities.knowledge_entities import (
  12. MetadataArgs,
  13. MetadataOperationData,
  14. )
  15. class MetadataService:
  16. @staticmethod
  17. def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata:
  18. # check if metadata name is too long
  19. if len(metadata_args.name) > 255:
  20. raise ValueError("Metadata name cannot exceed 255 characters.")
  21. # check if metadata name already exists
  22. if (
  23. db.session.query(DatasetMetadata)
  24. .filter_by(tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, name=metadata_args.name)
  25. .first()
  26. ):
  27. raise ValueError("Metadata name already exists.")
  28. for field in BuiltInField:
  29. if field.value == metadata_args.name:
  30. raise ValueError("Metadata name already exists in Built-in fields.")
  31. metadata = DatasetMetadata(
  32. tenant_id=current_user.current_tenant_id,
  33. dataset_id=dataset_id,
  34. type=metadata_args.type,
  35. name=metadata_args.name,
  36. created_by=current_user.id,
  37. )
  38. db.session.add(metadata)
  39. db.session.commit()
  40. return metadata
  41. @staticmethod
  42. def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata: # type: ignore
  43. # check if metadata name is too long
  44. if len(name) > 255:
  45. raise ValueError("Metadata name cannot exceed 255 characters.")
  46. lock_key = f"dataset_metadata_lock_{dataset_id}"
  47. # check if metadata name already exists
  48. if (
  49. db.session.query(DatasetMetadata)
  50. .filter_by(tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, name=name)
  51. .first()
  52. ):
  53. raise ValueError("Metadata name already exists.")
  54. for field in BuiltInField:
  55. if field.value == name:
  56. raise ValueError("Metadata name already exists in Built-in fields.")
  57. try:
  58. MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
  59. metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
  60. if metadata is None:
  61. raise ValueError("Metadata not found.")
  62. old_name = metadata.name
  63. metadata.name = name
  64. metadata.updated_by = current_user.id
  65. metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
  66. # update related documents
  67. dataset_metadata_bindings = (
  68. db.session.query(DatasetMetadataBinding).filter_by(metadata_id=metadata_id).all()
  69. )
  70. if dataset_metadata_bindings:
  71. document_ids = [binding.document_id for binding in dataset_metadata_bindings]
  72. documents = DocumentService.get_document_by_ids(document_ids)
  73. for document in documents:
  74. doc_metadata = copy.deepcopy(document.doc_metadata)
  75. value = doc_metadata.pop(old_name, None)
  76. doc_metadata[name] = value
  77. document.doc_metadata = doc_metadata
  78. db.session.add(document)
  79. db.session.commit()
  80. return metadata # type: ignore
  81. except Exception:
  82. logging.exception("Update metadata name failed")
  83. finally:
  84. redis_client.delete(lock_key)
  85. @staticmethod
  86. def delete_metadata(dataset_id: str, metadata_id: str):
  87. lock_key = f"dataset_metadata_lock_{dataset_id}"
  88. try:
  89. MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
  90. metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
  91. if metadata is None:
  92. raise ValueError("Metadata not found.")
  93. db.session.delete(metadata)
  94. # deal related documents
  95. dataset_metadata_bindings = (
  96. db.session.query(DatasetMetadataBinding).filter_by(metadata_id=metadata_id).all()
  97. )
  98. if dataset_metadata_bindings:
  99. document_ids = [binding.document_id for binding in dataset_metadata_bindings]
  100. documents = DocumentService.get_document_by_ids(document_ids)
  101. for document in documents:
  102. doc_metadata = copy.deepcopy(document.doc_metadata)
  103. doc_metadata.pop(metadata.name, None)
  104. document.doc_metadata = doc_metadata
  105. db.session.add(document)
  106. db.session.commit()
  107. return metadata
  108. except Exception:
  109. logging.exception("Delete metadata failed")
  110. finally:
  111. redis_client.delete(lock_key)
  112. @staticmethod
  113. def get_built_in_fields():
  114. return [
  115. {"name": BuiltInField.document_name.value, "type": "string"},
  116. {"name": BuiltInField.uploader.value, "type": "string"},
  117. {"name": BuiltInField.upload_date.value, "type": "time"},
  118. {"name": BuiltInField.last_update_date.value, "type": "time"},
  119. {"name": BuiltInField.source.value, "type": "string"},
  120. ]
  121. @staticmethod
  122. def enable_built_in_field(dataset: Dataset):
  123. if dataset.built_in_field_enabled:
  124. return
  125. lock_key = f"dataset_metadata_lock_{dataset.id}"
  126. try:
  127. MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
  128. dataset.built_in_field_enabled = True
  129. db.session.add(dataset)
  130. documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
  131. if documents:
  132. for document in documents:
  133. if not document.doc_metadata:
  134. doc_metadata = {}
  135. else:
  136. doc_metadata = copy.deepcopy(document.doc_metadata)
  137. doc_metadata[BuiltInField.document_name.value] = document.name
  138. doc_metadata[BuiltInField.uploader.value] = document.uploader
  139. doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()
  140. doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()
  141. doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value
  142. document.doc_metadata = doc_metadata
  143. db.session.add(document)
  144. db.session.commit()
  145. except Exception:
  146. logging.exception("Enable built-in field failed")
  147. finally:
  148. redis_client.delete(lock_key)
  149. @staticmethod
  150. def disable_built_in_field(dataset: Dataset):
  151. if not dataset.built_in_field_enabled:
  152. return
  153. lock_key = f"dataset_metadata_lock_{dataset.id}"
  154. try:
  155. MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)
  156. dataset.built_in_field_enabled = False
  157. db.session.add(dataset)
  158. documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)
  159. document_ids = []
  160. if documents:
  161. for document in documents:
  162. doc_metadata = copy.deepcopy(document.doc_metadata)
  163. doc_metadata.pop(BuiltInField.document_name.value, None)
  164. doc_metadata.pop(BuiltInField.uploader.value, None)
  165. doc_metadata.pop(BuiltInField.upload_date.value, None)
  166. doc_metadata.pop(BuiltInField.last_update_date.value, None)
  167. doc_metadata.pop(BuiltInField.source.value, None)
  168. document.doc_metadata = doc_metadata
  169. db.session.add(document)
  170. document_ids.append(document.id)
  171. db.session.commit()
  172. except Exception:
  173. logging.exception("Disable built-in field failed")
  174. finally:
  175. redis_client.delete(lock_key)
  176. @staticmethod
  177. def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):
  178. for operation in metadata_args.operation_data:
  179. lock_key = f"document_metadata_lock_{operation.document_id}"
  180. try:
  181. MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)
  182. document = DocumentService.get_document(dataset.id, operation.document_id)
  183. if document is None:
  184. raise ValueError("Document not found.")
  185. doc_metadata = {}
  186. for metadata_value in operation.metadata_list:
  187. doc_metadata[metadata_value.name] = metadata_value.value
  188. if dataset.built_in_field_enabled:
  189. doc_metadata[BuiltInField.document_name.value] = document.name
  190. doc_metadata[BuiltInField.uploader.value] = document.uploader
  191. doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()
  192. doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()
  193. doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value
  194. document.doc_metadata = doc_metadata
  195. db.session.add(document)
  196. db.session.commit()
  197. # deal metadata binding
  198. db.session.query(DatasetMetadataBinding).filter_by(document_id=operation.document_id).delete()
  199. for metadata_value in operation.metadata_list:
  200. dataset_metadata_binding = DatasetMetadataBinding(
  201. tenant_id=current_user.current_tenant_id,
  202. dataset_id=dataset.id,
  203. document_id=operation.document_id,
  204. metadata_id=metadata_value.id,
  205. created_by=current_user.id,
  206. )
  207. db.session.add(dataset_metadata_binding)
  208. db.session.commit()
  209. except Exception:
  210. logging.exception("Update documents metadata failed")
  211. finally:
  212. redis_client.delete(lock_key)
  213. @staticmethod
  214. def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]):
  215. if dataset_id:
  216. lock_key = f"dataset_metadata_lock_{dataset_id}"
  217. if redis_client.get(lock_key):
  218. raise ValueError("Another knowledge base metadata operation is running, please wait a moment.")
  219. redis_client.set(lock_key, 1, ex=3600)
  220. if document_id:
  221. lock_key = f"document_metadata_lock_{document_id}"
  222. if redis_client.get(lock_key):
  223. raise ValueError("Another document metadata operation is running, please wait a moment.")
  224. redis_client.set(lock_key, 1, ex=3600)
  225. @staticmethod
  226. def get_dataset_metadatas(dataset: Dataset):
  227. return {
  228. "doc_metadata": [
  229. {
  230. "id": item.get("id"),
  231. "name": item.get("name"),
  232. "type": item.get("type"),
  233. "count": db.session.query(DatasetMetadataBinding)
  234. .filter_by(metadata_id=item.get("id"), dataset_id=dataset.id)
  235. .count(),
  236. }
  237. for item in dataset.doc_metadata or []
  238. if item.get("id") != "built-in"
  239. ],
  240. "built_in_field_enabled": dataset.built_in_field_enabled,
  241. }