
s3_conn.py

import logging
import time
from io import BytesIO

import boto3
from botocore.exceptions import ClientError

from rag import settings
from rag.utils import singleton


@singleton
class RAGFlowS3(object):
    """Thin S3 storage connector; credentials come from settings.S3."""

    def __init__(self):
        self.conn = None
        self.s3_config = settings.S3
        self.access_key = self.s3_config.get('access_key', None)
        self.secret_key = self.s3_config.get('secret_key', None)
        self.region = self.s3_config.get('region', None)
        self.__open__()

    def __open__(self):
        # Drop any stale client before (re)connecting.
        try:
            if self.conn:
                self.__close__()
        except Exception:
            pass

        try:
            self.conn = boto3.client(
                's3',
                region_name=self.region,
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key
            )
        except Exception:
            logging.exception(f"Failed to connect to S3 in region {self.region}")

    def __close__(self):
        self.conn = None

    def bucket_exists(self, bucket):
        try:
            logging.debug(f"head_bucket bucketname {bucket}")
            self.conn.head_bucket(Bucket=bucket)
            exists = True
        except ClientError:
            logging.exception(f"head_bucket error {bucket}")
            exists = False
        return exists

    def health(self):
        # Liveness probe: write a tiny marker object into a dedicated bucket.
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        if not self.bucket_exists(bucket):
            self.conn.create_bucket(Bucket=bucket)
            logging.debug(f"create bucket {bucket} ********")
        r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
        return r

    def get_properties(self, bucket, key):
        # Not implemented for the S3 backend; kept for interface parity.
        return {}

    def list(self, bucket, dir, recursive=True):
        # Not implemented for the S3 backend; kept for interface parity.
        return []

    def put(self, bucket, fnm, binary):
        logging.debug(f"bucket name {bucket}; filename :{fnm}:")
        # Single attempt; on failure, reconnect so the next caller gets a fresh client.
        for _ in range(1):
            try:
                if not self.bucket_exists(bucket):
                    self.conn.create_bucket(Bucket=bucket)
                    logging.info(f"create bucket {bucket} ********")
                return self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
            except Exception:
                logging.exception(f"Failed to put {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)

    def rm(self, bucket, fnm):
        try:
            self.conn.delete_object(Bucket=bucket, Key=fnm)
        except Exception:
            logging.exception(f"Failed to remove {bucket}/{fnm}")

    def get(self, bucket, fnm):
        # Single attempt; returns the object bytes, or None on failure.
        for _ in range(1):
            try:
                r = self.conn.get_object(Bucket=bucket, Key=fnm)
                return r['Body'].read()
            except Exception:
                logging.exception(f"Failed to get {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
        return

    def obj_exist(self, bucket, fnm):
        try:
            self.conn.head_object(Bucket=bucket, Key=fnm)
            return True
        except ClientError as e:
            # head_object raises a ClientError with code '404' when the key is absent.
            if e.response['Error']['Code'] == '404':
                return False
            raise

    def get_presigned_url(self, bucket, fnm, expires):
        # Up to 10 attempts, reconnecting between failures.
        for _ in range(10):
            try:
                return self.conn.generate_presigned_url(
                    'get_object',
                    Params={'Bucket': bucket, 'Key': fnm},
                    ExpiresIn=expires)
            except Exception:
                logging.exception(f"Failed to generate presigned URL for {bucket}/{fnm}")
                self.__open__()
                time.sleep(1)
        return
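
A minimal usage sketch follows. It is not part of s3_conn.py: the import path, bucket, and key names are assumptions, and it presumes rag.settings.S3 supplies valid 'access_key', 'secret_key', and 'region' entries.

# Usage sketch (hypothetical, not part of the file above). Assumes the module
# lives at rag.utils.s3_conn; the bucket and key names are placeholders.
from rag.utils.s3_conn import RAGFlowS3

s3 = RAGFlowS3()   # @singleton: later constructions return this same instance
s3.put("my-bucket", "docs/a.txt", b"hello")   # creates the bucket if needed, uploads bytes
data = s3.get("my-bucket", "docs/a.txt")      # -> b"hello", or None on failure
if s3.obj_exist("my-bucket", "docs/a.txt"):
    url = s3.get_presigned_url("my-bucket", "docs/a.txt", expires=3600)  # 1-hour link
s3.rm("my-bucket", "docs/a.txt")              # delete the object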