Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

queue_monitor_task.py 2.4KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
import logging
from datetime import datetime
from urllib.parse import unquote, urlparse

import click
from flask import render_template
from redis import Redis

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_mail import mail
  11. # Create a dedicated Redis connection (using the same configuration as Celery)
  12. celery_broker_url = dify_config.CELERY_BROKER_URL
  13. parsed = urlparse(celery_broker_url)
  14. host = parsed.hostname or "localhost"
  15. port = parsed.port or 6379
  16. password = parsed.password or None
  17. redis_db = parsed.path.strip("/") or "1" # type: ignore
  18. celery_redis = Redis(host=host, port=port, password=password, db=redis_db)
  19. @app.celery.task(queue="monitor")
  20. def queue_monitor_task():
  21. queue_name = "dataset"
  22. threshold = dify_config.QUEUE_MONITOR_THRESHOLD
  23. try:
  24. queue_length = celery_redis.llen(f"{queue_name}")
  25. logging.info(click.style(f"Start monitor {queue_name}", fg="green"))
  26. logging.info(click.style(f"Queue length: {queue_length}", fg="green"))
  27. if queue_length >= threshold:
  28. warning_msg = f"Queue {queue_name} task count exceeded the limit.: {queue_length}/{threshold}"
  29. logging.warning(click.style(warning_msg, fg="red"))
  30. alter_emails = dify_config.QUEUE_MONITOR_ALERT_EMAILS
  31. if alter_emails:
  32. to_list = alter_emails.split(",")
  33. for to in to_list:
  34. try:
  35. current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  36. html_content = render_template(
  37. "queue_monitor_alert_email_template_en-US.html",
  38. queue_name=queue_name,
  39. queue_length=queue_length,
  40. threshold=threshold,
  41. alert_time=current_time,
  42. )
  43. mail.send(
  44. to=to, subject="Alert: Dataset Queue pending tasks exceeded the limit", html=html_content
  45. )
  46. except Exception as e:
  47. logging.exception(click.style("Exception occurred during sending email", fg="red"))
  48. except Exception as e:
  49. logging.exception(click.style("Exception occurred during queue monitoring", fg="red"))
  50. finally:
  51. if db.session.is_active:
  52. db.session.close()