Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

clean_dataset_task.py 6.7KB

(line-number gutter: lines 1–151)
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task
  5. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  6. from core.tools.utils.web_reader_tool import get_image_upload_file_ids
  7. from extensions.ext_database import db
  8. from extensions.ext_storage import storage
  9. from models.dataset import (
  10. AppDatasetJoin,
  11. Dataset,
  12. DatasetMetadata,
  13. DatasetMetadataBinding,
  14. DatasetProcessRule,
  15. DatasetQuery,
  16. Document,
  17. DocumentSegment,
  18. )
  19. from models.model import UploadFile
  20. # Add import statement for ValueError
  21. @shared_task(queue="dataset")
  22. def clean_dataset_task(
  23. dataset_id: str,
  24. tenant_id: str,
  25. indexing_technique: str,
  26. index_struct: str,
  27. collection_binding_id: str,
  28. doc_form: str,
  29. ):
  30. """
  31. Clean dataset when dataset deleted.
  32. :param dataset_id: dataset id
  33. :param tenant_id: tenant id
  34. :param indexing_technique: indexing technique
  35. :param index_struct: index struct dict
  36. :param collection_binding_id: collection binding id
  37. :param doc_form: dataset form
  38. Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct)
  39. """
  40. logging.info(click.style(f"Start clean dataset when dataset deleted: {dataset_id}", fg="green"))
  41. start_at = time.perf_counter()
  42. try:
  43. dataset = Dataset(
  44. id=dataset_id,
  45. tenant_id=tenant_id,
  46. indexing_technique=indexing_technique,
  47. index_struct=index_struct,
  48. collection_binding_id=collection_binding_id,
  49. )
  50. documents = db.session.query(Document).where(Document.dataset_id == dataset_id).all()
  51. segments = db.session.query(DocumentSegment).where(DocumentSegment.dataset_id == dataset_id).all()
  52. # Enhanced validation: Check if doc_form is None, empty string, or contains only whitespace
  53. # This ensures all invalid doc_form values are properly handled
  54. if doc_form is None or (isinstance(doc_form, str) and not doc_form.strip()):
  55. # Use default paragraph index type for empty/invalid datasets to enable vector database cleanup
  56. from core.rag.index_processor.constant.index_type import IndexType
  57. doc_form = IndexType.PARAGRAPH_INDEX
  58. logging.info(
  59. click.style(f"Invalid doc_form detected, using default index type for cleanup: {doc_form}", fg="yellow")
  60. )
  61. # Add exception handling around IndexProcessorFactory.clean() to prevent single point of failure
  62. # This ensures Document/Segment deletion can continue even if vector database cleanup fails
  63. try:
  64. index_processor = IndexProcessorFactory(doc_form).init_index_processor()
  65. index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
  66. logging.info(click.style(f"Successfully cleaned vector database for dataset: {dataset_id}", fg="green"))
  67. except Exception as index_cleanup_error:
  68. logging.exception(click.style(f"Failed to clean vector database for dataset {dataset_id}", fg="red"))
  69. # Continue with document and segment deletion even if vector cleanup fails
  70. logging.info(
  71. click.style(f"Continuing with document and segment deletion for dataset: {dataset_id}", fg="yellow")
  72. )
  73. if documents is None or len(documents) == 0:
  74. logging.info(click.style(f"No documents found for dataset: {dataset_id}", fg="green"))
  75. else:
  76. logging.info(click.style(f"Cleaning documents for dataset: {dataset_id}", fg="green"))
  77. for document in documents:
  78. db.session.delete(document)
  79. for segment in segments:
  80. image_upload_file_ids = get_image_upload_file_ids(segment.content)
  81. for upload_file_id in image_upload_file_ids:
  82. image_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
  83. if image_file is None:
  84. continue
  85. try:
  86. storage.delete(image_file.key)
  87. except Exception:
  88. logging.exception(
  89. "Delete image_files failed when storage deleted, \
  90. image_upload_file_is: %s",
  91. upload_file_id,
  92. )
  93. db.session.delete(image_file)
  94. db.session.delete(segment)
  95. db.session.query(DatasetProcessRule).where(DatasetProcessRule.dataset_id == dataset_id).delete()
  96. db.session.query(DatasetQuery).where(DatasetQuery.dataset_id == dataset_id).delete()
  97. db.session.query(AppDatasetJoin).where(AppDatasetJoin.dataset_id == dataset_id).delete()
  98. # delete dataset metadata
  99. db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
  100. db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
  101. # delete files
  102. if documents:
  103. for document in documents:
  104. try:
  105. if document.data_source_type == "upload_file":
  106. if document.data_source_info:
  107. data_source_info = document.data_source_info_dict
  108. if data_source_info and "upload_file_id" in data_source_info:
  109. file_id = data_source_info["upload_file_id"]
  110. file = (
  111. db.session.query(UploadFile)
  112. .where(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
  113. .first()
  114. )
  115. if not file:
  116. continue
  117. storage.delete(file.key)
  118. db.session.delete(file)
  119. except Exception:
  120. continue
  121. db.session.commit()
  122. end_at = time.perf_counter()
  123. logging.info(
  124. click.style(f"Cleaned dataset when dataset deleted: {dataset_id} latency: {end_at - start_at}", fg="green")
  125. )
  126. except Exception:
  127. # Add rollback to prevent dirty session state in case of exceptions
  128. # This ensures the database session is properly cleaned up
  129. try:
  130. db.session.rollback()
  131. logging.info(click.style(f"Rolled back database session for dataset: {dataset_id}", fg="yellow"))
  132. except Exception as rollback_error:
  133. logging.exception("Failed to rollback database session")
  134. logging.exception("Cleaned dataset when dataset deleted failed")
  135. finally:
  136. db.session.close()