Du kannst nicht mehr als 25 Themen auswählen. Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

azure_spn_conn.py 3.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #
  2. # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import logging
  17. import os
  18. import time
  19. from rag import settings
  20. from rag.utils import singleton
  21. from azure.identity import ClientSecretCredential, AzureAuthorityHosts
  22. from azure.storage.filedatalake import FileSystemClient
  23. @singleton
  24. class RAGFlowAzureSpnBlob:
  25. def __init__(self):
  26. self.conn = None
  27. self.account_url = os.getenv('ACCOUNT_URL', settings.AZURE["account_url"])
  28. self.client_id = os.getenv('CLIENT_ID', settings.AZURE["client_id"])
  29. self.secret = os.getenv('SECRET', settings.AZURE["secret"])
  30. self.tenant_id = os.getenv('TENANT_ID', settings.AZURE["tenant_id"])
  31. self.container_name = os.getenv('CONTAINER_NAME', settings.AZURE["container_name"])
  32. self.__open__()
  33. def __open__(self):
  34. try:
  35. if self.conn:
  36. self.__close__()
  37. except Exception:
  38. pass
  39. try:
  40. credentials = ClientSecretCredential(tenant_id=self.tenant_id, client_id=self.client_id, client_secret=self.secret, authority=AzureAuthorityHosts.AZURE_CHINA)
  41. self.conn = FileSystemClient(account_url=self.account_url, file_system_name=self.container_name, credential=credentials)
  42. except Exception:
  43. logging.exception("Fail to connect %s" % self.account_url)
  44. def __close__(self):
  45. del self.conn
  46. self.conn = None
  47. def health(self):
  48. _bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
  49. f = self.conn.create_file(fnm)
  50. f.append_data(binary, offset=0, length=len(binary))
  51. return f.flush_data(len(binary))
  52. def put(self, bucket, fnm, binary):
  53. for _ in range(3):
  54. try:
  55. f = self.conn.create_file(fnm)
  56. f.append_data(binary, offset=0, length=len(binary))
  57. return f.flush_data(len(binary))
  58. except Exception:
  59. logging.exception(f"Fail put {bucket}/{fnm}")
  60. self.__open__()
  61. time.sleep(1)
  62. def rm(self, bucket, fnm):
  63. try:
  64. self.conn.delete_file(fnm)
  65. except Exception:
  66. logging.exception(f"Fail rm {bucket}/{fnm}")
  67. def get(self, bucket, fnm):
  68. for _ in range(1):
  69. try:
  70. client = self.conn.get_file_client(fnm)
  71. r = client.download_file()
  72. return r.read()
  73. except Exception:
  74. logging.exception(f"fail get {bucket}/{fnm}")
  75. self.__open__()
  76. time.sleep(1)
  77. return
  78. def obj_exist(self, bucket, fnm):
  79. try:
  80. client = self.conn.get_file_client(fnm)
  81. return client.exists()
  82. except Exception:
  83. logging.exception(f"Fail put {bucket}/{fnm}")
  84. return False
  85. def get_presigned_url(self, bucket, fnm, expires):
  86. for _ in range(10):
  87. try:
  88. return self.conn.get_presigned_url("GET", bucket, fnm, expires)
  89. except Exception:
  90. logging.exception(f"fail get {bucket}/{fnm}")
  91. self.__open__()
  92. time.sleep(1)
  93. return