Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

clean_messages.py 3.2KB

11 месяцев назад
10 месяцев назад
11 месяцев назад
10 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
11 месяцев назад
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import datetime
  2. import time
  3. import click
  4. from werkzeug.exceptions import NotFound
  5. import app
  6. from configs import dify_config
  7. from extensions.ext_database import db
  8. from extensions.ext_redis import redis_client
  9. from models.model import (
  10. App,
  11. Message,
  12. MessageAgentThought,
  13. MessageAnnotation,
  14. MessageChain,
  15. MessageFeedback,
  16. MessageFile,
  17. )
  18. from models.web import SavedMessage
  19. from services.feature_service import FeatureService
  20. @app.celery.task(queue="dataset")
  21. def clean_messages():
  22. click.echo(click.style("Start clean messages.", fg="green"))
  23. start_at = time.perf_counter()
  24. plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
  25. days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
  26. )
  27. page = 1
  28. while True:
  29. try:
  30. # Main query with join and filter
  31. # FIXME:for mypy no paginate method error
  32. messages = (
  33. db.session.query(Message) # type: ignore
  34. .filter(Message.created_at < plan_sandbox_clean_message_day)
  35. .order_by(Message.created_at.desc())
  36. .limit(100)
  37. .all()
  38. )
  39. except NotFound:
  40. break
  41. if not messages:
  42. break
  43. for message in messages:
  44. plan_sandbox_clean_message_day = message.created_at
  45. app = App.query.filter_by(id=message.app_id).first()
  46. features_cache_key = f"features:{app.tenant_id}"
  47. plan_cache = redis_client.get(features_cache_key)
  48. if plan_cache is None:
  49. features = FeatureService.get_features(app.tenant_id)
  50. redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
  51. plan = features.billing.subscription.plan
  52. else:
  53. plan = plan_cache.decode()
  54. if plan == "sandbox":
  55. # clean related message
  56. db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
  57. synchronize_session=False
  58. )
  59. db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
  60. synchronize_session=False
  61. )
  62. db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
  63. synchronize_session=False
  64. )
  65. db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
  66. synchronize_session=False
  67. )
  68. db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
  69. synchronize_session=False
  70. )
  71. db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
  72. synchronize_session=False
  73. )
  74. db.session.query(Message).filter(Message.id == message.id).delete()
  75. db.session.commit()
  76. end_at = time.perf_counter()
  77. click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))