
clean_unused_datasets_task.py

import datetime
import time
from typing import TypedDict

import click
from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError

import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
from services.feature_service import FeatureService


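# Fields of a cleanup configuration:
#   clean_day   - cutoff timestamp; only datasets created before it, with no document
#                 updates after it, are considered for cleanup
#   plan_filter - restrict the pass to tenants on this billing plan (None = all plans)
#   add_logs    - whether to record a DatasetAutoDisableLog entry per disabled document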
class CleanupConfig(TypedDict):
    clean_day: datetime.datetime
    plan_filter: str | None
    add_logs: bool


@app.celery.task(queue="dataset")
def clean_unused_datasets_task():
    click.echo(click.style("Start clean unused datasets indexes.", fg="green"))
    start_at = time.perf_counter()

    # Define cleanup configurations
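    # First pass: all plans, datasets older than PLAN_SANDBOX_CLEAN_DAY_SETTING days,
    # recording auto-disable logs. Second pass: sandbox-plan tenants only, datasets
    # older than PLAN_PRO_CLEAN_DAY_SETTING days, without logs.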
    cleanup_configs: list[CleanupConfig] = [
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING),
            "plan_filter": None,
            "add_logs": True,
        },
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_PRO_CLEAN_DAY_SETTING),
            "plan_filter": "sandbox",
            "add_logs": False,
        },
    ]

    for config in cleanup_configs:
        clean_day = config["clean_day"]
        plan_filter = config["plan_filter"]
        add_logs = config["add_logs"]

        page = 1
        while True:
            try:
                # Subquery for counting new documents
                document_subquery_new = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at > clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

                # Subquery for counting old documents
                document_subquery_old = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at < clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

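                # A dataset qualifies when it was created before clean_day, none of its
                # enabled, completed documents were updated after clean_day, and at least
                # one of them was updated before it.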
                # Main query with join and filter
                stmt = (
                    select(Dataset)
                    .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                    .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                    .where(
                        Dataset.created_at < clean_day,
                        func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                        func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                    )
                    .order_by(Dataset.created_at.desc())
                )

                datasets = db.paginate(stmt, page=page, per_page=50, error_out=False)
            except SQLAlchemyError:
                raise

            if datasets is None or datasets.items is None or len(datasets.items) == 0:
                break

            for dataset in datasets:
                dataset_query = db.session.scalars(
                    select(DatasetQuery).where(
                        DatasetQuery.created_at > clean_day, DatasetQuery.dataset_id == dataset.id
                    )
                ).all()

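                # Skip datasets that have been queried since clean_day; only untouched ones are cleaned.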
                if not dataset_query or len(dataset_query) == 0:
                    try:
                        should_clean = True

                        # Check plan filter if specified
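                        # The tenant's plan is cached in Redis for 600 seconds so that
                        # FeatureService is not called once per dataset of the same tenant.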
                        if plan_filter:
                            features_cache_key = f"features:{dataset.tenant_id}"
                            plan_cache = redis_client.get(features_cache_key)
                            if plan_cache is None:
                                features = FeatureService.get_features(dataset.tenant_id)
                                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                                plan = features.billing.subscription.plan
                            else:
                                plan = plan_cache.decode()
                            should_clean = plan == plan_filter

                        if should_clean:
                            # Add auto disable log if required
                            if add_logs:
                                documents = db.session.scalars(
                                    select(Document).where(
                                        Document.dataset_id == dataset.id,
                                        Document.enabled == True,
                                        Document.archived == False,
                                    )
                                ).all()
                                for document in documents:
                                    dataset_auto_disable_log = DatasetAutoDisableLog(
                                        tenant_id=dataset.tenant_id,
                                        dataset_id=dataset.id,
                                        document_id=document.id,
                                    )
                                    db.session.add(dataset_auto_disable_log)

                            # Remove index
                            index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                            index_processor.clean(dataset, None)

                            # Update document
                            db.session.query(Document).filter_by(dataset_id=dataset.id).update(
                                {Document.enabled: False}
                            )
                            db.session.commit()
                            click.echo(
                                click.style(f"Cleaned unused dataset {dataset.id} from db success!", fg="green")
                            )
                    except Exception as e:
                        click.echo(
                            click.style(f"clean dataset index error: {e.__class__.__name__} {str(e)}", fg="red")
                        )

            page += 1

    end_at = time.perf_counter()
    click.echo(click.style(f"Cleaned unused dataset from db success latency: {end_at - start_at}", fg="green"))