You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

clean_messages.py 3.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.model import (
  11. App,
  12. Message,
  13. MessageAgentThought,
  14. MessageAnnotation,
  15. MessageChain,
  16. MessageFeedback,
  17. MessageFile,
  18. )
  19. from models.web import SavedMessage
  20. from services.feature_service import FeatureService
  _logger = logging.getLogger(__name__)

  # Number of messages fetched per query while walking old messages.
  _BATCH_SIZE = 100
  # Seconds a tenant's plan name stays cached in Redis under "features:<tenant_id>".
  _PLAN_CACHE_TTL_SECONDS = 600
  # Tables whose rows reference a message via `message_id`; deleted (in this order)
  # before the message itself.
  _MESSAGE_RELATED_MODELS = (
      MessageFeedback,
      MessageAnnotation,
      MessageChain,
      MessageAgentThought,
      MessageFile,
      SavedMessage,
  )


  def _fetch_message_batch(before_created_at, before_id):
      """Return up to ``_BATCH_SIZE`` messages strictly older than the cursor, newest first.

      The cursor is the ``(created_at, id)`` pair of the last message already processed.
      ``before_id`` is ``None`` for the first batch, in which case only ``created_at`` is
      compared against the retention cutoff. Including ``id`` in the cursor (and in the
      ordering) keeps messages that share the boundary timestamp from being skipped when
      a batch ends in the middle of such a group.
      """
      query = db.session.query(Message)
      if before_id is None:
          query = query.filter(Message.created_at < before_created_at)
      else:
          query = query.filter(
              db.or_(
                  Message.created_at < before_created_at,
                  db.and_(Message.created_at == before_created_at, Message.id < before_id),
              )
          )
      return query.order_by(Message.created_at.desc(), Message.id.desc()).limit(_BATCH_SIZE).all()


  def _get_tenant_plan(tenant_id):
      """Return the billing plan name for ``tenant_id``.

      Reads the Redis key ``features:<tenant_id>`` first; on a miss, asks
      ``FeatureService.get_features`` and caches the plan for ``_PLAN_CACHE_TTL_SECONDS``.
      """
      cache_key = f"features:{tenant_id}"
      cached_plan = redis_client.get(cache_key)
      if cached_plan is not None:
          # The Redis client returns bytes here; decode to compare with plan names.
          return cached_plan.decode()
      plan = FeatureService.get_features(tenant_id).billing.subscription.plan
      redis_client.setex(cache_key, _PLAN_CACHE_TTL_SECONDS, plan)
      return plan


  def _delete_message_with_relations(message_id):
      """Delete every row referencing ``message_id``, then the message itself, and commit."""
      for related_model in _MESSAGE_RELATED_MODELS:
          db.session.query(related_model).filter(related_model.message_id == message_id).delete(
              synchronize_session=False
          )
      db.session.query(Message).filter(Message.id == message_id).delete()
      db.session.commit()


  @app.celery.task(queue="dataset")
  def clean_messages():
      """Celery task (queue "dataset"): purge old messages belonging to sandbox-plan tenants.

      Walks ``Message`` rows created before
      ``now - dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING`` days, newest first, in
      batches. For each message whose App's tenant is on the ``"sandbox"`` plan, deletes
      the message and its related rows (one commit per message). Messages whose App
      record is missing are logged and left in place; messages of other plans are kept.
      """
      click.echo(click.style("Start clean messages.", fg="green"))
      start_at = time.perf_counter()
      # NOTE(review): naive local time; confirm this matches how Message.created_at is stored.
      cursor_created_at = datetime.datetime.now() - datetime.timedelta(
          days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
      )
      cursor_id = None
      while True:
          messages = _fetch_message_batch(cursor_created_at, cursor_id)
          if not messages:
              break
          for message in messages:
              # Advance the cursor for every message (kept, skipped or deleted) and read the
              # values before any delete/commit touches the instance.
              cursor_created_at, cursor_id = message.created_at, message.id
              # Named `app_record` so it does not shadow the imported `app` module.
              app_record = db.session.query(App).filter_by(id=message.app_id).first()
              if not app_record:
                  _logger.warning(
                      "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
                      message.app_id,
                      message.id,
                  )
                  continue
              if _get_tenant_plan(app_record.tenant_id) == "sandbox":
                  _delete_message_with_relations(message.id)
      end_at = time.perf_counter()
      click.echo(
          click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green")
      )