### What problem does this PR solve?

Adds document-management features (upload, info, save, download, list, delete) to the HTTP API and the Python SDK. _Background context that helps reviewers understand the purpose of the PR goes here._

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>

Tag: v0.11.0
| @@ -0,0 +1,180 @@ | |||
| from io import BytesIO | |||
| from flask import request,send_file | |||
| from api.utils.api_utils import get_json_result, construct_json_result, server_error_response | |||
| from api.utils.api_utils import get_json_result, token_required, get_data_error_result | |||
| from api.db import FileType, ParserType, FileSource, TaskStatus | |||
| from api.db.db_models import File | |||
| from api.db.services.document_service import DocumentService | |||
| from api.db.services.file2document_service import File2DocumentService | |||
| from api.db.services.file_service import FileService | |||
| from api.db.services.knowledgebase_service import KnowledgebaseService | |||
| from api.db.services.user_service import TenantService, UserTenantService | |||
| from api.settings import RetCode | |||
| from api.utils.api_utils import construct_json_result, construct_error_response | |||
| from rag.utils.storage_factory import STORAGE_IMPL | |||
@manager.route('/dataset/<dataset_id>/documents/upload', methods=['POST'])
@token_required
def upload(dataset_id, tenant_id):
    """Upload one or more files into the knowledgebase (dataset) `dataset_id`.

    Expects multipart form data under the 'file' field. Returns a JSON result
    with data=True on success, or an ARGUMENT_ERROR / SERVER_ERROR payload.
    """
    if 'file' not in request.files:
        return get_json_result(
            data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR)

    files = request.files.getlist('file')
    # Reject the whole batch if any selected file carries an empty filename.
    if any(f.filename == '' for f in files):
        return get_json_result(
            data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR)

    found, kb = KnowledgebaseService.get_by_id(dataset_id)
    if not found:
        raise LookupError(f"Can't find the knowledgebase with ID {dataset_id}!")

    err, _ = FileService.upload_document(kb, files, tenant_id)
    if err:
        # Aggregate per-file error messages into a single response.
        return get_json_result(
            data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR)
    return get_json_result(data=True)
@manager.route('/infos', methods=['GET'])
@token_required
def docinfos(tenant_id):
    """Return a single document's details, looked up by "id" or by "name".

    Fixes: the original fell through with no return value when neither query
    parameter was supplied (Flask then raises on a None response), and never
    checked the success flag returned by DocumentService.get_by_id.
    """
    req = request.args
    if "id" in req:
        e, doc = DocumentService.get_by_id(req["id"])
        if not e:
            return get_data_error_result(retmsg="Document not found!")
        return get_json_result(data=doc.to_json())
    if "name" in req:
        doc_id = DocumentService.get_doc_id_by_doc_name(req["name"])
        e, doc = DocumentService.get_by_id(doc_id)
        if not e:
            return get_data_error_result(retmsg="Document not found!")
        return get_json_result(data=doc.to_json())
    # Neither identifier supplied: report a proper argument error instead of
    # returning None.
    return get_json_result(
        data=False, retmsg='Provide "id" or "name"!', retcode=RetCode.ARGUMENT_ERROR)
@manager.route('/save', methods=['POST'])
@token_required
def save_doc(tenant_id):
    """Update a document's fields from the JSON request body.

    The target is chosen by "name" when present (matching the original
    precedence: a "name" lookup overrode an "id" given alongside it),
    otherwise by "id". Fixes: the original raised NameError (unbound doc_id)
    when neither key was supplied; now returns an ARGUMENT_ERROR instead.
    """
    req = request.json  # Expecting JSON input
    doc_id = req.get("id")
    if "name" in req:
        # Name lookup takes precedence over an explicit id, as before.
        doc_id = DocumentService.get_doc_id_by_doc_name(req["name"])
    if doc_id is None:
        return get_json_result(
            data=False, retmsg='Provide "id" or "name"!', retcode=RetCode.ARGUMENT_ERROR)

    # Call the update method with the resolved id and the full JSON payload.
    try:
        num = DocumentService.update_by_id(doc_id, req)
        if num > 0:
            return get_json_result(retmsg="success", data={"updated_count": num})
        else:
            return get_json_result(retcode=404, retmsg="Document not found")
    except Exception as e:
        return get_json_result(retmsg=f"Error occurred: {str(e)}")
