| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- #
- # Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
-
- import pytest
- from common import (
- add_chunk,
- batch_create_datasets,
- bulk_upload_documents,
- create_chat_assistant,
- delete_chat_assistants,
- delete_datasets,
- delete_session_with_chat_assistants,
- list_documnets,
- parse_documnets,
- )
- from libs.auth import RAGFlowHttpApiAuth
- from utils import wait_for
- from utils.file_utils import (
- create_docx_file,
- create_eml_file,
- create_excel_file,
- create_html_file,
- create_image_file,
- create_json_file,
- create_md_file,
- create_pdf_file,
- create_ppt_file,
- create_txt_file,
- )
-
-
- @wait_for(30, 1, "Document parsing timeout")
- def condition(_auth, _dataset_id):
- res = list_documnets(_auth, _dataset_id)
- for doc in res["data"]["docs"]:
- if doc["run"] != "DONE":
- return False
- return True
-
-
- @pytest.fixture(scope="session")
- def api_key(token):
- return RAGFlowHttpApiAuth(token)
-
-
- @pytest.fixture(scope="function")
- def clear_datasets(request, api_key):
- def cleanup():
- delete_datasets(api_key, {"ids": None})
-
- request.addfinalizer(cleanup)
-
-
- @pytest.fixture(scope="function")
- def clear_chat_assistants(request, api_key):
- def cleanup():
- delete_chat_assistants(api_key)
-
- request.addfinalizer(cleanup)
-
-
- @pytest.fixture(scope="function")
- def clear_session_with_chat_assistants(request, api_key, add_chat_assistants):
- _, _, chat_assistant_ids = add_chat_assistants
-
- def cleanup():
- for chat_assistant_id in chat_assistant_ids:
- delete_session_with_chat_assistants(api_key, chat_assistant_id)
-
- request.addfinalizer(cleanup)
-
-
- @pytest.fixture
- def generate_test_files(request, tmp_path):
- file_creators = {
- "docx": (tmp_path / "ragflow_test.docx", create_docx_file),
- "excel": (tmp_path / "ragflow_test.xlsx", create_excel_file),
- "ppt": (tmp_path / "ragflow_test.pptx", create_ppt_file),
- "image": (tmp_path / "ragflow_test.png", create_image_file),
- "pdf": (tmp_path / "ragflow_test.pdf", create_pdf_file),
- "txt": (tmp_path / "ragflow_test.txt", create_txt_file),
- "md": (tmp_path / "ragflow_test.md", create_md_file),
- "json": (tmp_path / "ragflow_test.json", create_json_file),
- "eml": (tmp_path / "ragflow_test.eml", create_eml_file),
- "html": (tmp_path / "ragflow_test.html", create_html_file),
- }
-
- files = {}
- for file_type, (file_path, creator_func) in file_creators.items():
- if request.param in ["", file_type]:
- creator_func(file_path)
- files[file_type] = file_path
- return files
-
-
- @pytest.fixture(scope="class")
- def ragflow_tmp_dir(request, tmp_path_factory):
- class_name = request.cls.__name__
- return tmp_path_factory.mktemp(class_name)
-
-
- @pytest.fixture(scope="class")
- def add_dataset(request, api_key):
- def cleanup():
- delete_datasets(api_key, {"ids": None})
-
- request.addfinalizer(cleanup)
-
- dataset_ids = batch_create_datasets(api_key, 1)
- return dataset_ids[0]
-
-
- @pytest.fixture(scope="function")
- def add_dataset_func(request, api_key):
- def cleanup():
- delete_datasets(api_key, {"ids": None})
-
- request.addfinalizer(cleanup)
-
- return batch_create_datasets(api_key, 1)[0]
-
-
- @pytest.fixture(scope="class")
- def add_document(api_key, add_dataset, ragflow_tmp_dir):
- dataset_id = add_dataset
- document_ids = bulk_upload_documents(api_key, dataset_id, 1, ragflow_tmp_dir)
- return dataset_id, document_ids[0]
-
-
- @pytest.fixture(scope="class")
- def add_chunks(api_key, add_document):
- dataset_id, document_id = add_document
- parse_documnets(api_key, dataset_id, {"document_ids": [document_id]})
- condition(api_key, dataset_id)
-
- chunk_ids = []
- for i in range(4):
- res = add_chunk(api_key, dataset_id, document_id, {"content": f"chunk test {i}"})
- chunk_ids.append(res["data"]["chunk"]["id"])
-
- # issues/6487
- from time import sleep
-
- sleep(1)
- return dataset_id, document_id, chunk_ids
-
-
- @pytest.fixture(scope="class")
- def add_chat_assistants(request, api_key, add_document):
- def cleanup():
- delete_chat_assistants(api_key)
-
- request.addfinalizer(cleanup)
-
- dataset_id, document_id = add_document
- parse_documnets(api_key, dataset_id, {"document_ids": [document_id]})
- condition(api_key, dataset_id)
-
- chat_assistant_ids = []
- for i in range(5):
- res = create_chat_assistant(api_key, {"name": f"test_chat_assistant_{i}", "dataset_ids": [dataset_id]})
- chat_assistant_ids.append(res["data"]["id"])
-
- return dataset_id, document_id, chat_assistant_ids
|