You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ext_celery.py 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import ssl
  2. from datetime import timedelta
  3. from typing import Any, Optional
  4. import pytz
  5. from celery import Celery, Task
  6. from celery.schedules import crontab
  7. from configs import dify_config
  8. from dify_app import DifyApp
  9. def _get_celery_ssl_options() -> Optional[dict[str, Any]]:
  10. """Get SSL configuration for Celery broker/backend connections."""
  11. # Use REDIS_USE_SSL for consistency with the main Redis client
  12. # Only apply SSL if we're using Redis as broker/backend
  13. if not dify_config.REDIS_USE_SSL:
  14. return None
  15. # Check if Celery is actually using Redis
  16. broker_is_redis = dify_config.CELERY_BROKER_URL and (
  17. dify_config.CELERY_BROKER_URL.startswith("redis://") or dify_config.CELERY_BROKER_URL.startswith("rediss://")
  18. )
  19. if not broker_is_redis:
  20. return None
  21. # Map certificate requirement strings to SSL constants
  22. cert_reqs_map = {
  23. "CERT_NONE": ssl.CERT_NONE,
  24. "CERT_OPTIONAL": ssl.CERT_OPTIONAL,
  25. "CERT_REQUIRED": ssl.CERT_REQUIRED,
  26. }
  27. ssl_cert_reqs = cert_reqs_map.get(dify_config.REDIS_SSL_CERT_REQS, ssl.CERT_NONE)
  28. ssl_options = {
  29. "ssl_cert_reqs": ssl_cert_reqs,
  30. "ssl_ca_certs": dify_config.REDIS_SSL_CA_CERTS,
  31. "ssl_certfile": dify_config.REDIS_SSL_CERTFILE,
  32. "ssl_keyfile": dify_config.REDIS_SSL_KEYFILE,
  33. }
  34. return ssl_options
  35. def init_app(app: DifyApp) -> Celery:
  36. class FlaskTask(Task):
  37. def __call__(self, *args: object, **kwargs: object) -> object:
  38. with app.app_context():
  39. return self.run(*args, **kwargs)
  40. broker_transport_options = {}
  41. if dify_config.CELERY_USE_SENTINEL:
  42. broker_transport_options = {
  43. "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
  44. "sentinel_kwargs": {
  45. "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
  46. "password": dify_config.CELERY_SENTINEL_PASSWORD,
  47. },
  48. }
  49. celery_app = Celery(
  50. app.name,
  51. task_cls=FlaskTask,
  52. broker=dify_config.CELERY_BROKER_URL,
  53. backend=dify_config.CELERY_BACKEND,
  54. )
  55. celery_app.conf.update(
  56. result_backend=dify_config.CELERY_RESULT_BACKEND,
  57. broker_transport_options=broker_transport_options,
  58. broker_connection_retry_on_startup=True,
  59. worker_log_format=dify_config.LOG_FORMAT,
  60. worker_task_log_format=dify_config.LOG_FORMAT,
  61. worker_hijack_root_logger=False,
  62. timezone=pytz.timezone(dify_config.LOG_TZ or "UTC"),
  63. task_ignore_result=True,
  64. )
  65. # Apply SSL configuration if enabled
  66. ssl_options = _get_celery_ssl_options()
  67. if ssl_options:
  68. celery_app.conf.update(
  69. broker_use_ssl=ssl_options,
  70. # Also apply SSL to the backend if it's Redis
  71. redis_backend_use_ssl=ssl_options if dify_config.CELERY_BACKEND == "redis" else None,
  72. )
  73. if dify_config.LOG_FILE:
  74. celery_app.conf.update(
  75. worker_logfile=dify_config.LOG_FILE,
  76. )
  77. celery_app.set_default()
  78. app.extensions["celery"] = celery_app
  79. imports = []
  80. day = dify_config.CELERY_BEAT_SCHEDULER_TIME
  81. # if you add a new task, please add the switch to CeleryScheduleTasksConfig
  82. beat_schedule = {}
  83. if dify_config.ENABLE_CLEAN_EMBEDDING_CACHE_TASK:
  84. imports.append("schedule.clean_embedding_cache_task")
  85. beat_schedule["clean_embedding_cache_task"] = {
  86. "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
  87. "schedule": crontab(minute="0", hour="2", day_of_month=f"*/{day}"),
  88. }
  89. if dify_config.ENABLE_CLEAN_UNUSED_DATASETS_TASK:
  90. imports.append("schedule.clean_unused_datasets_task")
  91. beat_schedule["clean_unused_datasets_task"] = {
  92. "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
  93. "schedule": crontab(minute="0", hour="3", day_of_month=f"*/{day}"),
  94. }
  95. if dify_config.ENABLE_CREATE_TIDB_SERVERLESS_TASK:
  96. imports.append("schedule.create_tidb_serverless_task")
  97. beat_schedule["create_tidb_serverless_task"] = {
  98. "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
  99. "schedule": crontab(minute="0", hour="*"),
  100. }
  101. if dify_config.ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK:
  102. imports.append("schedule.update_tidb_serverless_status_task")
  103. beat_schedule["update_tidb_serverless_status_task"] = {
  104. "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
  105. "schedule": timedelta(minutes=10),
  106. }
  107. if dify_config.ENABLE_CLEAN_MESSAGES:
  108. imports.append("schedule.clean_messages")
  109. beat_schedule["clean_messages"] = {
  110. "task": "schedule.clean_messages.clean_messages",
  111. "schedule": crontab(minute="0", hour="4", day_of_month=f"*/{day}"),
  112. }
  113. if dify_config.ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK:
  114. imports.append("schedule.mail_clean_document_notify_task")
  115. beat_schedule["mail_clean_document_notify_task"] = {
  116. "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task",
  117. "schedule": crontab(minute="0", hour="10", day_of_week="1"),
  118. }
  119. if dify_config.ENABLE_DATASETS_QUEUE_MONITOR:
  120. imports.append("schedule.queue_monitor_task")
  121. beat_schedule["datasets-queue-monitor"] = {
  122. "task": "schedule.queue_monitor_task.queue_monitor_task",
  123. "schedule": timedelta(
  124. minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30
  125. ),
  126. }
  127. if dify_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK and dify_config.MARKETPLACE_ENABLED:
  128. imports.append("schedule.check_upgradable_plugin_task")
  129. beat_schedule["check_upgradable_plugin_task"] = {
  130. "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task",
  131. "schedule": crontab(minute="*/15"),
  132. }
  133. if dify_config.WORKFLOW_LOG_CLEANUP_ENABLED:
  134. # 2:00 AM every day
  135. imports.append("schedule.clean_workflow_runlogs_precise")
  136. beat_schedule["clean_workflow_runlogs_precise"] = {
  137. "task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise",
  138. "schedule": crontab(minute="0", hour="2"),
  139. }
  140. celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
  141. return celery_app