Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.model import (
  11. App,
  12. Message,
  13. MessageAgentThought,
  14. MessageAnnotation,
  15. MessageChain,
  16. MessageFeedback,
  17. MessageFile,
  18. )
  19. from models.web import SavedMessage
  20. from services.feature_service import FeatureService
  21. _logger = logging.getLogger(__name__)
  22. @app.celery.task(queue="dataset")
  23. def clean_messages():
  24. click.echo(click.style("Start clean messages.", fg="green"))
  25. start_at = time.perf_counter()
  26. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  27. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  28. )
  29. while True:
  30. try:
  31. # Main query with join and filter
  32. # FIXME:for mypy no paginate method error
  33. messages = (
  34. db.session.query(Message) # type: ignore
  35. .filter(Message.created_at < plan_sandbox_clean_message_day)
  36. .order_by(Message.created_at.desc())
  37. .limit(100)
  38. .all()
  39. )
  40. except NotFound:
  41. break
  42. if not messages:
  43. break
  44. for message in messages:
  45. plan_sandbox_clean_message_day = message.created_at
  46. app = db.session.query(App).filter_by(id=message.app_id).first()
  47. if not app:
  48. _logger.warning(
  49. "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
  50. message.app_id,
  51. message.id,
  52. )
  53. continue
  54. features_cache_key = f"features:{app.tenant_id}"
  55. plan_cache = redis_client.get(features_cache_key)
  56. if plan_cache is None:
  57. features = FeatureService.get_features(app.tenant_id)
  58. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  59. plan = features.billing.subscription.plan
  60. else:
  61. plan = plan_cache.decode()
  62. if plan == "sandbox":
  63. # clean related message
  64. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
  65. synchronize_session=False
  66. )
  67. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
  68. synchronize_session=False
  69. )
  70. db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
  71. synchronize_session=False
  72. )
  73. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
  74. synchronize_session=False
  75. )
  76. db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
  77. synchronize_session=False
  78. )
  79. db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
  80. synchronize_session=False
  81. )
  82. db.session.query(Message).filter(Message.id == message.id).delete()
  83. db.session.commit()
  84. end_at = time.perf_counter()
  85. click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))