選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

ext_redis.py 7.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. import functools
  2. import logging
  3. from collections.abc import Callable
  4. from datetime import timedelta
  5. from typing import TYPE_CHECKING, Any, Union
  6. import redis
  7. from redis import RedisError
  8. from redis.cache import CacheConfig
  9. from redis.cluster import ClusterNode, RedisCluster
  10. from redis.connection import Connection, SSLConnection
  11. from redis.lock import Lock
  12. from redis.sentinel import Sentinel
  13. from configs import dify_config
  14. from dify_app import DifyApp
  15. if TYPE_CHECKING:
  16. from redis.lock import Lock
# Module logger; redis_fallback() uses it to report failed Redis operations.
logger = logging.getLogger(__name__)
  18. class RedisClientWrapper:
  19. """
  20. A wrapper class for the Redis client that addresses the issue where the global
  21. `redis_client` variable cannot be updated when a new Redis instance is returned
  22. by Sentinel.
  23. This class allows for deferred initialization of the Redis client, enabling the
  24. client to be re-initialized with a new instance when necessary. This is particularly
  25. useful in scenarios where the Redis instance may change dynamically, such as during
  26. a failover in a Sentinel-managed Redis setup.
  27. Attributes:
  28. _client: The actual Redis client instance. It remains None until
  29. initialized with the `initialize` method.
  30. Methods:
  31. initialize(client): Initializes the Redis client if it hasn't been initialized already.
  32. __getattr__(item): Delegates attribute access to the Redis client, raising an error
  33. if the client is not initialized.
  34. """
  35. _client: Union[redis.Redis, RedisCluster, None]
  36. def __init__(self) -> None:
  37. self._client = None
  38. def initialize(self, client: Union[redis.Redis, RedisCluster]) -> None:
  39. if self._client is None:
  40. self._client = client
  41. if TYPE_CHECKING:
  42. # Type hints for IDE support and static analysis
  43. # These are not executed at runtime but provide type information
  44. def get(self, name: str | bytes) -> Any: ...
  45. def set(
  46. self,
  47. name: str | bytes,
  48. value: Any,
  49. ex: int | None = None,
  50. px: int | None = None,
  51. nx: bool = False,
  52. xx: bool = False,
  53. keepttl: bool = False,
  54. get: bool = False,
  55. exat: int | None = None,
  56. pxat: int | None = None,
  57. ) -> Any: ...
  58. def setex(self, name: str | bytes, time: int | timedelta, value: Any) -> Any: ...
  59. def setnx(self, name: str | bytes, value: Any) -> Any: ...
  60. def delete(self, *names: str | bytes) -> Any: ...
  61. def incr(self, name: str | bytes, amount: int = 1) -> Any: ...
  62. def expire(
  63. self,
  64. name: str | bytes,
  65. time: int | timedelta,
  66. nx: bool = False,
  67. xx: bool = False,
  68. gt: bool = False,
  69. lt: bool = False,
  70. ) -> Any: ...
  71. def lock(
  72. self,
  73. name: str,
  74. timeout: float | None = None,
  75. sleep: float = 0.1,
  76. blocking: bool = True,
  77. blocking_timeout: float | None = None,
  78. thread_local: bool = True,
  79. ) -> Lock: ...
  80. def zadd(
  81. self,
  82. name: str | bytes,
  83. mapping: dict[str | bytes | int | float, float | int | str | bytes],
  84. nx: bool = False,
  85. xx: bool = False,
  86. ch: bool = False,
  87. incr: bool = False,
  88. gt: bool = False,
  89. lt: bool = False,
  90. ) -> Any: ...
  91. def zremrangebyscore(self, name: str | bytes, min: float | str, max: float | str) -> Any: ...
  92. def zcard(self, name: str | bytes) -> Any: ...
  93. def getdel(self, name: str | bytes) -> Any: ...
  94. def __getattr__(self, item: str) -> Any:
  95. if self._client is None:
  96. raise RuntimeError("Redis client is not initialized. Call init_app first.")
  97. return getattr(self._client, item)
