| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 | 
							- import functools
 - import logging
 - import ssl
 - from collections.abc import Callable
 - from datetime import timedelta
 - from typing import TYPE_CHECKING, Any, Union
 - 
 - import redis
 - from redis import RedisError
 - from redis.cache import CacheConfig
 - from redis.cluster import ClusterNode, RedisCluster
 - from redis.connection import Connection, SSLConnection
 - from redis.lock import Lock
 - from redis.sentinel import Sentinel
 - 
 - from configs import dify_config
 - from dify_app import DifyApp
 - 
 - if TYPE_CHECKING:
 -     from redis.lock import Lock
 - 
 - logger = logging.getLogger(__name__)
 - 
 - 
 - class RedisClientWrapper:
 -     """
 -     A wrapper class for the Redis client that addresses the issue where the global
 -     `redis_client` variable cannot be updated when a new Redis instance is returned
 -     by Sentinel.
 - 
 -     This class allows for deferred initialization of the Redis client, enabling the
 -     client to be re-initialized with a new instance when necessary. This is particularly
 -     useful in scenarios where the Redis instance may change dynamically, such as during
 -     a failover in a Sentinel-managed Redis setup.
 - 
 -     Attributes:
 -         _client: The actual Redis client instance. It remains None until
 -                  initialized with the `initialize` method.
 - 
 -     Methods:
 -         initialize(client): Initializes the Redis client if it hasn't been initialized already.
 -         __getattr__(item): Delegates attribute access to the Redis client, raising an error
 -                            if the client is not initialized.
 -     """
 - 
 -     _client: Union[redis.Redis, RedisCluster, None]
 - 
 -     def __init__(self) -> None:
 -         self._client = None
 - 
 -     def initialize(self, client: Union[redis.Redis, RedisCluster]) -> None:
 -         if self._client is None:
 -             self._client = client
 - 
 -     if TYPE_CHECKING:
 -         # Type hints for IDE support and static analysis
 -         # These are not executed at runtime but provide type information
 -         def get(self, name: str | bytes) -> Any: ...
 - 
 -         def set(
 -             self,
 -             name: str | bytes,
 -             value: Any,
 -             ex: int | None = None,
 -             px: int | None = None,
 -             nx: bool = False,
 -             xx: bool = False,
 -             keepttl: bool = False,
 -             get: bool = False,
 -             exat: int | None = None,
 -             pxat: int | None = None,
 -         ) -> Any: ...
 - 
 -         def setex(self, name: str | bytes, time: int | timedelta, value: Any) -> Any: ...
 -         def setnx(self, name: str | bytes, value: Any) -> Any: ...
 -         def delete(self, *names: str | bytes) -> Any: ...
 -         def incr(self, name: str | bytes, amount: int = 1) -> Any: ...
 -         def expire(
 -             self,
 -             name: str | bytes,
 -             time: int | timedelta,
 -             nx: bool = False,
 -             xx: bool = False,
 -             gt: bool = False,
 -             lt: bool = False,
 -         ) -> Any: ...
 -         def lock(
 -             self,
 -             name: str,
 -             timeout: float | None = None,
 -             sleep: float = 0.1,
 -             blocking: bool = True,
 -             blocking_timeout: float | None = None,
 -             thread_local: bool = True,
 -         ) -> Lock: ...
 -         def zadd(
 -             self,
 -             name: str | bytes,
 -             mapping: dict[str | bytes | int | float, float | int | str | bytes],
 -             nx: bool = False,
 -             xx: bool = False,
 -             ch: bool = False,
 -             incr: bool = False,
 -             gt: bool = False,
 -             lt: bool = False,
 -         ) -> Any: ...
 -         def zremrangebyscore(self, name: str | bytes, min: float | str, max: float | str) -> Any: ...
 -         def zcard(self, name: str | bytes) -> Any: ...
 -         def getdel(self, name: str | bytes) -> Any: ...
 - 
 -     def __getattr__(self, item: str) -> Any:
 -         if self._client is None:
 -             raise RuntimeError("Redis client is not initialized. Call init_app first.")
 -         return getattr(self._client, item)
 - 
 - 
 - redis_client: RedisClientWrapper = RedisClientWrapper()
 - 
 - 
 - def _get_ssl_configuration() -> tuple[type[Union[Connection, SSLConnection]], dict[str, Any]]:
 -     """Get SSL configuration for Redis connection."""
 -     if not dify_config.REDIS_USE_SSL:
 -         return Connection, {}
 - 
 -     cert_reqs_map = {
 -         "CERT_NONE": ssl.CERT_NONE,
 -         "CERT_OPTIONAL": ssl.CERT_OPTIONAL,
 -         "CERT_REQUIRED": ssl.CERT_REQUIRED,
 -     }
 -     ssl_cert_reqs = cert_reqs_map.get(dify_config.REDIS_SSL_CERT_REQS, ssl.CERT_NONE)
 - 
 -     ssl_kwargs = {
 -         "ssl_cert_reqs": ssl_cert_reqs,
 -         "ssl_ca_certs": dify_config.REDIS_SSL_CA_CERTS,
 -         "ssl_certfile": dify_config.REDIS_SSL_CERTFILE,
 -         "ssl_keyfile": dify_config.REDIS_SSL_KEYFILE,
 -     }
 - 
 -     return SSLConnection, ssl_kwargs
 - 
 - 
 - def _get_cache_configuration() -> CacheConfig | None:
 -     """Get client-side cache configuration if enabled."""
 -     if not dify_config.REDIS_ENABLE_CLIENT_SIDE_CACHE:
 -         return None
 - 
 -     resp_protocol = dify_config.REDIS_SERIALIZATION_PROTOCOL
 -     if resp_protocol < 3:
 -         raise ValueError("Client side cache is only supported in RESP3")
 - 
 -     return CacheConfig()
 - 
 - 
 - def _get_base_redis_params() -> dict[str, Any]:
 -     """Get base Redis connection parameters."""
 -     return {
 -         "username": dify_config.REDIS_USERNAME,
 -         "password": dify_config.REDIS_PASSWORD or None,
 -         "db": dify_config.REDIS_DB,
 -         "encoding": "utf-8",
 -         "encoding_errors": "strict",
 -         "decode_responses": False,
 -         "protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
 -         "cache_config": _get_cache_configuration(),
 -     }
 - 
 - 
 - def _create_sentinel_client(redis_params: dict[str, Any]) -> Union[redis.Redis, RedisCluster]:
 -     """Create Redis client using Sentinel configuration."""
 -     if not dify_config.REDIS_SENTINELS:
 -         raise ValueError("REDIS_SENTINELS must be set when REDIS_USE_SENTINEL is True")
 - 
 -     if not dify_config.REDIS_SENTINEL_SERVICE_NAME:
 -         raise ValueError("REDIS_SENTINEL_SERVICE_NAME must be set when REDIS_USE_SENTINEL is True")
 - 
 -     sentinel_hosts = [(node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",")]
 - 
 -     sentinel = Sentinel(
 -         sentinel_hosts,
 -         sentinel_kwargs={
 -             "socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT,
 -             "username": dify_config.REDIS_SENTINEL_USERNAME,
 -             "password": dify_config.REDIS_SENTINEL_PASSWORD,
 -         },
 -     )
 - 
 -     master: redis.Redis = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
 -     return master
 - 
 - 
 - def _create_cluster_client() -> Union[redis.Redis, RedisCluster]:
 -     """Create Redis cluster client."""
 -     if not dify_config.REDIS_CLUSTERS:
 -         raise ValueError("REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True")
 - 
 -     nodes = [
 -         ClusterNode(host=node.split(":")[0], port=int(node.split(":")[1]))
 -         for node in dify_config.REDIS_CLUSTERS.split(",")
 -     ]
 - 
 -     cluster: RedisCluster = RedisCluster(
 -         startup_nodes=nodes,
 -         password=dify_config.REDIS_CLUSTERS_PASSWORD,
 -         protocol=dify_config.REDIS_SERIALIZATION_PROTOCOL,
 -         cache_config=_get_cache_configuration(),
 -     )
 -     return cluster
 - 
 - 
 - def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis, RedisCluster]:
 -     """Create standalone Redis client."""
 -     connection_class, ssl_kwargs = _get_ssl_configuration()
 - 
 -     redis_params.update(
 -         {
 -             "host": dify_config.REDIS_HOST,
 -             "port": dify_config.REDIS_PORT,
 -             "connection_class": connection_class,
 -         }
 -     )
 - 
 -     if ssl_kwargs:
 -         redis_params.update(ssl_kwargs)
 - 
 -     pool = redis.ConnectionPool(**redis_params)
 -     client: redis.Redis = redis.Redis(connection_pool=pool)
 -     return client
 - 
 - 
 - def init_app(app: DifyApp):
 -     """Initialize Redis client and attach it to the app."""
 -     global redis_client
 - 
 -     # Determine Redis mode and create appropriate client
 -     if dify_config.REDIS_USE_SENTINEL:
 -         redis_params = _get_base_redis_params()
 -         client = _create_sentinel_client(redis_params)
 -     elif dify_config.REDIS_USE_CLUSTERS:
 -         client = _create_cluster_client()
 -     else:
 -         redis_params = _get_base_redis_params()
 -         client = _create_standalone_client(redis_params)
 - 
 -     # Initialize the wrapper and attach to app
 -     redis_client.initialize(client)
 -     app.extensions["redis"] = redis_client
 - 
 - 
 - def redis_fallback(default_return: Any | None = None):
 -     """
 -     decorator to handle Redis operation exceptions and return a default value when Redis is unavailable.
 - 
 -     Args:
 -         default_return: The value to return when a Redis operation fails. Defaults to None.
 -     """
 - 
 -     def decorator(func: Callable):
 -         @functools.wraps(func)
 -         def wrapper(*args, **kwargs):
 -             try:
 -                 return func(*args, **kwargs)
 -             except RedisError as e:
 -                 func_name = getattr(func, "__name__", "Unknown")
 -                 logger.warning("Redis operation failed in %s: %s", func_name, str(e), exc_info=True)
 -                 return default_return
 - 
 -         return wrapper
 - 
 -     return decorator
 
 
  |