|
|
|
@@ -30,6 +30,7 @@ class RAGFlowS3: |
|
|
|
self.s3_config = settings.S3 |
|
|
|
self.access_key = self.s3_config.get('access_key', None) |
|
|
|
self.secret_key = self.s3_config.get('secret_key', None) |
|
|
|
self.session_token = self.s3_config.get('session_token', None) |
|
|
|
self.region = self.s3_config.get('region', None) |
|
|
|
self.endpoint_url = self.s3_config.get('endpoint_url', None) |
|
|
|
self.signature_version = self.s3_config.get('signature_version', None) |
|
|
|
@@ -73,31 +74,32 @@ class RAGFlowS3: |
|
|
|
s3_params = { |
|
|
|
'aws_access_key_id': self.access_key, |
|
|
|
'aws_secret_access_key': self.secret_key, |
|
|
|
'aws_session_token': self.session_token, |
|
|
|
} |
|
|
|
if self.region in self.s3_config: |
|
|
|
if self.region: |
|
|
|
s3_params['region_name'] = self.region |
|
|
|
if 'endpoint_url' in self.s3_config: |
|
|
|
if self.endpoint_url: |
|
|
|
s3_params['endpoint_url'] = self.endpoint_url |
|
|
|
if 'signature_version' in self.s3_config: |
|
|
|
config_kwargs['signature_version'] = self.signature_version |
|
|
|
if 'addressing_style' in self.s3_config: |
|
|
|
config_kwargs['addressing_style'] = self.addressing_style |
|
|
|
if self.signature_version: |
|
|
|
s3_params['signature_version'] = self.signature_version |
|
|
|
if self.addressing_style: |
|
|
|
s3_params['addressing_style'] = self.addressing_style |
|
|
|
if config_kwargs: |
|
|
|
s3_params['config'] = Config(**config_kwargs) |
|
|
|
|
|
|
|
self.conn = boto3.client('s3', **s3_params) |
|
|
|
self.conn = [boto3.client('s3', **s3_params)] |
|
|
|
except Exception: |
|
|
|
logging.exception(f"Fail to connect at region {self.region} or endpoint {self.endpoint_url}") |
|
|
|
|
|
|
|
def __close__(self):
    """Release the S3 connection held by this instance.

    ``self.conn`` holds the boto3 client wrapped in a one-element list.
    Drop the client so it can be garbage-collected, then set ``self.conn``
    to ``None`` so later code can detect the closed state.
    """
    # NOTE(review): reconstructed from a corrupted diff that contained both
    # the old (`del self.conn`) and new (`del self.conn[0]`) lines; the
    # list-wrapped post-diff form is kept here.
    del self.conn[0]
    self.conn = None
