clean_dataset_task.py
import logging
import time

import click
from celery import shared_task

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import (
    AppDatasetJoin,
    Dataset,
    DatasetMetadata,
    DatasetMetadataBinding,
    DatasetProcessRule,
    DatasetQuery,
    Document,
    DocumentSegment,
)
from models.model import UploadFile

logger = logging.getLogger(__name__)
@shared_task(queue="dataset")
def clean_dataset_task(
    dataset_id: str,
    tenant_id: str,
    indexing_technique: str,
    index_struct: str,
    collection_binding_id: str,
    doc_form: str,
):
  31. """
  32. Clean dataset when dataset deleted.
  33. :param dataset_id: dataset id
  34. :param tenant_id: tenant id
  35. :param indexing_technique: indexing technique
  36. :param index_struct: index struct dict
  37. :param collection_binding_id: collection binding id
  38. :param doc_form: dataset form
  39. Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct)
  40. """
    logger.info(click.style(f"Start cleaning dataset when dataset deleted: {dataset_id}", fg="green"))
    start_at = time.perf_counter()

    try:
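        # The dataset row has already been removed by the caller, so rebuild a
        # transient Dataset object from the passed attributes for the index
        # processor to work against.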
        dataset = Dataset(
            id=dataset_id,
            tenant_id=tenant_id,
            indexing_technique=indexing_technique,
            index_struct=index_struct,
            collection_binding_id=collection_binding_id,
        )

        documents = db.session.query(Document).where(Document.dataset_id == dataset_id).all()
        segments = db.session.query(DocumentSegment).where(DocumentSegment.dataset_id == dataset_id).all()
        # Treat a doc_form that is None, empty, or whitespace-only as invalid.
        if doc_form is None or (isinstance(doc_form, str) and not doc_form.strip()):
            # Fall back to the default paragraph index type so the vector
            # database cleanup can still run for such datasets.
            from core.rag.index_processor.constant.index_type import IndexType

            doc_form = IndexType.PARAGRAPH_INDEX
            logger.info(
                click.style(f"Invalid doc_form detected, using default index type for cleanup: {doc_form}", fg="yellow")
            )
        # Vector database cleanup runs in its own try/except so that a failure
        # here is not a single point of failure: the document and segment
        # deletion below can still proceed.
        try:
            index_processor = IndexProcessorFactory(doc_form).init_index_processor()
            index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
            logger.info(click.style(f"Successfully cleaned vector database for dataset: {dataset_id}", fg="green"))
        except Exception:
            logger.exception(click.style(f"Failed to clean vector database for dataset {dataset_id}", fg="red"))
            logger.info(
                click.style(f"Continuing with document and segment deletion for dataset: {dataset_id}", fg="yellow")
            )
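
        # Remove the ORM rows: documents first, then each segment together with
        # any image files referenced from its content.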
        if not documents:
            logger.info(click.style(f"No documents found for dataset: {dataset_id}", fg="green"))
        else:
            logger.info(click.style(f"Cleaning documents for dataset: {dataset_id}", fg="green"))
            for document in documents:
                db.session.delete(document)

            for segment in segments:
                image_upload_file_ids = get_image_upload_file_ids(segment.content)
                for upload_file_id in image_upload_file_ids:
                    image_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
                    if image_file is None:
                        continue
                    try:
                        storage.delete(image_file.key)
                    except Exception:
                        logger.exception(
                            "Delete image file failed when storage deleted, image_upload_file_id: %s",
                            upload_file_id,
                        )
                    db.session.delete(image_file)
                db.session.delete(segment)
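
        # Bulk-delete auxiliary rows that reference this dataset id.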
        db.session.query(DatasetProcessRule).where(DatasetProcessRule.dataset_id == dataset_id).delete()
        db.session.query(DatasetQuery).where(DatasetQuery.dataset_id == dataset_id).delete()
        db.session.query(AppDatasetJoin).where(AppDatasetJoin.dataset_id == dataset_id).delete()

        # delete dataset metadata
        db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
        db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
        # Delete the physical upload files; each document is handled in its own
        # try/except so one bad record does not abort cleanup of the rest.
        if documents:
            for document in documents:
                try:
                    if document.data_source_type == "upload_file":
                        if document.data_source_info:
                            data_source_info = document.data_source_info_dict
                            if data_source_info and "upload_file_id" in data_source_info:
                                file_id = data_source_info["upload_file_id"]
                                file = (
                                    db.session.query(UploadFile)
                                    .where(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
                                    .first()
                                )
                                if not file:
                                    continue
                                storage.delete(file.key)
                                db.session.delete(file)
                except Exception:
                    # Best-effort: skip this document's file on any error.
                    continue

        db.session.commit()
        end_at = time.perf_counter()
        logger.info(
            click.style(f"Cleaned dataset when dataset deleted: {dataset_id} latency: {end_at - start_at}", fg="green")
        )
    except Exception:
        # Roll back so an exception does not leave the session in a dirty state.
        try:
            db.session.rollback()
            logger.info(click.style(f"Rolled back database session for dataset: {dataset_id}", fg="yellow"))
        except Exception:
            logger.exception("Failed to rollback database session")
        logger.exception("Failed to clean dataset when dataset deleted")
    finally:
        db.session.close()
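
A minimal sketch of a call site, assuming a `dataset` ORM object whose row is about to be removed (the variable name and the surrounding service code are assumptions, not part of this file):

# Hypothetical dispatch: capture the dataset's attributes before the row is
# deleted, since the task can no longer query the row afterwards.
clean_dataset_task.delay(
    dataset.id,
    dataset.tenant_id,
    dataset.indexing_technique,
    dataset.index_struct,
    dataset.collection_binding_id,
    dataset.doc_form,
)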