
clean_unused_datasets_task.py

import datetime
import time
from typing import Optional, TypedDict

import click
from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError

import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
from services.feature_service import FeatureService


class CleanupConfig(TypedDict):
    clean_day: datetime.datetime
    plan_filter: Optional[str]
    add_logs: bool


@app.celery.task(queue="dataset")
def clean_unused_datasets_task():
    click.echo(click.style("Start clean unused datasets indexes.", fg="green"))
    start_at = time.perf_counter()

    # Define cleanup configurations
    cleanup_configs: list[CleanupConfig] = [
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING),
            "plan_filter": None,
            "add_logs": True,
        },
        {
            "clean_day": datetime.datetime.now() - datetime.timedelta(days=dify_config.PLAN_PRO_CLEAN_DAY_SETTING),
            "plan_filter": "sandbox",
            "add_logs": False,
        },
    ]

    for config in cleanup_configs:
        clean_day = config["clean_day"]
        plan_filter = config["plan_filter"]
        add_logs = config["add_logs"]

        while True:
            try:
                # Subquery for counting new documents
                document_subquery_new = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at > clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

                # Subquery for counting old documents
                document_subquery_old = (
                    db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
                    .where(
                        Document.indexing_status == "completed",
                        Document.enabled == True,
                        Document.archived == False,
                        Document.updated_at < clean_day,
                    )
                    .group_by(Document.dataset_id)
                    .subquery()
                )

                # Main query with join and filter
                stmt = (
                    select(Dataset)
                    .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                    .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                    .where(
                        Dataset.created_at < clean_day,
                        func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                        func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                    )
                    .order_by(Dataset.created_at.desc())
                )

                datasets = db.paginate(stmt, page=1, per_page=50)

            except SQLAlchemyError:
                raise

            if datasets.items is None or len(datasets.items) == 0:
                break

            for dataset in datasets:
                dataset_query = (
                    db.session.query(DatasetQuery)
                    .where(DatasetQuery.created_at > clean_day, DatasetQuery.dataset_id == dataset.id)
                    .all()
                )
                if not dataset_query or len(dataset_query) == 0:
                    try:
                        should_clean = True
                        # Check plan filter if specified
                        if plan_filter:
                            features_cache_key = f"features:{dataset.tenant_id}"
                            plan_cache = redis_client.get(features_cache_key)
                            if plan_cache is None:
                                features = FeatureService.get_features(dataset.tenant_id)
                                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                                plan = features.billing.subscription.plan
                            else:
                                plan = plan_cache.decode()
                            should_clean = plan == plan_filter

                        if should_clean:
                            # Add auto disable log if required
                            if add_logs:
                                documents = (
                                    db.session.query(Document)
                                    .where(
                                        Document.dataset_id == dataset.id,
                                        Document.enabled == True,
                                        Document.archived == False,
                                    )
                                    .all()
                                )
                                for document in documents:
                                    dataset_auto_disable_log = DatasetAutoDisableLog(
                                        tenant_id=dataset.tenant_id,
                                        dataset_id=dataset.id,
                                        document_id=document.id,
                                    )
                                    db.session.add(dataset_auto_disable_log)

                            # Remove index
                            index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
                            index_processor.clean(dataset, None)

                            # Update document
                            db.session.query(Document).filter_by(dataset_id=dataset.id).update(
                                {Document.enabled: False}
                            )
                            db.session.commit()
                            click.echo(
                                click.style(f"Cleaned unused dataset {dataset.id} from db success!", fg="green")
                            )
                    except Exception as e:
                        click.echo(
                            click.style(f"clean dataset index error: {e.__class__.__name__} {str(e)}", fg="red")
                        )

    end_at = time.perf_counter()
    click.echo(click.style(f"Cleaned unused dataset from db success latency: {end_at - start_at}", fg="green"))