https://github.com/infiniflow/ragflow/issues/2496 ### What problem does this PR solve? _Briefly describe what this PR aims to solve. Include background context that will help reviewers understand the purpose of the PR._ ### Type of change - [ ] Bug Fix (non-breaking change which fixes an issue) - [ ] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [x] Refactoring - [ ] Performance Improvement - [ ] Other (please describe):tags/v0.12.0
| e, doc = DocumentService.get_by_id(doc["id"]) | e, doc = DocumentService.get_by_id(doc["id"]) | ||||
| doc = doc.to_dict() | doc = doc.to_dict() | ||||
| doc["tenant_id"] = tenant_id | doc["tenant_id"] = tenant_id | ||||
| bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"]) | |||||
| bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) | |||||
| queue_tasks(doc, bucket, name) | queue_tasks(doc, bucket, name) | ||||
| except Exception as e: | except Exception as e: | ||||
| return server_error_response(e) | return server_error_response(e) | ||||
| if not tenant_id: | if not tenant_id: | ||||
| return get_data_error_result(retmsg="Tenant not found!") | return get_data_error_result(retmsg="Tenant not found!") | ||||
| b, n = File2DocumentService.get_minio_address(doc_id=doc_id) | |||||
| b, n = File2DocumentService.get_storage_address(doc_id=doc_id) | |||||
| if not DocumentService.remove_document(doc, tenant_id): | if not DocumentService.remove_document(doc, tenant_id): | ||||
| return get_data_error_result( | return get_data_error_result( |
| f" reason!", code=RetCode.AUTHENTICATION_ERROR) | f" reason!", code=RetCode.AUTHENTICATION_ERROR) | ||||
| # get the doc's id and location | # get the doc's id and location | ||||
| real_dataset_id, location = File2DocumentService.get_minio_address(doc_id=document_id) | |||||
| real_dataset_id, location = File2DocumentService.get_storage_address(doc_id=document_id) | |||||
| if real_dataset_id != dataset_id: | if real_dataset_id != dataset_id: | ||||
| return construct_json_result(message=f"The document {document_id} is not in the dataset: {dataset_id}, " | return construct_json_result(message=f"The document {document_id} is not in the dataset: {dataset_id}, " | ||||
| code=RetCode.ARGUMENT_ERROR) | code=RetCode.ARGUMENT_ERROR) | ||||
| # The process of downloading | # The process of downloading | ||||
| doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id) # minio address | |||||
| doc_id, doc_location = File2DocumentService.get_storage_address(doc_id=document_id) # minio address | |||||
| file_stream = STORAGE_IMPL.get(doc_id, doc_location) | file_stream = STORAGE_IMPL.get(doc_id, doc_location) | ||||
| if not file_stream: | if not file_stream: | ||||
| return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR) | return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR) | ||||
| doc_attributes = doc_attributes.to_dict() | doc_attributes = doc_attributes.to_dict() | ||||
| doc_id = doc_attributes["id"] | doc_id = doc_attributes["id"] | ||||
| bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id) | |||||
| bucket, doc_name = File2DocumentService.get_storage_address(doc_id=doc_id) | |||||
| binary = STORAGE_IMPL.get(bucket, doc_name) | binary = STORAGE_IMPL.get(bucket, doc_name) | ||||
| parser_name = doc_attributes["parser_id"] | parser_name = doc_attributes["parser_id"] | ||||
| if binary: | if binary: |
| if not tenant_id: | if not tenant_id: | ||||
| return get_data_error_result(retmsg="Tenant not found!") | return get_data_error_result(retmsg="Tenant not found!") | ||||
| b, n = File2DocumentService.get_minio_address(doc_id=doc_id) | |||||
| b, n = File2DocumentService.get_storage_address(doc_id=doc_id) | |||||
| if not DocumentService.remove_document(doc, tenant_id): | if not DocumentService.remove_document(doc, tenant_id): | ||||
| return get_data_error_result( | return get_data_error_result( | ||||
| e, doc = DocumentService.get_by_id(id) | e, doc = DocumentService.get_by_id(id) | ||||
| doc = doc.to_dict() | doc = doc.to_dict() | ||||
| doc["tenant_id"] = tenant_id | doc["tenant_id"] = tenant_id | ||||
| bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"]) | |||||
| bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) | |||||
| queue_tasks(doc, bucket, name) | queue_tasks(doc, bucket, name) | ||||
| return get_json_result(data=True) | return get_json_result(data=True) | ||||
| if not e: | if not e: | ||||
| return get_data_error_result(retmsg="Document not found!") | return get_data_error_result(retmsg="Document not found!") | ||||
| b, n = File2DocumentService.get_minio_address(doc_id=doc_id) | |||||
| b, n = File2DocumentService.get_storage_address(doc_id=doc_id) | |||||
| response = flask.make_response(STORAGE_IMPL.get(b, n)) | response = flask.make_response(STORAGE_IMPL.get(b, n)) | ||||
| ext = re.search(r"\.([^.]+)$", doc.name) | ext = re.search(r"\.([^.]+)$", doc.name) |
| e, file = FileService.get_by_id(file_id) | e, file = FileService.get_by_id(file_id) | ||||
| if not e: | if not e: | ||||
| return get_data_error_result(retmsg="Document not found!") | return get_data_error_result(retmsg="Document not found!") | ||||
| b, n = File2DocumentService.get_minio_address(file_id=file_id) | |||||
| b, n = File2DocumentService.get_storage_address(file_id=file_id) | |||||
| response = flask.make_response(STORAGE_IMPL.get(b, n)) | response = flask.make_response(STORAGE_IMPL.get(b, n)) | ||||
| ext = re.search(r"\.([^.]+)$", file.name) | ext = re.search(r"\.([^.]+)$", file.name) | ||||
| if ext: | if ext: |
| code=RetCode.ARGUMENT_ERROR) | code=RetCode.ARGUMENT_ERROR) | ||||
| # The process of downloading | # The process of downloading | ||||
| doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id) # minio address | |||||
| doc_id, doc_location = File2DocumentService.get_storage_address(doc_id=document_id) # minio address | |||||
| file_stream = STORAGE_IMPL.get(doc_id, doc_location) | file_stream = STORAGE_IMPL.get(doc_id, doc_location) | ||||
| if not file_stream: | if not file_stream: | ||||
| return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR) | return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR) | ||||
| if not tenant_id: | if not tenant_id: | ||||
| return get_data_error_result(retmsg="Tenant not found!") | return get_data_error_result(retmsg="Tenant not found!") | ||||
| b, n = File2DocumentService.get_minio_address(doc_id=doc_id) | |||||
| b, n = File2DocumentService.get_storage_address(doc_id=doc_id) | |||||
| if not DocumentService.remove_document(doc, tenant_id): | if not DocumentService.remove_document(doc, tenant_id): | ||||
| return get_data_error_result( | return get_data_error_result( | ||||
| e, doc = DocumentService.get_by_id(id) | e, doc = DocumentService.get_by_id(id) | ||||
| doc = doc.to_dict() | doc = doc.to_dict() | ||||
| doc["tenant_id"] = tenant_id | doc["tenant_id"] = tenant_id | ||||
| bucket, name = File2DocumentService.get_minio_address(doc_id=doc["id"]) | |||||
| bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"]) | |||||
| queue_tasks(doc, bucket, name) | queue_tasks(doc, bucket, name) | ||||
| return get_json_result(data=True) | return get_json_result(data=True) |
| @classmethod | @classmethod | ||||
| @DB.connection_context() | @DB.connection_context() | ||||
| def get_minio_address(cls, doc_id=None, file_id=None): | |||||
| def get_storage_address(cls, doc_id=None, file_id=None): | |||||
| if doc_id: | if doc_id: | ||||
| f2d = cls.get_by_document_id(doc_id) | f2d = cls.get_by_document_id(doc_id) | ||||
| else: | else: |
| return tasks | return tasks | ||||
| def get_minio_binary(bucket, name): | |||||
| def get_storage_binary(bucket, name): | |||||
| return STORAGE_IMPL.get(bucket, name) | return STORAGE_IMPL.get(bucket, name) | ||||
| chunker = FACTORY[row["parser_id"].lower()] | chunker = FACTORY[row["parser_id"].lower()] | ||||
| try: | try: | ||||
| st = timer() | st = timer() | ||||
| bucket, name = File2DocumentService.get_minio_address(doc_id=row["doc_id"]) | |||||
| binary = get_minio_binary(bucket, name) | |||||
| bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"]) | |||||
| binary = get_storage_binary(bucket, name) | |||||
| cron_logger.info( | cron_logger.info( | ||||
| "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"])) | "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"])) | ||||
| except TimeoutError as e: | except TimeoutError as e: |