Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

clean_messages.py 3.5KB

11 месяцев назад
11 месяцев назад
11 месяцев назад
10 месяцев назад
11 месяцев назад
10 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.model import (
  11. App,
  12. Message,
  13. MessageAgentThought,
  14. MessageAnnotation,
  15. MessageChain,
  16. MessageFeedback,
  17. MessageFile,
  18. )
  19. from models.web import SavedMessage
  20. from services.feature_service import FeatureService
  21. _logger = logging.getLogger(__name__)
  22. @app.celery.task(queue="dataset")
  23. def clean_messages():
  24. click.echo(click.style("Start clean messages.", fg="green"))
  25. start_at = time.perf_counter()
  26. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  27. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  28. )
  29. while True:
  30. try:
  31. # Main query with join and filter
  32. # FIXME:for mypy no paginate method error
  33. messages = (
  34. db.session.query(Message) # type: ignore
  35. .filter(Message.created_at < plan_sandbox_clean_message_day)
  36. .order_by(Message.created_at.desc())
  37. .limit(100)
  38. .all()
  39. )
  40. except NotFound:
  41. break
  42. if not messages:
  43. break
  44. for message in messages:
  45. plan_sandbox_clean_message_day = message.created_at
  46. app = db.session.query(App).filter_by(id=message.app_id).first()
  47. if not app:
  48. _logger.warning(
  49. "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
  50. message.app_id,
  51. message.id,
  52. )
  53. continue
  54. features_cache_key = f"features:{app.tenant_id}"
  55. plan_cache = redis_client.get(features_cache_key)
  56. if plan_cache is None:
  57. features = FeatureService.get_features(app.tenant_id)
  58. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  59. plan = features.billing.subscription.plan
  60. else:
  61. plan = plan_cache.decode()
  62. if plan == "sandbox":
  63. # clean related message
  64. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
  65. synchronize_session=False
  66. )
  67. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
  68. synchronize_session=False
  69. )
  70. db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
  71. synchronize_session=False
  72. )
  73. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
  74. synchronize_session=False
  75. )
  76. db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
  77. synchronize_session=False
  78. )
  79. db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
  80. synchronize_session=False
  81. )
  82. db.session.query(Message).filter(Message.id == message.id).delete()
  83. db.session.commit()
  84. end_at = time.perf_counter()
  85. click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))