You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

clean_dataset_task.py 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import logging
  2. import time
  3. import click
  4. from celery import shared_task # type: ignore
  5. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  6. from core.tools.utils.rag_web_reader import get_image_upload_file_ids
  7. from extensions.ext_database import db
  8. from extensions.ext_storage import storage
  9. from models.dataset import (
  10. AppDatasetJoin,
  11. Dataset,
  12. DatasetMetadata,
  13. DatasetMetadataBinding,
  14. DatasetProcessRule,
  15. DatasetQuery,
  16. Document,
  17. DocumentSegment,
  18. )
  19. from models.model import UploadFile
  20. # NOTE: ValueError (raised below) is a Python builtin, so no import is required for it.
  21. @shared_task(queue="dataset")
  22. def clean_dataset_task(
  23. dataset_id: str,
  24. tenant_id: str,
  25. indexing_technique: str,
  26. index_struct: str,
  27. collection_binding_id: str,
  28. doc_form: str,
  29. ):
  30. """
  31. Clean dataset when dataset deleted.
  32. :param dataset_id: dataset id
  33. :param tenant_id: tenant id
  34. :param indexing_technique: indexing technique
  35. :param index_struct: index struct dict
  36. :param collection_binding_id: collection binding id
  37. :param doc_form: dataset form
  38. Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct)
  39. """
  40. logging.info(click.style("Start clean dataset when dataset deleted: {}".format(dataset_id), fg="green"))
  41. start_at = time.perf_counter()
  42. try:
  43. dataset = Dataset(
  44. id=dataset_id,
  45. tenant_id=tenant_id,
  46. indexing_technique=indexing_technique,
  47. index_struct=index_struct,
  48. collection_binding_id=collection_binding_id,
  49. )
  50. documents = db.session.query(Document).filter(Document.dataset_id == dataset_id).all()
  51. segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all()
  52. if documents is None or len(documents) == 0:
  53. logging.info(click.style("No documents found for dataset: {}".format(dataset_id), fg="green"))
  54. else:
  55. logging.info(click.style("Cleaning documents for dataset: {}".format(dataset_id), fg="green"))
  56. # Specify the index type before initializing the index processor
  57. if doc_form is None:
  58. raise ValueError("Index type must be specified.")
  59. index_processor = IndexProcessorFactory(doc_form).init_index_processor()
  60. index_processor.clean(dataset, None, with_keywords=True, delete_child_chunks=True)
  61. for document in documents:
  62. db.session.delete(document)
  63. for segment in segments:
  64. image_upload_file_ids = get_image_upload_file_ids(segment.content)
  65. for upload_file_id in image_upload_file_ids:
  66. image_file = db.session.query(UploadFile).filter(UploadFile.id == upload_file_id).first()
  67. if image_file is None:
  68. continue
  69. try:
  70. storage.delete(image_file.key)
  71. except Exception:
  72. logging.exception(
  73. "Delete image_files failed when storage deleted, \
  74. image_upload_file_is: {}".format(upload_file_id)
  75. )
  76. db.session.delete(image_file)
  77. db.session.delete(segment)
  78. db.session.query(DatasetProcessRule).filter(DatasetProcessRule.dataset_id == dataset_id).delete()
  79. db.session.query(DatasetQuery).filter(DatasetQuery.dataset_id == dataset_id).delete()
  80. db.session.query(AppDatasetJoin).filter(AppDatasetJoin.dataset_id == dataset_id).delete()
  81. # delete dataset metadata
  82. db.session.query(DatasetMetadata).filter(DatasetMetadata.dataset_id == dataset_id).delete()
  83. db.session.query(DatasetMetadataBinding).filter(DatasetMetadataBinding.dataset_id == dataset_id).delete()
  84. # delete files
  85. if documents:
  86. for document in documents:
  87. try:
  88. if document.data_source_type == "upload_file":
  89. if document.data_source_info:
  90. data_source_info = document.data_source_info_dict
  91. if data_source_info and "upload_file_id" in data_source_info:
  92. file_id = data_source_info["upload_file_id"]
  93. file = (
  94. db.session.query(UploadFile)
  95. .filter(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id)
  96. .first()
  97. )
  98. if not file:
  99. continue
  100. storage.delete(file.key)
  101. db.session.delete(file)
  102. except Exception:
  103. continue
  104. db.session.commit()
  105. end_at = time.perf_counter()
  106. logging.info(
  107. click.style(
  108. "Cleaned dataset when dataset deleted: {} latency: {}".format(dataset_id, end_at - start_at), fg="green"
  109. )
  110. )
  111. except Exception:
  112. logging.exception("Cleaned dataset when dataset deleted failed")
  113. finally:
  114. db.session.close()