
clean_unused_datasets_task.py

import datetime
import time
from typing import TypedDict

import click
from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError

import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
from services.feature_service import FeatureService


class CleanupConfig(TypedDict):
    clean_day: datetime.datetime
    plan_filter: str | None
    add_logs: bool


@app.celery.task(queue="dataset")
def clean_unused_datasets_task():
    click.echo(click.style("Start clean unused datasets indexes.", fg="green"))
    start_at = time.perf_counter()

    # Define cleanup configurations
    cleanup_configs: list[CleanupConfig] = [
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING),
            "plan_filter": None,
            "add_logs": True,
        },
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_PRO_CLEAN_DAY_SETTING),
            "plan_filter": "sandbox",
            "add_logs": False,
        },
    ]

    for config in cleanup_configs:
        clean_day = config["clean_day"]
        plan_filter = config["plan_filter"]
        add_logs = config["add_logs"]
        page = 1

        while True:
            try:
                # Subquery for counting new documents
                document_subquery_new = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at > clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

                # Subquery for counting old documents
                document_subquery_old = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at < clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

                # Main query with join and filter
                stmt = (
                    select(Dataset)
                    .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                    .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                    .where(
                        Dataset.created_at < clean_day,
                        func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                        func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                    )
                    .order_by(Dataset.created_at.desc())
                )

                datasets = db.paginate(stmt, page=page, per_page=50, error_out=False)
            except SQLAlchemyError:
                raise

            if datasets is None or datasets.items is None or len(datasets.items) == 0:
                break

            for dataset in datasets:
                dataset_query = db.session.scalars(
                    select(DatasetQuery).where(
                        DatasetQuery.created_at > clean_day, DatasetQuery.dataset_id == dataset.id
                    )
                ).all()

                if not dataset_query or len(dataset_query) == 0:
                    try:
                        should_clean = True

                        # Check plan filter if specified
                        if plan_filter:
                            features_cache_key = f"features:{dataset.tenant_id}"
                            plan_cache = redis_client.get(features_cache_key)
                            if plan_cache is None:
                                features = FeatureService.get_features(dataset.tenant_id)
                                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                                plan = features.billing.subscription.plan
                            else:
                                plan = plan_cache.decode()
                            should_clean = plan == plan_filter

                        if should_clean:
                            # Add auto disable log if required
                            if add_logs:
                                documents = db.session.scalars(
                                    select(Document).where(
                                        Document.dataset_id == dataset.id,
                                        Document.enabled == True,
                                        Document.archived == False,
                                    )
                                ).all()
                                for document in documents:
                                    dataset_auto_disable_log = DatasetAutoDisableLog(
                                        tenant_id=dataset.tenant_id,
                                        dataset_id=dataset.id,
                                        document_id=document.id,
                                    )
                                    db.session.add(dataset_auto_disable_log)

                            # Remove index
                            index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                            index_processor.clean(dataset, None)

                            # Update document
                            db.session.query(Document).filter_by(dataset_id=dataset.id).update(
                                {Document.enabled: False}
                            )
                            db.session.commit()
                            click.echo(
                                click.style(f"Cleaned unused dataset {dataset.id} from db success!", fg="green")
                            )
                    except Exception as e:
                        click.echo(
                            click.style(f"clean dataset index error: {e.__class__.__name__} {str(e)}", fg="red")
                        )

            page += 1

    end_at = time.perf_counter()
    click.echo(click.style(f"Cleaned unused dataset from db success latency: {end_at - start_at}", fg="green"))