@manager.route("/<dataset_id>/documents/<document_id>", methods=["GET"])
@token_required
def download_document(dataset_id, document_id, tenant_id):
    """Stream a document's raw content back to the caller as an attachment.

    Fix: @token_required invokes the view with a tenant_id keyword argument;
    the original signature lacked it, so every request failed with
    "download_document() got an unexpected keyword argument 'tenant_id'"
    (see the recorded error response in this PR).
    """
    try:
        # Check whether this dataset exists.
        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
        if not exist:
            return construct_json_result(code=RetCode.DATA_ERROR,
                                         message=f"This dataset '{dataset_id}' cannot be found!")
        # Check whether this document exists.
        exist, document = DocumentService.get_by_id(document_id)
        if not exist:
            return construct_json_result(message=f"This document '{document_id}' cannot be found!",
                                         code=RetCode.ARGUMENT_ERROR)
        # Resolve the storage bucket/location, then fetch the raw bytes.
        doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id)  # minio address
        file_stream = STORAGE_IMPL.get(doc_id, doc_location)
        if not file_stream:
            return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
        file = BytesIO(file_stream)
        # Use send_file with a proper filename and MIME type.
        return send_file(
            file,
            as_attachment=True,
            download_name=document.name,
            mimetype='application/octet-stream'  # Set a default MIME type
        )
    # Error
    except Exception as e:
        return construct_error_response(e)
@manager.route('/dataset/<dataset_id>/documents', methods=['GET'])
@token_required
def list_docs(dataset_id, tenant_id):
    """List documents of a knowledgebase, with keyword filter and pagination.

    The caller must belong to a tenant that owns the knowledgebase given by
    the "kb_id" query parameter.
    """
    kb_id = request.args.get("kb_id")
    if not kb_id:
        return get_json_result(
            data=False, retmsg='Lack of "KB ID"', retcode=RetCode.ARGUMENT_ERROR)

    # Authorization: at least one of the caller's tenants must own this KB.
    owns_kb = any(
        KnowledgebaseService.query(tenant_id=t.tenant_id, id=kb_id)
        for t in UserTenantService.query(user_id=tenant_id))
    if not owns_kb:
        return get_json_result(
            data=False, retmsg='Only owner of knowledgebase authorized for this operation.',
            retcode=RetCode.OPERATING_ERROR)

    keywords = request.args.get("keywords", "")
    page_number = int(request.args.get("page", 1))
    items_per_page = int(request.args.get("page_size", 15))
    orderby = request.args.get("orderby", "create_time")
    # NOTE(review): when "desc" is supplied it arrives as a string, not a bool —
    # confirm DocumentService.get_by_kb_id treats non-empty strings as truthy.
    desc = request.args.get("desc", True)

    try:
        docs, total = DocumentService.get_by_kb_id(
            kb_id, page_number, items_per_page, orderby, desc, keywords)
        return get_json_result(data={"total": total, "docs": docs})
    except Exception as e:
        return server_error_response(e)