|
|
|
|
|
|
|
@use_default_bucket |
|
|
|
def bucket_exists(self, bucket): |
|
|
|
def bucket_exists(self, bucket, *args, **kwargs): |
|
|
|
try: |
|
|
|
logging.debug(f"head_bucket bucketname {bucket}") |
|
|
|
self.conn.head_bucket(Bucket=bucket) |
|
|
|
self.conn[0].head_bucket(Bucket=bucket) |
|
|
|
exists = True |
|
|
|
except ClientError: |
|
|
|
logging.exception(f"head_bucket error {bucket}") |
|
|
|
@@ -109,10 +111,10 @@ class RAGFlowS3: |
|
|
|
fnm = "txtxtxtxt1" |
|
|
|
fnm, binary = f"{self.prefix_path}/{fnm}" if self.prefix_path else fnm, b"_t@@@1" |
|
|
|
if not self.bucket_exists(bucket): |
|
|
|
self.conn.create_bucket(Bucket=bucket) |
|
|
|
self.conn[0].create_bucket(Bucket=bucket) |
|
|
|
logging.debug(f"create bucket {bucket} ********") |
|
|
|
|
|
|
|
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) |
|
|
|
r = self.conn[0].upload_fileobj(BytesIO(binary), bucket, fnm) |
|
|
|
return r |
|
|
|
|
|
|
|
def get_properties(self, bucket, key): |
|
|
|
@@ -123,14 +125,14 @@ class RAGFlowS3: |
|
|
|
|
|
|
|
@use_prefix_path |
|
|
|
@use_default_bucket |
|
|
|
def put(self, bucket, fnm, binary, **kwargs): |
|
|
|
def put(self, bucket, fnm, binary, *args, **kwargs): |
|
|
|
logging.debug(f"bucket name {bucket}; filename :{fnm}:") |
|
|
|
for _ in range(1): |
|
|
|
try: |
|
|
|
if not self.bucket_exists(bucket): |
|
|
|
self.conn.create_bucket(Bucket=bucket) |
|
|
|
self.conn[0].create_bucket(Bucket=bucket) |
|
|
|
logging.info(f"create bucket {bucket} ********") |
|
|
|
r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) |
|
|
|
r = self.conn[0].upload_fileobj(BytesIO(binary), bucket, fnm) |
|
|
|
|
|
|
|
return r |
|
|
|
except Exception: |
|
|
|
@@ -140,18 +142,18 @@ class RAGFlowS3: |
|
|
|
|
|
|
|
@use_prefix_path
@use_default_bucket
def rm(self, bucket, fnm, *args, **kwargs):
    """Delete object ``fnm`` from ``bucket``.

    Best-effort: any failure is logged via ``logging.exception`` and
    swallowed, so callers are never disrupted by a missing object or a
    transient S3 error.

    NOTE(review): reconstructed from a corrupted diff; the post-diff form
    (``*args`` in the signature, client stored as ``self.conn[0]``) is kept.
    """
    try:
        # self.conn holds the boto3 client in a one-element list.
        self.conn[0].delete_object(Bucket=bucket, Key=fnm)
    except Exception:
        # Swallow deliberately: removal is best-effort by design here.
        logging.exception(f"Fail rm {bucket}/{fnm}")
|
|
|
|
|
|
|
@use_prefix_path |
|
|
|
@use_default_bucket |
|
|
|
def get(self, bucket, fnm, **kwargs): |
|
|
|
def get(self, bucket, fnm, *args, **kwargs): |
|
|
|
for _ in range(1): |
|
|
|
try: |
|
|
|
r = self.conn.get_object(Bucket=bucket, Key=fnm) |
|
|
|
r = self.conn[0].get_object(Bucket=bucket, Key=fnm) |
|
|
|
object_data = r['Body'].read() |
|
|
|
return object_data |
|
|
|
except Exception: |
|
|
|
@@ -162,9 +164,9 @@ class RAGFlowS3: |
|
|
|
|
|
|
|
@use_prefix_path |
|
|
|
@use_default_bucket |
|
|
|
def obj_exist(self, bucket, fnm, **kwargs): |
|
|
|
def obj_exist(self, bucket, fnm, *args, **kwargs): |
|
|
|
try: |
|
|
|
if self.conn.head_object(Bucket=bucket, Key=fnm): |
|
|
|
if self.conn[0].head_object(Bucket=bucket, Key=fnm): |
|
|
|
return True |
|
|
|
except ClientError as e: |
|
|
|
if e.response['Error']['Code'] == '404': |
|
|
|
@@ -174,10 +176,10 @@ class RAGFlowS3: |
|
|
|
|
|
|
|
@use_prefix_path |
|
|
|
@use_default_bucket |
|
|
|
def get_presigned_url(self, bucket, fnm, expires, **kwargs): |
|
|
|
def get_presigned_url(self, bucket, fnm, expires, *args, **kwargs): |
|
|
|
for _ in range(10): |
|
|
|
try: |
|
|
|
r = self.conn.generate_presigned_url('get_object', |
|
|
|
r = self.conn[0].generate_presigned_url('get_object', |
|
|
|
Params={'Bucket': bucket, |
|
|
|
'Key': fnm}, |
|
|
|
ExpiresIn=expires) |