You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_monitor_task.py 2.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import logging
  2. from datetime import datetime
  3. from urllib.parse import urlparse
  4. import click
  5. from redis import Redis
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from libs.email_i18n import EmailType, get_email_i18n_service
  10. # Create a dedicated Redis connection (using the same configuration as Celery)
  11. celery_broker_url = dify_config.CELERY_BROKER_URL
  12. parsed = urlparse(celery_broker_url)
  13. host = parsed.hostname or "localhost"
  14. port = parsed.port or 6379
  15. password = parsed.password or None
  16. redis_db = parsed.path.strip("/") or "1" # type: ignore
  17. celery_redis = Redis(host=host, port=port, password=password, db=redis_db)
  18. @app.celery.task(queue="monitor")
  19. def queue_monitor_task():
  20. queue_name = "dataset"
  21. threshold = dify_config.QUEUE_MONITOR_THRESHOLD
  22. try:
  23. queue_length = celery_redis.llen(f"{queue_name}")
  24. logging.info(click.style(f"Start monitor {queue_name}", fg="green"))
  25. logging.info(click.style(f"Queue length: {queue_length}", fg="green"))
  26. if queue_length >= threshold:
  27. warning_msg = f"Queue {queue_name} task count exceeded the limit.: {queue_length}/{threshold}"
  28. logging.warning(click.style(warning_msg, fg="red"))
  29. alter_emails = dify_config.QUEUE_MONITOR_ALERT_EMAILS
  30. if alter_emails:
  31. to_list = alter_emails.split(",")
  32. email_service = get_email_i18n_service()
  33. for to in to_list:
  34. try:
  35. current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  36. email_service.send_email(
  37. email_type=EmailType.QUEUE_MONITOR_ALERT,
  38. language_code="en-US",
  39. to=to,
  40. template_context={
  41. "queue_name": queue_name,
  42. "queue_length": queue_length,
  43. "threshold": threshold,
  44. "alert_time": current_time,
  45. },
  46. )
  47. except Exception as e:
  48. logging.exception(click.style("Exception occurred during sending email", fg="red"))
  49. except Exception as e:
  50. logging.exception(click.style("Exception occurred during queue monitoring", fg="red"))
  51. finally:
  52. if db.session.is_active:
  53. db.session.close()