@manager.route('/delete', methods=['DELETE'])
@token_required
def rm(tenant_id):
    """Delete one or more documents (DB rows, file links, and stored blobs).

    Expects "doc_id" in the query string. Per-document failures are collected
    into `errors` and reported together; a missing document or tenant aborts
    the whole request early.
    """
    req = request.args
    if "doc_id" not in req:
        return get_data_error_result(
            retmsg="doc_id is required")
    doc_ids = req["doc_id"]
    # request.args yields a single string; normalise to a list for the loop.
    if isinstance(doc_ids, str): doc_ids = [doc_ids]
    # Make sure the caller's KB file tree exists before touching file links.
    root_folder = FileService.get_root_folder(tenant_id)
    pf_id = root_folder["id"]
    FileService.init_knowledgebase_docs(pf_id, tenant_id)
    errors = ""
    for doc_id in doc_ids:
        try:
            e, doc = DocumentService.get_by_id(doc_id)
            if not e:
                return get_data_error_result(retmsg="Document not found!")
            # NOTE(review): this rebinds the caller's tenant_id to the document
            # owner's tenant for the remainder of the loop — confirm intended.
            tenant_id = DocumentService.get_tenant_id(doc_id)
            if not tenant_id:
                return get_data_error_result(retmsg="Tenant not found!")
            # Resolve the storage bucket/name before the DB row disappears.
            b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
            if not DocumentService.remove_document(doc, tenant_id):
                return get_data_error_result(
                    retmsg="Database error (Document removal)!")
            # Drop the file record and the file<->document link.
            f2d = File2DocumentService.get_by_document_id(doc_id)
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
            File2DocumentService.delete_by_document_id(doc_id)
            # Remove the blob from object storage last.
            STORAGE_IMPL.rm(b, n)
        except Exception as e:
            errors += str(e)
    if errors:
        return get_json_result(data=False, retmsg=errors, retcode=RetCode.SERVER_ERROR)
    return get_json_result(data=True, retmsg="success")
| @@ -5,4 +5,5 @@ __version__ = importlib.metadata.version("ragflow") | |||
| from .ragflow import RAGFlow | |||
| from .modules.dataset import DataSet | |||
| from .modules.assistant import Assistant | |||
| from .modules.session import Session | |||
| from .modules.session import Session | |||
| from .modules.document import Document | |||
| @@ -1,3 +1,7 @@ | |||
| from typing import Optional, List | |||
| from .document import Document | |||
| from .base import Base | |||
| @@ -46,3 +50,39 @@ class DataSet(Base): | |||
| res = res.json() | |||
| if res.get("retmsg") == "success": return True | |||
| raise Exception(res["retmsg"]) | |||
| def list_docs(self, keywords: Optional[str] = None, offset: int = 0, limit: int = -1) -> List[Document]: | |||
| """ | |||
| List the documents in the dataset, optionally filtering by keywords, with pagination support. | |||
| Args: | |||
| keywords (Optional[str]): A string of keywords to filter the documents. Defaults to None. | |||
| offset (int): The starting point for pagination. Defaults to 0. | |||
| limit (int): The maximum number of documents to return. Defaults to -1 (no limit). | |||
| Returns: | |||
| List[Document]: A list of Document objects. | |||
| """ | |||
| # Construct the request payload for listing documents | |||
| payload = { | |||
| "kb_id": self.id, | |||
| "keywords": keywords, | |||
| "offset": offset, | |||
| "limit": limit | |||
| } | |||
| # Send the request to the server to list documents | |||
| res = self.get(f'/doc/dataset/{self.id}/documents', payload) | |||
| res_json = res.json() | |||
| # Handle response and error checking | |||
| if res_json.get("retmsg") != "success": | |||
| raise Exception(res_json.get("retmsg")) | |||
| # Parse the document data from the response | |||
| documents = [] | |||
| for doc_data in res_json["data"].get("docs", []): | |||
| doc = Document(self.rag, doc_data) | |||
| documents.append(doc) | |||
| return documents | |||
| @@ -0,0 +1,75 @@ | |||
| from .base import Base | |||
class Document(Base):
    """SDK-side representation of a server document record."""

    def __init__(self, rag, res_dict):
        # Defaults below mirror the server-side document schema; any keys in
        # res_dict that this client does not model are dropped before the
        # remainder is handed to Base.__init__.
        self.id = ""
        self.name = ""
        self.thumbnail = None
        self.kb_id = None
        self.parser_method = ""
        self.parser_config = {"pages": [[1, 1000000]]}
        self.source_type = "local"
        self.type = ""
        self.created_by = ""
        self.size = 0
        self.token_num = 0
        self.chunk_num = 0
        self.progress = 0.0
        self.progress_msg = ""
        self.process_begin_at = None
        self.process_duration = 0.0
        # Iterate over a snapshot of the keys so popping is safe mid-loop.
        for k in list(res_dict.keys()):
            if k not in self.__dict__:
                res_dict.pop(k)
        super().__init__(rag, res_dict)

    def save(self) -> bool:
        """
        Save the document details to the server.

        Returns True on success; raises Exception with the server's retmsg
        otherwise.
        """
        # NOTE(review): parser_config.to_json() assumes Base.__init__ wraps
        # nested dicts into objects exposing to_json — confirm; a plain dict
        # would raise AttributeError here.
        # NOTE(review): "process_duation" (sic) presumably matches the server
        # field's spelling — verify against the DB model before "fixing" it.
        res = self.post('/doc/save',
                        {"id": self.id, "name": self.name, "thumbnail": self.thumbnail, "kb_id": self.kb_id,
                         "parser_id": self.parser_method, "parser_config": self.parser_config.to_json(),
                         "source_type": self.source_type, "type": self.type, "created_by": self.created_by,
                         "size": self.size, "token_num": self.token_num, "chunk_num": self.chunk_num,
                         "progress": self.progress, "progress_msg": self.progress_msg,
                         "process_begin_at": self.process_begin_at, "process_duation": self.process_duration
                         })
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])

    def delete(self) -> bool:
        """
        Delete the document from the server.

        Returns True on success; raises Exception with the server's retmsg
        otherwise.
        """
        res = self.rm('/doc/delete',
                      {"doc_id": self.id})
        res = res.json()
        if res.get("retmsg") == "success":
            return True
        raise Exception(res["retmsg"])

    def download(self) -> bytes:
        """
        Download the document content from the server using the Flask API.

        :return: The downloaded document content in bytes.
        :raises Exception: If the server responds with a non-200 status.
        """
        # Construct the URL for the API request using the document ID and
        # knowledge base ID.
        res = self.get(f"/doc/{self.kb_id}/documents/{self.id}",
                       {"headers": self.rag.authorization_header, "id": self.id, "name": self.name, "stream": True})
        # Check the response status code to ensure the request was successful.
        if res.status_code == 200:
            # Return the document content as bytes.
            return res.content
        else:
            # Handle the error and raise an exception.
            raise Exception(
                f"Failed to download document. Server responded with: {res.status_code}, {res.text}"
            )
