
import logging
import time

import click
from celery import shared_task
from sqlalchemy import select

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import (
    AppDatasetJoin,
    Dataset,
    DatasetMetadata,
    DatasetMetadataBinding,
    DatasetProcessRule,
    DatasetQuery,
    Document,
    DocumentSegment,
)
from models.model import UploadFile

logger = logging.getLogger(__name__)

@shared_task(queue="dataset")
def clean_dataset_task(
    dataset_id: str,
    tenant_id: str,
    indexing_technique: str,
    index_struct: str,
    collection_binding_id: str,
    doc_form: str,
):
    """
    Clean dataset when dataset is deleted.
    :param dataset_id: dataset id
    :param tenant_id: tenant id
    :param indexing_technique: indexing technique
    :param index_struct: index struct dict
    :param collection_binding_id: collection binding id
    :param doc_form: dataset form

    Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct,
                                    collection_binding_id, doc_form)
    """
  42. logger.info(click.style(f"Start clean dataset when dataset deleted: {dataset_id}", fg="green"))
  43. start_at = time.perf_counter()
  44. try:
  45. dataset = Dataset(
  46. id=dataset_id,
  47. tenant_id=tenant_id,
  48. indexing_technique=indexing_technique,
  49. index_struct=index_struct,
  50. collection_binding_id=collection_binding_id,
  51. )
  52. documents = db.session.scalars(select(Document).where(Document.dataset_id == dataset_id)).all()
  53. segments = db.session.scalars(select(DocumentSegment).where(DocumentSegment.dataset_id == dataset_id)).all()
  54. # Enhanced validation: Check if doc_form is None, empty string, or contains only whitespace
  55. # This ensures all invalid doc_form values are properly handled
  56. if doc_form is None or (isinstance(doc_form, str) and not doc_form.strip()):
  57. # Use default paragraph index type for empty/invalid datasets to enable vector database cleanup
  58. from core.rag.index_processor.constant.index_type import IndexType
  59. doc_form = IndexType.PARAGRAPH_INDEX
  60. logger.info(
  61. click.style(f"Invalid doc_form detected, using default index type for cleanup: {doc_form}", fg="yellow")
  62. )
        # Guard IndexProcessorFactory.clean() so a vector database failure is not a
        # single point of failure: document and segment deletion below still runs.
        try:
            index_processor = IndexProcessorFactory(doc_form).init_index_processor()
            index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
            logger.info(click.style(f"Successfully cleaned vector database for dataset: {dataset_id}", fg="green"))
        except Exception:
            logger.exception(click.style(f"Failed to clean vector database for dataset {dataset_id}", fg="red"))
            # Continue with document and segment deletion even if vector cleanup fails
            logger.info(
                click.style(f"Continuing with document and segment deletion for dataset: {dataset_id}", fg="yellow")
            )
        if documents is None or len(documents) == 0:
            logger.info(click.style(f"No documents found for dataset: {dataset_id}", fg="green"))
        else:
            logger.info(click.style(f"Cleaning documents for dataset: {dataset_id}", fg="green"))
            for document in documents:
                db.session.delete(document)

            for segment in segments:
                # Delete any image files referenced by this segment's content.
                image_upload_file_ids = get_image_upload_file_ids(segment.content)
                for upload_file_id in image_upload_file_ids:
                    image_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
                    if image_file is None:
                        continue
                    try:
                        storage.delete(image_file.key)
                    except Exception:
                        logger.exception(
                            "Delete image_files failed when storage deleted, image_upload_file_id: %s",
                            upload_file_id,
                        )
                    db.session.delete(image_file)
                db.session.delete(segment)
        db.session.query(DatasetProcessRule).where(DatasetProcessRule.dataset_id == dataset_id).delete()
        db.session.query(DatasetQuery).where(DatasetQuery.dataset_id == dataset_id).delete()
        db.session.query(AppDatasetJoin).where(AppDatasetJoin.dataset_id == dataset_id).delete()

        # delete dataset metadata
        db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
        db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
        # delete files
        if documents:
            for document in documents:
                try:
                    if document.data_source_type == "upload_file":
                        if document.data_source_info:
                            data_source_info = document.data_source_info_dict
                            if data_source_info and "upload_file_id" in data_source_info:
                                file_id = data_source_info["upload_file_id"]
                                file = (
                                    db.session.query(UploadFile)
                                    .where(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
                                    .first()
                                )
                                if not file:
                                    continue
                                storage.delete(file.key)
                                db.session.delete(file)
                except Exception:
                    # Best-effort cleanup: skip this document's file on any error and move on.
                    continue
        db.session.commit()

        end_at = time.perf_counter()
        logger.info(
            click.style(f"Cleaned dataset when dataset deleted: {dataset_id} latency: {end_at - start_at}", fg="green")
        )
    except Exception:
        # Roll back so an exception above does not leave the session in a dirty state.
        try:
            db.session.rollback()
            logger.info(click.style(f"Rolled back database session for dataset: {dataset_id}", fg="yellow"))
        except Exception:
            logger.exception("Failed to rollback database session")

        logger.exception("Cleaned dataset when dataset deleted failed")
    finally:
        db.session.close()
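
For reference, a minimal sketch of how this task might be dispatched when a dataset is deleted. The call site and the `dataset` variable are hypothetical; only `clean_dataset_task` and its parameter list come from the file above, and `doc_form` is assumed to be available on the dataset object.

    # Hypothetical call site (not part of this file): dispatch the cleanup task
    # after the Dataset row has been removed. `dataset` is assumed to be the
    # Dataset model instance being deleted.
    clean_dataset_task.delay(
        dataset.id,
        dataset.tenant_id,
        dataset.indexing_technique,
        dataset.index_struct,
        dataset.collection_binding_id,
        dataset.doc_form,
    )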