
clean_dataset_task.py

import logging
import time

import click
from celery import shared_task  # type: ignore

from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.tools.utils.rag_web_reader import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.dataset import (
    AppDatasetJoin,
    Dataset,
    DatasetMetadata,
    DatasetMetadataBinding,
    DatasetProcessRule,
    DatasetQuery,
    Document,
    DocumentSegment,
)
from models.model import UploadFile


@shared_task(queue="dataset")
def clean_dataset_task(
    dataset_id: str,
    tenant_id: str,
    indexing_technique: str,
    index_struct: str,
    collection_binding_id: str,
    doc_form: str,
):
    """
    Clean dataset when dataset deleted.
    :param dataset_id: dataset id
    :param tenant_id: tenant id
    :param indexing_technique: indexing technique
    :param index_struct: index struct dict
    :param collection_binding_id: collection binding id
    :param doc_form: dataset form

    Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct,
                                    collection_binding_id, doc_form)
    """
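    # High-level flow: rebuild a transient Dataset handle, drop the vector
    # store resources, delete documents/segments plus any stored image files,
    # purge dataset-scoped bookkeeping rows, then commit once at the end.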
    logging.info(click.style(f"Start clean dataset when dataset deleted: {dataset_id}", fg="green"))
    start_at = time.perf_counter()

    try:
        dataset = Dataset(
            id=dataset_id,
            tenant_id=tenant_id,
            indexing_technique=indexing_technique,
            index_struct=index_struct,
            collection_binding_id=collection_binding_id,
        )
        documents = db.session.query(Document).where(Document.dataset_id == dataset_id).all()
        segments = db.session.query(DocumentSegment).where(DocumentSegment.dataset_id == dataset_id).all()
        # Always clean vector database resources, even when the dataset has no
        # documents, so that all 33 supported vector databases properly drop
        # their tables/collections/indices.
        if doc_form is None:
            raise ValueError("Index type must be specified.")
        index_processor = IndexProcessorFactory(doc_form).init_index_processor()
        index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
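        # NOTE: the None second argument appears to request a whole-dataset
        # clean rather than specific node IDs (inferred from this call site;
        # check the index processor's clean() contract to confirm).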
        if documents is None or len(documents) == 0:
            logging.info(click.style(f"No documents found for dataset: {dataset_id}", fg="green"))
        else:
            logging.info(click.style(f"Cleaning documents for dataset: {dataset_id}", fg="green"))
            for document in documents:
                db.session.delete(document)

            for segment in segments:
                image_upload_file_ids = get_image_upload_file_ids(segment.content)
                for upload_file_id in image_upload_file_ids:
                    image_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
                    if image_file is None:
                        continue
                    try:
                        storage.delete(image_file.key)
                    except Exception:
                        logging.exception(
                            "Delete image_files failed when storage deleted, image_upload_file_id: %s",
                            upload_file_id,
                        )
                    db.session.delete(image_file)
                db.session.delete(segment)
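
        # Bulk-delete the rows that reference this dataset directly.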
        db.session.query(DatasetProcessRule).where(DatasetProcessRule.dataset_id == dataset_id).delete()
        db.session.query(DatasetQuery).where(DatasetQuery.dataset_id == dataset_id).delete()
        db.session.query(AppDatasetJoin).where(AppDatasetJoin.dataset_id == dataset_id).delete()
        # delete dataset metadata
        db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
        db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
        # delete files: best-effort removal of uploaded source files; a failure
        # for one document is skipped so the rest of the cleanup can finish.
        if documents:
            for document in documents:
                try:
                    if document.data_source_type == "upload_file":
                        if document.data_source_info:
                            data_source_info = document.data_source_info_dict
                            if data_source_info and "upload_file_id" in data_source_info:
                                file_id = data_source_info["upload_file_id"]
                                file = (
                                    db.session.query(UploadFile)
                                    .where(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
                                    .first()
                                )
                                if not file:
                                    continue
                                storage.delete(file.key)
                                db.session.delete(file)
                except Exception:
                    continue
        db.session.commit()

        end_at = time.perf_counter()
        logging.info(
            click.style(f"Cleaned dataset when dataset deleted: {dataset_id} latency: {end_at - start_at}", fg="green")
        )
    except Exception:
        logging.exception("Cleaned dataset when dataset deleted failed")
    finally:
        db.session.close()
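
# --- Illustrative usage (a sketch, not part of the task module) -------------
# A minimal example of how a caller might enqueue this task when a dataset is
# deleted. The `dataset` object and the call site are assumptions for
# illustration only; the .delay() arguments mirror the task signature above.
#
#     clean_dataset_task.delay(
#         dataset.id,
#         dataset.tenant_id,
#         dataset.indexing_technique,
#         dataset.index_struct,
#         dataset.collection_binding_id,
#         dataset.doc_form,
#     )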