# Shared module-level wrapper; it holds no client until init_app() calls initialize() on it.
redis_client: RedisClientWrapper = RedisClientWrapper()
  99. def init_app(app: DifyApp):
  100. global redis_client
  101. connection_class: type[Union[Connection, SSLConnection]] = Connection
  102. if dify_config.REDIS_USE_SSL:
  103. connection_class = SSLConnection
  104. resp_protocol = dify_config.REDIS_SERIALIZATION_PROTOCOL
  105. if dify_config.REDIS_ENABLE_CLIENT_SIDE_CACHE:
  106. if resp_protocol >= 3:
  107. clientside_cache_config = CacheConfig()
  108. else:
  109. raise ValueError("Client side cache is only supported in RESP3")
  110. else:
  111. clientside_cache_config = None
  112. redis_params: dict[str, Any] = {
  113. "username": dify_config.REDIS_USERNAME,
  114. "password": dify_config.REDIS_PASSWORD or None, # Temporary fix for empty password
  115. "db": dify_config.REDIS_DB,
  116. "encoding": "utf-8",
  117. "encoding_errors": "strict",
  118. "decode_responses": False,
  119. "protocol": resp_protocol,
  120. "cache_config": clientside_cache_config,
  121. }
  122. if dify_config.REDIS_USE_SENTINEL:
  123. assert dify_config.REDIS_SENTINELS is not None, "REDIS_SENTINELS must be set when REDIS_USE_SENTINEL is True"
  124. assert dify_config.REDIS_SENTINEL_SERVICE_NAME is not None, (
  125. "REDIS_SENTINEL_SERVICE_NAME must be set when REDIS_USE_SENTINEL is True"
  126. )
  127. sentinel_hosts = [
  128. (node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",")
  129. ]
  130. sentinel = Sentinel(
  131. sentinel_hosts,
  132. sentinel_kwargs={
  133. "socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT,
  134. "username": dify_config.REDIS_SENTINEL_USERNAME,
  135. "password": dify_config.REDIS_SENTINEL_PASSWORD,
  136. },
  137. )
  138. master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
  139. redis_client.initialize(master)
  140. elif dify_config.REDIS_USE_CLUSTERS:
  141. assert dify_config.REDIS_CLUSTERS is not None, "REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True"
  142. nodes = [
  143. ClusterNode(host=node.split(":")[0], port=int(node.split(":")[1]))
  144. for node in dify_config.REDIS_CLUSTERS.split(",")
  145. ]
  146. redis_client.initialize(
  147. RedisCluster(
  148. startup_nodes=nodes,
  149. password=dify_config.REDIS_CLUSTERS_PASSWORD,
  150. protocol=resp_protocol,
  151. cache_config=clientside_cache_config,
  152. )
  153. )
  154. else:
  155. redis_params.update(
  156. {
  157. "host": dify_config.REDIS_HOST,
  158. "port": dify_config.REDIS_PORT,
  159. "connection_class": connection_class,
  160. "protocol": resp_protocol,
  161. "cache_config": clientside_cache_config,
  162. }
  163. )
  164. pool = redis.ConnectionPool(**redis_params)
  165. redis_client.initialize(redis.Redis(connection_pool=pool))
  166. app.extensions["redis"] = redis_client
  167. def redis_fallback(default_return: Any = None):
  168. """
  169. decorator to handle Redis operation exceptions and return a default value when Redis is unavailable.
  170. Args:
  171. default_return: The value to return when a Redis operation fails. Defaults to None.
  172. """
  173. def decorator(func: Callable):
  174. @functools.wraps(func)
  175. def wrapper(*args, **kwargs):
  176. try:
  177. return func(*args, **kwargs)
  178. except RedisError as e:
  179. logger.warning("Redis operation failed in %s: %s", func.__name__, str(e), exc_info=True)
  180. return default_return
  181. return wrapper
  182. return decorator