Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

queue_monitor_task.py 2.8KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import logging
  2. from datetime import datetime
  3. import click
  4. from kombu.utils.url import parse_url # type: ignore
  5. from redis import Redis
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from libs.email_i18n import EmailType, get_email_i18n_service
  # Module-level logger shared by everything in this file.
  logger = logging.getLogger(__name__)

  # Redis connection settings are taken from the Celery broker URL.
  redis_config = parse_url(dify_config.CELERY_BROKER_URL)
  _virtual_host = redis_config.get("virtual_host")

  # Dedicated client for inspecting the broker; each missing URL component
  # falls back to a default (localhost / 6379 / no password / database 1).
  celery_redis = Redis(
      host=redis_config.get("hostname") or "localhost",
      port=redis_config.get("port") or 6379,
      password=redis_config.get("password") or None,
      db=int(_virtual_host) if _virtual_host else 1,
  )
  @app.celery.task(queue="monitor")
  def queue_monitor_task():
      """Check the length of the "dataset" queue and send alert emails.

      Reads the length of the Redis list named ``dataset`` through
      ``celery_redis``. When the length is greater than or equal to
      ``dify_config.QUEUE_MONITOR_THRESHOLD``, a warning is logged and one
      alert email is sent to every address listed (comma-separated) in
      ``dify_config.QUEUE_MONITOR_ALERT_EMAILS``.

      The task returns ``None`` in every case. Exceptions are logged rather
      than propagated, and a failure for one recipient does not stop delivery
      to the remaining ones. The database session is closed on exit if it is
      still active.
      """
      queue_name = "dataset"
      threshold = dify_config.QUEUE_MONITOR_THRESHOLD

      if threshold is None:
          logger.warning(click.style("QUEUE_MONITOR_THRESHOLD is not configured, skipping monitoring", fg="yellow"))
          return

      try:
          logger.info(click.style(f"Start monitor {queue_name}", fg="green"))
          queue_length = celery_redis.llen(queue_name)

          if queue_length is None:
              logger.error(
                  click.style(f"Failed to get queue length for {queue_name} - Redis may be unavailable", fg="red")
              )
              return

          logger.info(click.style(f"Queue length: {queue_length}", fg="green"))

          if queue_length < threshold:
              return

          warning_msg = f"Queue {queue_name} task count exceeded the limit.: {queue_length}/{threshold}"
          # Use the module logger (not the root logger) like every other message here.
          logger.warning(click.style(warning_msg, fg="red"))

          alert_emails = dify_config.QUEUE_MONITOR_ALERT_EMAILS
          if not alert_emails:
              return

          # Tolerate spaces around commas and trailing/duplicate commas.
          recipients = [addr.strip() for addr in alert_emails.split(",") if addr.strip()]
          if not recipients:
              return

          email_service = get_email_i18n_service()
          # One timestamp per alert so every recipient sees the same time.
          alert_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
          template_context = {
              "queue_name": queue_name,
              "queue_length": queue_length,
              "threshold": threshold,
              "alert_time": alert_time,
          }

          for to in recipients:
              try:
                  email_service.send_email(
                      email_type=EmailType.QUEUE_MONITOR_ALERT,
                      language_code="en-US",
                      to=to,
                      # Fresh copy per call so the service cannot leak
                      # mutations from one recipient to the next.
                      template_context=dict(template_context),
                  )
              except Exception:
                  # Best effort: keep notifying the remaining recipients.
                  logger.exception(click.style("Exception occurred during sending email", fg="red"))
      except Exception:
          logger.exception(click.style("Exception occurred during queue monitoring", fg="red"))
      finally:
          # Runs on every exit path, including the early returns above.
          if db.session.is_active:
              db.session.close()