You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

преди 11 месеца
преди 10 месеца
преди 11 месеца
преди 10 месеца
преди 11 месеца
преди 10 месеца
преди 11 месеца
преди 10 месеца
преди 11 месеца
преди 10 месеца
преди 11 месеца
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import datetime
  2. import time
  3. import click
  4. from werkzeug.exceptions import NotFound
  5. import app
  6. from configs import dify_config
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.model import (
  10. App,
  11. Message,
  12. MessageAgentThought,
  13. MessageAnnotation,
  14. MessageChain,
  15. MessageFeedback,
  16. MessageFile,
  17. )
  18. from models.web import SavedMessage
  19. from services.feature_service import FeatureService
  @app.celery.task(queue="dataset")
  def clean_messages():
      """Delete old messages (and their dependent rows) for tenants on the sandbox plan.

      The cutoff starts at ``now - PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING`` days.
      Messages created before the cutoff are fetched newest-first in batches of
      100; after each message is examined the cutoff is moved back to that
      message's ``created_at`` so the next query continues with older rows.

      For every message the owning tenant's plan is read from the Redis key
      ``features:<tenant_id>``; on a cache miss it is fetched through
      ``FeatureService.get_features`` and cached for 600 seconds. Only when the
      plan equals ``"sandbox"`` are the message and its related rows deleted.

      Messages whose app row cannot be found are reported and skipped instead
      of aborting the task with an ``AttributeError``.
      """
      click.echo(click.style("Start clean messages.", fg="green"))
      start_at = time.perf_counter()
      plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
          days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
      )
      # Models whose rows reference a message through ``message_id``; they are
      # purged (in this order) before the message itself.
      related_models = (
          MessageFeedback,
          MessageAnnotation,
          MessageChain,
          MessageAgentThought,
          MessageFile,
          SavedMessage,
      )
      while True:
          try:
              # NOTE(review): ``.all()`` is not expected to raise NotFound; the
              # handler looks like a leftover from an earlier paginate() call
              # and is kept only as a defensive exit - confirm before removing.
              messages = (
                  db.session.query(Message)  # type: ignore
                  .filter(Message.created_at < plan_sandbox_clean_message_day)
                  .order_by(Message.created_at.desc())
                  .limit(100)
                  .all()
              )
          except NotFound:
              break
          if not messages:
              break
          for message in messages:
              # Advance the cutoff even for messages that are kept, so the next
              # batch query does not return the same rows again.
              plan_sandbox_clean_message_day = message.created_at
              # Named ``message_app`` to avoid shadowing the imported ``app`` module.
              message_app = App.query.filter_by(id=message.app_id).first()
              if message_app is None:
                  # Without an app row the tenant (and therefore its plan) is
                  # unknown, so the message is left untouched.
                  click.echo(
                      click.style(
                          f"App {message.app_id} not found for message {message.id}, skipped.",
                          fg="yellow",
                      )
                  )
                  continue
              tenant_id = message_app.tenant_id
              features_cache_key = f"features:{tenant_id}"
              plan_cache = redis_client.get(features_cache_key)
              if plan_cache is None:
                  features = FeatureService.get_features(tenant_id)
                  plan = features.billing.subscription.plan
                  redis_client.setex(features_cache_key, 600, plan)
              else:
                  plan = plan_cache.decode()
              if plan == "sandbox":
                  # clean related message
                  for model in related_models:
                      db.session.query(model).filter(model.message_id == message.id).delete(
                          synchronize_session=False
                      )
                  db.session.query(Message).filter(Message.id == message.id).delete()
                  # NOTE(review): committed once per deleted message; the original
                  # nesting of this call could not be recovered from the
                  # whitespace-mangled source - confirm the intended granularity.
                  db.session.commit()
      end_at = time.perf_counter()
      click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))