您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. #
  2. # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import logging
  17. import json
  18. import uuid
  19. import valkey as redis
  20. from rag import settings
  21. from rag.utils import singleton
  22. from valkey.lock import Lock
  23. class RedisMsg:
  24. def __init__(self, consumer, queue_name, group_name, msg_id, message):
  25. self.__consumer = consumer
  26. self.__queue_name = queue_name
  27. self.__group_name = group_name
  28. self.__msg_id = msg_id
  29. self.__message = json.loads(message["message"])
  30. def ack(self):
  31. try:
  32. self.__consumer.xack(self.__queue_name, self.__group_name, self.__msg_id)
  33. return True
  34. except Exception as e:
  35. logging.warning("[EXCEPTION]ack" + str(self.__queue_name) + "||" + str(e))
  36. return False
  37. def get_message(self):
  38. return self.__message
  39. def get_msg_id(self):
  40. return self.__msg_id
  41. @singleton
  42. class RedisDB:
  43. def __init__(self):
  44. self.REDIS = None
  45. self.config = settings.REDIS
  46. self.__open__()
  47. def __open__(self):
  48. try:
  49. self.REDIS = redis.StrictRedis(
  50. host=self.config["host"].split(":")[0],
  51. port=int(self.config.get("host", ":6379").split(":")[1]),
  52. db=int(self.config.get("db", 1)),
  53. password=self.config.get("password"),
  54. decode_responses=True,
  55. )
  56. except Exception:
  57. logging.warning("Redis can't be connected.")
  58. return self.REDIS
  59. def health(self):
  60. self.REDIS.ping()
  61. a, b = "xx", "yy"
  62. self.REDIS.set(a, b, 3)
  63. if self.REDIS.get(a) == b:
  64. return True
  65. def is_alive(self):
  66. return self.REDIS is not None
  67. def exist(self, k):
  68. if not self.REDIS:
  69. return
  70. try:
  71. return self.REDIS.exists(k)
  72. except Exception as e:
  73. logging.warning("RedisDB.exist " + str(k) + " got exception: " + str(e))
  74. self.__open__()
  75. def get(self, k):
  76. if not self.REDIS:
  77. return
  78. try:
  79. return self.REDIS.get(k)
  80. except Exception as e:
  81. logging.warning("RedisDB.get " + str(k) + " got exception: " + str(e))
  82. self.__open__()
  83. def set_obj(self, k, obj, exp=3600):
  84. try:
  85. self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
  86. return True
  87. except Exception as e:
  88. logging.warning("RedisDB.set_obj " + str(k) + " got exception: " + str(e))
  89. self.__open__()
  90. return False
  91. def set(self, k, v, exp=3600):
  92. try:
  93. self.REDIS.set(k, v, exp)
  94. return True
  95. except Exception as e:
  96. logging.warning("RedisDB.set " + str(k) + " got exception: " + str(e))
  97. self.__open__()
  98. return False
  99. def sadd(self, key: str, member: str):
  100. try:
  101. self.REDIS.sadd(key, member)
  102. return True
  103. except Exception as e:
  104. logging.warning("RedisDB.sadd " + str(key) + " got exception: " + str(e))
  105. self.__open__()
  106. return False
  107. def srem(self, key: str, member: str):
  108. try:
  109. self.REDIS.srem(key, member)
  110. return True
  111. except Exception as e:
  112. logging.warning("RedisDB.srem " + str(key) + " got exception: " + str(e))
  113. self.__open__()
  114. return False
  115. def smembers(self, key: str):
  116. try:
  117. res = self.REDIS.smembers(key)
  118. return res
  119. except Exception as e:
  120. logging.warning(
  121. "RedisDB.smembers " + str(key) + " got exception: " + str(e)
  122. )
  123. self.__open__()
  124. return None
  125. def zadd(self, key: str, member: str, score: float):
  126. try:
  127. self.REDIS.zadd(key, {member: score})
  128. return True
  129. except Exception as e:
  130. logging.warning("RedisDB.zadd " + str(key) + " got exception: " + str(e))
  131. self.__open__()
  132. return False
  133. def zcount(self, key: str, min: float, max: float):
  134. try:
  135. res = self.REDIS.zcount(key, min, max)
  136. return res
  137. except Exception as e:
  138. logging.warning("RedisDB.zcount " + str(key) + " got exception: " + str(e))
  139. self.__open__()
  140. return 0
  141. def zpopmin(self, key: str, count: int):
  142. try:
  143. res = self.REDIS.zpopmin(key, count)
  144. return res
  145. except Exception as e:
  146. logging.warning("RedisDB.zpopmin " + str(key) + " got exception: " + str(e))
  147. self.__open__()
  148. return None
  149. def zrangebyscore(self, key: str, min: float, max: float):
  150. try:
  151. res = self.REDIS.zrangebyscore(key, min, max)
  152. return res
  153. except Exception as e:
  154. logging.warning(
  155. "RedisDB.zrangebyscore " + str(key) + " got exception: " + str(e)
  156. )
  157. self.__open__()
  158. return None
  159. def transaction(self, key, value, exp=3600):
  160. try:
  161. pipeline = self.REDIS.pipeline(transaction=True)
  162. pipeline.set(key, value, exp, nx=True)
  163. pipeline.execute()
  164. return True
  165. except Exception as e:
  166. logging.warning(
  167. "RedisDB.transaction " + str(key) + " got exception: " + str(e)
  168. )
  169. self.__open__()
  170. return False
  171. def queue_product(self, queue, message, exp=settings.SVR_QUEUE_RETENTION) -> bool:
  172. for _ in range(3):
  173. try:
  174. payload = {"message": json.dumps(message)}
  175. pipeline = self.REDIS.pipeline()
  176. pipeline.xadd(queue, payload)
  177. # pipeline.expire(queue, exp)
  178. pipeline.execute()
  179. return True
  180. except Exception as e:
  181. logging.exception(
  182. "RedisDB.queue_product " + str(queue) + " got exception: " + str(e)
  183. )
  184. return False
  185. def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> RedisMsg:
  186. """https://redis.io/docs/latest/commands/xreadgroup/"""
  187. try:
  188. group_info = self.REDIS.xinfo_groups(queue_name)
  189. if not any(e["name"] == group_name for e in group_info):
  190. self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
  191. args = {
  192. "groupname": group_name,
  193. "consumername": consumer_name,
  194. "count": 1,
  195. "block": 5,
  196. "streams": {queue_name: msg_id},
  197. }
  198. messages = self.REDIS.xreadgroup(**args)
  199. if not messages:
  200. return None
  201. stream, element_list = messages[0]
  202. if not element_list:
  203. return None
  204. msg_id, payload = element_list[0]
  205. res = RedisMsg(self.REDIS, queue_name, group_name, msg_id, payload)
  206. return res
  207. except Exception as e:
  208. if "key" in str(e):
  209. pass
  210. else:
  211. logging.exception(
  212. "RedisDB.queue_consumer "
  213. + str(queue_name)
  214. + " got exception: "
  215. + str(e)
  216. )
  217. return None
  218. def get_unacked_iterator(self, queue_name, group_name, consumer_name):
  219. try:
  220. group_info = self.REDIS.xinfo_groups(queue_name)
  221. if not any(e["name"] == group_name for e in group_info):
  222. return
  223. current_min = 0
  224. while True:
  225. payload = self.queue_consumer(queue_name, group_name, consumer_name, current_min)
  226. if not payload:
  227. return
  228. current_min = payload.get_msg_id()
  229. logging.info(f"RedisDB.get_unacked_iterator {consumer_name} msg_id {current_min}")
  230. yield payload
  231. except Exception as e:
  232. if "key" in str(e):
  233. return
  234. logging.exception(
  235. "RedisDB.get_unacked_iterator " + consumer_name + " got exception: "
  236. )
  237. self.__open__()
  238. def queue_info(self, queue, group_name) -> dict | None:
  239. try:
  240. groups = self.REDIS.xinfo_groups(queue)
  241. for group in groups:
  242. if group["name"] == group_name:
  243. return group
  244. except Exception as e:
  245. logging.warning(
  246. "RedisDB.queue_info " + str(queue) + " got exception: " + str(e)
  247. )
  248. return None
# Shared module-level RedisDB instance, constructed when this module is imported.
REDIS_CONN = RedisDB()
  250. class RedisDistributedLock:
  251. def __init__(self, lock_key, lock_value=None, timeout=10, blocking_timeout=1):
  252. self.lock_key = lock_key
  253. if lock_value:
  254. self.lock_value = lock_value
  255. else:
  256. self.lock_value = str(uuid.uuid4())
  257. self.timeout = timeout
  258. self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)
  259. def acquire(self):
  260. return self.lock.acquire()
  261. def release(self):
  262. return self.lock.release()
  263. def __enter__(self):
  264. self.acquire()
  265. def __exit__(self, exception_type, exception_value, exception_traceback):
  266. self.release()