Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

clean_messages.py 3.4KB

11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
5 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from sqlalchemy.exc import SQLAlchemyError
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from extensions.ext_redis import redis_client
  10. from models.model import (
  11. App,
  12. Message,
  13. MessageAgentThought,
  14. MessageAnnotation,
  15. MessageChain,
  16. MessageFeedback,
  17. MessageFile,
  18. )
  19. from models.web import SavedMessage
  20. from services.feature_service import FeatureService
  21. _logger = logging.getLogger(__name__)
  22. @app.celery.task(queue="dataset")
  23. def clean_messages():
  24. click.echo(click.style("Start clean messages.", fg="green"))
  25. start_at = time.perf_counter()
  26. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  27. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  28. )
  29. while True:
  30. try:
  31. # Main query with join and filter
  32. messages = (
  33. db.session.query(Message)
  34. .where(Message.created_at < plan_sandbox_clean_message_day)
  35. .order_by(Message.created_at.desc())
  36. .limit(100)
  37. .all()
  38. )
  39. except SQLAlchemyError:
  40. raise
  41. if not messages:
  42. break
  43. for message in messages:
  44. plan_sandbox_clean_message_day = message.created_at
  45. app = db.session.query(App).filter_by(id=message.app_id).first()
  46. if not app:
  47. _logger.warning(
  48. "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
  49. message.app_id,
  50. message.id,
  51. )
  52. continue
  53. features_cache_key = f"features:{app.tenant_id}"
  54. plan_cache = redis_client.get(features_cache_key)
  55. if plan_cache is None:
  56. features = FeatureService.get_features(app.tenant_id)
  57. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  58. plan = features.billing.subscription.plan
  59. else:
  60. plan = plan_cache.decode()
  61. if plan == "sandbox":
  62. # clean related message
  63. db.session.query(MessageFeedback).where(MessageFeedback.message_id == message.id).delete(
  64. synchronize_session=False
  65. )
  66. db.session.query(MessageAnnotation).where(MessageAnnotation.message_id == message.id).delete(
  67. synchronize_session=False
  68. )
  69. db.session.query(MessageChain).where(MessageChain.message_id == message.id).delete(
  70. synchronize_session=False
  71. )
  72. db.session.query(MessageAgentThought).where(MessageAgentThought.message_id == message.id).delete(
  73. synchronize_session=False
  74. )
  75. db.session.query(MessageFile).where(MessageFile.message_id == message.id).delete(
  76. synchronize_session=False
  77. )
  78. db.session.query(SavedMessage).where(SavedMessage.message_id == message.id).delete(
  79. synchronize_session=False
  80. )
  81. db.session.query(Message).where(Message.id == message.id).delete()
  82. db.session.commit()
  83. end_at = time.perf_counter()
  84. click.echo(click.style(f"Cleaned messages from db success latency: {end_at - start_at}", fg="green"))