Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

s3_conn.py 4.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import boto3
  2. import os
  3. from botocore.exceptions import ClientError
  4. from botocore.client import Config
  5. import time
  6. from io import BytesIO
  7. from rag.settings import s3_logger
  8. from rag.utils import singleton
  9. @singleton
  10. class RAGFlowS3(object):
  11. def __init__(self):
  12. self.conn = None
  13. self.endpoint = os.getenv('ENDPOINT', None)
  14. self.access_key = os.getenv('ACCESS_KEY', None)
  15. self.secret_key = os.getenv('SECRET_KEY', None)
  16. self.region = os.getenv('REGION', None)
  17. self.__open__()
  18. def __open__(self):
  19. try:
  20. if self.conn:
  21. self.__close__()
  22. except Exception as e:
  23. pass
  24. try:
  25. config = Config(
  26. s3={
  27. 'addressing_style': 'virtual'
  28. }
  29. )
  30. self.conn = boto3.client(
  31. 's3',
  32. endpoint_url=self.endpoint,
  33. region_name=self.region,
  34. aws_access_key_id=self.access_key,
  35. aws_secret_access_key=self.secret_key,
  36. config=config
  37. )
  38. except Exception as e:
  39. s3_logger.error(
  40. "Fail to connect %s " % self.endpoint + str(e))
  41. def __close__(self):
  42. del self.conn
  43. self.conn = None
  44. def bucket_exists(self, bucket):
  45. try:
  46. s3_logger.error(f"head_bucket bucketname {bucket}")
  47. self.conn.head_bucket(Bucket=bucket)
  48. exists = True
  49. except ClientError as e:
  50. s3_logger.error(f"head_bucket error {bucket}: " + str(e))
  51. exists = False
  52. return exists
  53. def health(self):
  54. bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
  55. if not self.bucket_exists(bucket):
  56. self.conn.create_bucket(Bucket=bucket)
  57. s3_logger.error(f"create bucket {bucket} ********")
  58. r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
  59. return r
  60. def get_properties(self, bucket, key):
  61. return {}
  62. def list(self, bucket, dir, recursive=True):
  63. return []
  64. def put(self, bucket, fnm, binary):
  65. s3_logger.error(f"bucket name {bucket}; filename :{fnm}:")
  66. for _ in range(1):
  67. try:
  68. if not self.bucket_exists(bucket):
  69. self.conn.create_bucket(Bucket=bucket)
  70. s3_logger.error(f"create bucket {bucket} ********")
  71. r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
  72. return r
  73. except Exception as e:
  74. s3_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
  75. self.__open__()
  76. time.sleep(1)
  77. def rm(self, bucket, fnm):
  78. try:
  79. self.conn.delete_object(Bucket=bucket, Key=fnm)
  80. except Exception as e:
  81. s3_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))
  82. def get(self, bucket, fnm):
  83. for _ in range(1):
  84. try:
  85. r = self.conn.get_object(Bucket=bucket, Key=fnm)
  86. object_data = r['Body'].read()
  87. return object_data
  88. except Exception as e:
  89. s3_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
  90. self.__open__()
  91. time.sleep(1)
  92. return
  93. def obj_exist(self, bucket, fnm):
  94. try:
  95. if self.conn.head_object(Bucket=bucket, Key=fnm):
  96. return True
  97. except ClientError as e:
  98. if e.response['Error']['Code'] == '404':
  99. return False
  100. else:
  101. raise
  102. def get_presigned_url(self, bucket, fnm, expires):
  103. for _ in range(10):
  104. try:
  105. r = self.conn.generate_presigned_url('get_object',
  106. Params={'Bucket': bucket,
  107. 'Key': fnm},
  108. ExpiresIn=expires)
  109. return r
  110. except Exception as e:
  111. s3_logger.error(f"fail get url {bucket}/{fnm}: " + str(e))
  112. self.__open__()
  113. time.sleep(1)
  114. return