Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

clean_dataset_task.py 5.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task # type: ignore
  5. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  6. from core.tools.utils.rag_web_reader import get_image_upload_file_ids
  7. from extensions.ext_database import db
  8. from extensions.ext_storage import storage
  9. from models.dataset import (
  10. AppDatasetJoin,
  11. Dataset,
  12. DatasetMetadata,
  13. DatasetMetadataBinding,
  14. DatasetProcessRule,
  15. DatasetQuery,
  16. Document,
  17. DocumentSegment,
  18. )
  19. from models.model import UploadFile
# NOTE: ValueError is a Python builtin — no import statement is needed for it.
  21. @shared_task(queue="dataset")
  22. def clean_dataset_task(
  23. dataset_id: str,
  24. tenant_id: str,
  25. indexing_technique: str,
  26. index_struct: str,
  27. collection_binding_id: str,
  28. doc_form: str,
  29. ):
  30. """
  31. Clean dataset when dataset deleted.
  32. :param dataset_id: dataset id
  33. :param tenant_id: tenant id
  34. :param indexing_technique: indexing technique
  35. :param index_struct: index struct dict
  36. :param collection_binding_id: collection binding id
  37. :param doc_form: dataset form
  38. Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct)
  39. """
  40. logging.info(click.style(f"Start clean dataset when dataset deleted: {dataset_id}", fg="green"))
  41. start_at = time.perf_counter()
  42. try:
  43. dataset = Dataset(
  44. id=dataset_id,
  45. tenant_id=tenant_id,
  46. indexing_technique=indexing_technique,
  47. index_struct=index_struct,
  48. collection_binding_id=collection_binding_id,
  49. )
  50. documents = db.session.query(Document).where(Document.dataset_id == dataset_id).all()
  51. segments = db.session.query(DocumentSegment).where(DocumentSegment.dataset_id == dataset_id).all()
  52. # Fix: Always clean vector database resources regardless of document existence
  53. # This ensures all 33 vector databases properly drop tables/collections/indices
  54. if doc_form is None:
  55. # Use default paragraph index type for empty datasets to enable vector database cleanup
  56. from core.rag.index_processor.constant.index_type import IndexType
  57. doc_form = IndexType.PARAGRAPH_INDEX
  58. logging.info(
  59. click.style(f"No documents found, using default index type for cleanup: {doc_form}", fg="yellow")
  60. )
  61. index_processor = IndexProcessorFactory(doc_form).init_index_processor()
  62. index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
  63. if documents is None or len(documents) == 0:
  64. logging.info(click.style(f"No documents found for dataset: {dataset_id}", fg="green"))
  65. else:
  66. logging.info(click.style(f"Cleaning documents for dataset: {dataset_id}", fg="green"))
  67. for document in documents:
  68. db.session.delete(document)
  69. for segment in segments:
  70. image_upload_file_ids = get_image_upload_file_ids(segment.content)
  71. for upload_file_id in image_upload_file_ids:
  72. image_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
  73. if image_file is None:
  74. continue
  75. try:
  76. storage.delete(image_file.key)
  77. except Exception:
  78. logging.exception(
  79. "Delete image_files failed when storage deleted, \
  80. image_upload_file_is: %s",
  81. upload_file_id,
  82. )
  83. db.session.delete(image_file)
  84. db.session.delete(segment)
  85. db.session.query(DatasetProcessRule).where(DatasetProcessRule.dataset_id == dataset_id).delete()
  86. db.session.query(DatasetQuery).where(DatasetQuery.dataset_id == dataset_id).delete()
  87. db.session.query(AppDatasetJoin).where(AppDatasetJoin.dataset_id == dataset_id).delete()
  88. # delete dataset metadata
  89. db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
  90. db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
  91. # delete files
  92. if documents:
  93. for document in documents:
  94. try:
  95. if document.data_source_type == "upload_file":
  96. if document.data_source_info:
  97. data_source_info = document.data_source_info_dict
  98. if data_source_info and "upload_file_id" in data_source_info:
  99. file_id = data_source_info["upload_file_id"]
  100. file = (
  101. db.session.query(UploadFile)
  102. .where(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
  103. .first()
  104. )
  105. if not file:
  106. continue
  107. storage.delete(file.key)
  108. db.session.delete(file)
  109. except Exception:
  110. continue
  111. db.session.commit()
  112. end_at = time.perf_counter()
  113. logging.info(
  114. click.style(f"Cleaned dataset when dataset deleted: {dataset_id} latency: {end_at - start_at}", fg="green")
  115. )
  116. except Exception:
  117. logging.exception("Cleaned dataset when dataset deleted failed")
  118. finally:
  119. db.session.close()