You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

s3_conn.py 4.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import logging
  2. import boto3
  3. import os
  4. from botocore.exceptions import ClientError
  5. from botocore.client import Config
  6. import time
  7. from io import BytesIO
  8. from rag.utils import singleton
  9. from rag import settings
  10. @singleton
  11. class RAGFlowS3(object):
  12. def __init__(self):
  13. self.conn = None
  14. self.s3_config = settings.S3
  15. self.endpoint = self.s3_config.get('endpoint', None)
  16. self.access_key = self.s3_config.get('access_key', None)
  17. self.secret_key = self.s3_config.get('secret_key', None)
  18. self.region = self.s3_config.get('region', None)
  19. self.__open__()
  20. def __open__(self):
  21. try:
  22. if self.conn:
  23. self.__close__()
  24. except Exception:
  25. pass
  26. try:
  27. config = Config(
  28. s3={
  29. 'addressing_style': 'virtual'
  30. }
  31. )
  32. self.conn = boto3.client(
  33. 's3',
  34. endpoint_url=self.endpoint,
  35. region_name=self.region,
  36. aws_access_key_id=self.access_key,
  37. aws_secret_access_key=self.secret_key,
  38. config=config
  39. )
  40. except Exception:
  41. logging.exception(
  42. "Fail to connect %s" % self.endpoint)
  43. def __close__(self):
  44. del self.conn
  45. self.conn = None
  46. def bucket_exists(self, bucket):
  47. try:
  48. logging.debug(f"head_bucket bucketname {bucket}")
  49. self.conn.head_bucket(Bucket=bucket)
  50. exists = True
  51. except ClientError:
  52. logging.exception(f"head_bucket error {bucket}")
  53. exists = False
  54. return exists
  55. def health(self):
  56. bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
  57. if not self.bucket_exists(bucket):
  58. self.conn.create_bucket(Bucket=bucket)
  59. logging.debug(f"create bucket {bucket} ********")
  60. r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
  61. return r
  62. def get_properties(self, bucket, key):
  63. return {}
  64. def list(self, bucket, dir, recursive=True):
  65. return []
  66. def put(self, bucket, fnm, binary):
  67. logging.debug(f"bucket name {bucket}; filename :{fnm}:")
  68. for _ in range(1):
  69. try:
  70. if not self.bucket_exists(bucket):
  71. self.conn.create_bucket(Bucket=bucket)
  72. logging.info(f"create bucket {bucket} ********")
  73. r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
  74. return r
  75. except Exception:
  76. logging.exception(f"Fail put {bucket}/{fnm}")
  77. self.__open__()
  78. time.sleep(1)
  79. def rm(self, bucket, fnm):
  80. try:
  81. self.conn.delete_object(Bucket=bucket, Key=fnm)
  82. except Exception:
  83. logging.exception(f"Fail rm {bucket}/{fnm}")
  84. def get(self, bucket, fnm):
  85. for _ in range(1):
  86. try:
  87. r = self.conn.get_object(Bucket=bucket, Key=fnm)
  88. object_data = r['Body'].read()
  89. return object_data
  90. except Exception:
  91. logging.exception(f"fail get {bucket}/{fnm}")
  92. self.__open__()
  93. time.sleep(1)
  94. return
  95. def obj_exist(self, bucket, fnm):
  96. try:
  97. if self.conn.head_object(Bucket=bucket, Key=fnm):
  98. return True
  99. except ClientError as e:
  100. if e.response['Error']['Code'] == '404':
  101. return False
  102. else:
  103. raise
  104. def get_presigned_url(self, bucket, fnm, expires):
  105. for _ in range(10):
  106. try:
  107. r = self.conn.generate_presigned_url('get_object',
  108. Params={'Bucket': bucket,
  109. 'Key': fnm},
  110. ExpiresIn=expires)
  111. return r
  112. except Exception:
  113. logging.exception(f"fail get url {bucket}/{fnm}")
  114. self.__open__()
  115. time.sleep(1)
  116. return