| @@ -19,7 +19,7 @@ import requests | |||
| from .modules.assistant import Assistant | |||
| from .modules.dataset import DataSet | |||
| from .modules.document import Document | |||
| class RAGFlow: | |||
| def __init__(self, user_key, base_url, version='v1'): | |||
| @@ -142,3 +142,32 @@ class RAGFlow: | |||
| result_list.append(Assistant(self, data)) | |||
| return result_list | |||
| raise Exception(res["retmsg"]) | |||
| def create_document(self, ds:DataSet, name: str, blob: bytes) -> bool: | |||
| url = f"/doc/dataset/{ds.id}/documents/upload" | |||
| files = { | |||
| 'file': (name, blob) | |||
| } | |||
| data = { | |||
| 'kb_id': ds.id | |||
| } | |||
| headers = { | |||
| 'Authorization': f"Bearer {ds.rag.user_key}" | |||
| } | |||
| response = requests.post(self.api_url + url, data=data, files=files, | |||
| headers=headers) | |||
| if response.status_code == 200 and response.json().get('retmsg') == 'success': | |||
| return True | |||
| else: | |||
| raise Exception(f"Upload failed: {response.json().get('retmsg')}") | |||
| return False | |||
| def get_document(self, id: str = None, name: str = None) -> Document: | |||
| res = self.get("/doc/infos", {"id": id, "name": name}) | |||
| res = res.json() | |||
| if res.get("retmsg") == "success": | |||
| return Document(self, res['data']) | |||
| raise Exception(res["retmsg"]) | |||
| @@ -0,0 +1 @@ | |||
| {"data":null,"retcode":100,"retmsg":"TypeError(\"download_document() got an unexpected keyword argument 'tenant_id'\")"} | |||
| @@ -0,0 +1,144 @@ | |||
| from ragflow import RAGFlow, DataSet, Document | |||
| from common import API_KEY, HOST_ADDRESS | |||
| from test_sdkbase import TestSdk | |||
class TestDocument(TestSdk):
    """End-to-end SDK tests for document upload, fetch, update, download,
    list, and delete against a live server."""

    def test_upload_document_with_success(self):
        """
        Test ingesting a document into a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        # Create the target dataset and sanity-check it.
        ds = rag.create_dataset(name="God")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God", "Dataset name does not match."

        # Ingest a placeholder document into it.
        name = "TestDocument.txt"
        blob = b"Sample document content for ingestion test."
        res = rag.create_document(ds, name=name, blob=blob)
        assert res is True, f"Failed to create document, error: {res}"

    def test_get_detail_document_with_success(self):
        """
        Test getting a document's detail with success
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)
        doc = rag.get_document(name="TestDocument.txt")
        assert isinstance(doc, Document), f"Failed to get dataset, error: {doc}."
        assert doc.name == "TestDocument.txt", "Name does not match"

    def test_update_document_with_success(self):
        """
        Test updating a document with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)
        doc = rag.get_document(name="TestDocument.txt")
        # Guard-clause form of the original if/else.
        assert isinstance(doc, Document), f"Failed to get document, error: {doc}"
        doc.parser_method = "manual"
        res = doc.save()
        assert res is True, f"Failed to update document, error: {res}"

    def test_download_document_with_success(self):
        """
        Test downloading a document with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)
        doc = rag.get_document(name="TestDocument.txt")
        assert isinstance(doc, Document), f"Failed to get document, error: {doc}"
        try:
            # Write the downloaded bytes to a local file.
            with open("ragflow.txt", "wb+") as file:
                file.write(doc.download())
            print(doc)
        except Exception as e:
            assert False, f"Failed to download document, error: {str(e)}"

    def test_list_all_documents_in_dataset_with_success(self):
        """
        Test list all documents into a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        ds = rag.create_dataset(name="God2")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God2", "Dataset name does not match."

        # Ingest two placeholder documents.
        rag.create_document(ds, name="Test Document111.txt",
                            blob=b"Sample document content for ingestion test111.")
        rag.create_document(ds, name="Test Document222.txt",
                            blob=b"Sample document content for ingestion test222.")

        for listed in ds.list_docs(keywords="test", offset=0, limit=12):
            assert isinstance(listed, Document)
            print(listed)

    def test_delete_documents_in_dataset_with_success(self):
        """
        Test list all documents into a dataset with success.
        """
        rag = RAGFlow(API_KEY, HOST_ADDRESS)

        ds = rag.create_dataset(name="God3")
        assert isinstance(ds, DataSet), f"Failed to create dataset, error: {ds}"
        assert ds.name == "God3", "Dataset name does not match."

        # Ingest one file from disk and two in-memory blobs.
        with open('test_data/test.txt', "rb") as fh:
            rag.create_document(ds, name='test.txt', blob=fh.read())
        rag.create_document(ds, name="Test Document333.txt",
                            blob=b"Sample document content for ingestion test333.")
        rag.create_document(ds, name="Test Document444.txt",
                            blob=b"Sample document content for ingestion test444.")

        # Delete everything matching "document", then confirm nothing
        # matching "rag" remains.
        for listed in ds.list_docs(keywords="document", offset=0, limit=12):
            assert isinstance(listed, Document)
            listed.delete()
            print(listed)
        remaining_docs = ds.list_docs(keywords="rag", offset=0, limit=12)
        assert len(remaining_docs) == 0, "Documents were not properly deleted."