|  | import logging | 
					
						
						|  | import boto3 | 
					
						
						|  | from botocore.exceptions import ClientError | 
					
						
						|  | from botocore.client import Config | 
					
						
						|  | import time | 
					
						
						|  | from io import BytesIO | 
					
						
						|  | from rag.utils import singleton | 
					
						
						|  | from rag import settings | 
					
						
						|  |  | 
					
						
						|  | @singleton | 
					
						
						|  | class RAGFlowS3(object): | 
					
						
						|  | def __init__(self): | 
					
						
						|  | self.conn = None | 
					
						
						|  | self.s3_config = settings.S3 | 
					
						
						|  | self.endpoint = self.s3_config.get('endpoint', None) | 
					
						
						|  | self.access_key = self.s3_config.get('access_key', None) | 
					
						
						|  | self.secret_key = self.s3_config.get('secret_key', None) | 
					
						
						|  | self.region = self.s3_config.get('region', None) | 
					
						
						|  | self.__open__() | 
					
						
						|  |  | 
					
						
						|  | def __open__(self): | 
					
						
						|  | try: | 
					
						
						|  | if self.conn: | 
					
						
						|  | self.__close__() | 
					
						
						|  | except Exception: | 
					
						
						|  | pass | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | config = Config( | 
					
						
						|  | s3={ | 
					
						
						|  | 'addressing_style': 'virtual' | 
					
						
						|  | } | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | self.conn = boto3.client( | 
					
						
						|  | 's3', | 
					
						
						|  | endpoint_url=self.endpoint, | 
					
						
						|  | region_name=self.region, | 
					
						
						|  | aws_access_key_id=self.access_key, | 
					
						
						|  | aws_secret_access_key=self.secret_key, | 
					
						
						|  | config=config | 
					
						
						|  | ) | 
					
						
						|  | except Exception: | 
					
						
						|  | logging.exception( | 
					
						
						|  | "Fail to connect %s" % self.endpoint) | 
					
						
						|  |  | 
					
						
						|  | def __close__(self): | 
					
						
						|  | del self.conn | 
					
						
						|  | self.conn = None | 
					
						
						|  |  | 
					
						
						|  | def bucket_exists(self, bucket): | 
					
						
						|  | try: | 
					
						
						|  | logging.debug(f"head_bucket bucketname {bucket}") | 
					
						
						|  | self.conn.head_bucket(Bucket=bucket) | 
					
						
						|  | exists = True | 
					
						
						|  | except ClientError: | 
					
						
						|  | logging.exception(f"head_bucket error {bucket}") | 
					
						
						|  | exists = False | 
					
						
						|  | return exists | 
					
						
						|  |  | 
					
						
						|  | def health(self): | 
					
						
						|  | bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1" | 
					
						
						|  |  | 
					
						
						|  | if not self.bucket_exists(bucket): | 
					
						
						|  | self.conn.create_bucket(Bucket=bucket) | 
					
						
						|  | logging.debug(f"create bucket {bucket} ********") | 
					
						
						|  |  | 
					
						
						|  | r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) | 
					
						
						|  | return r | 
					
						
						|  |  | 
					
						
						|  | def get_properties(self, bucket, key): | 
					
						
						|  | return {} | 
					
						
						|  |  | 
					
						
						|  | def list(self, bucket, dir, recursive=True): | 
					
						
						|  | return [] | 
					
						
						|  |  | 
					
						
						|  | def put(self, bucket, fnm, binary): | 
					
						
						|  | logging.debug(f"bucket name {bucket}; filename :{fnm}:") | 
					
						
						|  | for _ in range(1): | 
					
						
						|  | try: | 
					
						
						|  | if not self.bucket_exists(bucket): | 
					
						
						|  | self.conn.create_bucket(Bucket=bucket) | 
					
						
						|  | logging.info(f"create bucket {bucket} ********") | 
					
						
						|  | r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm) | 
					
						
						|  |  | 
					
						
						|  | return r | 
					
						
						|  | except Exception: | 
					
						
						|  | logging.exception(f"Fail put {bucket}/{fnm}") | 
					
						
						|  | self.__open__() | 
					
						
						|  | time.sleep(1) | 
					
						
						|  |  | 
					
						
						|  | def rm(self, bucket, fnm): | 
					
						
						|  | try: | 
					
						
						|  | self.conn.delete_object(Bucket=bucket, Key=fnm) | 
					
						
						|  | except Exception: | 
					
						
						|  | logging.exception(f"Fail rm {bucket}/{fnm}") | 
					
						
						|  |  | 
					
						
						|  | def get(self, bucket, fnm): | 
					
						
						|  | for _ in range(1): | 
					
						
						|  | try: | 
					
						
						|  | r = self.conn.get_object(Bucket=bucket, Key=fnm) | 
					
						
						|  | object_data = r['Body'].read() | 
					
						
						|  | return object_data | 
					
						
						|  | except Exception: | 
					
						
						|  | logging.exception(f"fail get {bucket}/{fnm}") | 
					
						
						|  | self.__open__() | 
					
						
						|  | time.sleep(1) | 
					
						
						|  | return | 
					
						
						|  |  | 
					
						
						|  | def obj_exist(self, bucket, fnm): | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | if self.conn.head_object(Bucket=bucket, Key=fnm): | 
					
						
						|  | return True | 
					
						
						|  | except ClientError as e: | 
					
						
						|  | if e.response['Error']['Code'] == '404': | 
					
						
						|  |  | 
					
						
						|  | return False | 
					
						
						|  | else: | 
					
						
						|  | raise | 
					
						
						|  |  | 
					
						
						|  | def get_presigned_url(self, bucket, fnm, expires): | 
					
						
						|  | for _ in range(10): | 
					
						
						|  | try: | 
					
						
						|  | r = self.conn.generate_presigned_url('get_object', | 
					
						
						|  | Params={'Bucket': bucket, | 
					
						
						|  | 'Key': fnm}, | 
					
						
						|  | ExpiresIn=expires) | 
					
						
						|  |  | 
					
						
						|  | return r | 
					
						
						|  | except Exception: | 
					
						
						|  | logging.exception(f"fail get url {bucket}/{fnm}") | 
					
						
						|  | self.__open__() | 
					
						
						|  | time.sleep(1) | 
					
						
						|  | return | 
					
						
						|  |  |