| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
import logging
from datetime import datetime
from urllib.parse import unquote, urlparse

import click
from flask import render_template
from redis import Redis

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_mail import mail
-
# Create a dedicated Redis connection (using the same configuration as Celery).
# The connection parameters are taken from the Celery broker URL at import time.
celery_broker_url = dify_config.CELERY_BROKER_URL

parsed = urlparse(celery_broker_url)
host = parsed.hostname or "localhost"
port = parsed.port or 6379
# urlparse() returns the userinfo part still percent-encoded, so a password
# written as e.g. "p%40ss" in the URL must be decoded before it is sent to Redis.
password = unquote(parsed.password) if parsed.password else None
# The URL path selects the Redis database; fall back to "1" when it is absent.
redis_db = parsed.path.strip("/") or "1"  # type: ignore

celery_redis = Redis(host=host, port=port, password=password, db=redis_db)
-
-
@app.celery.task(queue="monitor")
def queue_monitor_task():
    """Check the length of the ``dataset`` queue and alert when it is too long.

    The task reads the length of the Redis list named ``dataset`` through
    ``celery_redis`` and compares it with ``dify_config.QUEUE_MONITOR_THRESHOLD``.
    When the length reaches the threshold it logs a warning and sends an alert
    email to every address listed (comma-separated) in
    ``dify_config.QUEUE_MONITOR_ALERT_EMAILS``.

    Errors are logged rather than raised: a failure for one recipient does not
    stop delivery to the others, and any other failure is logged as a
    monitoring error. The database session is closed on exit if it is active.
    """
    queue_name = "dataset"
    threshold = dify_config.QUEUE_MONITOR_THRESHOLD

    try:
        queue_length = celery_redis.llen(queue_name)
        logging.info(click.style(f"Start monitor {queue_name}", fg="green"))
        logging.info(click.style(f"Queue length: {queue_length}", fg="green"))

        if queue_length < threshold:
            return

        warning_msg = f"Queue {queue_name} task count exceeded the limit.: {queue_length}/{threshold}"
        logging.warning(click.style(warning_msg, fg="red"))

        alert_emails = dify_config.QUEUE_MONITOR_ALERT_EMAILS
        if not alert_emails:
            return

        # Tolerate "a@x.com, b@x.com" and trailing commas: strip whitespace and
        # drop empty entries so no blank or padded address reaches mail.send().
        recipients = [addr.strip() for addr in alert_emails.split(",") if addr.strip()]
        if not recipients:
            return

        # The alert body is identical for every recipient, so render it once;
        # this also gives all recipients the same alert timestamp.
        try:
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            html_content = render_template(
                "queue_monitor_alert_email_template_en-US.html",
                queue_name=queue_name,
                queue_length=queue_length,
                threshold=threshold,
                alert_time=current_time,
            )
        except Exception:
            logging.exception(click.style("Exception occurred during sending email", fg="red"))
            return

        for to in recipients:
            # Isolate each send so one bad address does not block the rest.
            try:
                mail.send(
                    to=to, subject="Alert: Dataset Queue pending tasks exceeded the limit", html=html_content
                )
            except Exception:
                logging.exception(click.style("Exception occurred during sending email", fg="red"))

    except Exception:
        logging.exception(click.style("Exception occurred during queue monitoring", fg="red"))
    finally:
        # Runs on every exit path, including the early returns above.
        if db.session.is_active:
            db.session.close()
|