Bladeren bron

Created the get_dataset and update_dataset APIs and fixed delete_dataset (#1201)

### What problem does this PR solve?

Added the get_dataset and update_dataset APIs.
Fixed delete_dataset.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [x] Documentation Update
tags/v0.8.0
cecilia-uu 1 jaar geleden
bovenliggende
commit
5ec19b5f53
No account linked to committer's email address
5 gewijzigde bestanden met toevoegingen van 281 en 124 verwijderingen
  1. 107
    30
      api/apps/dataset_api.py
  2. 2
    1
      docs/references/ragflow_api.md
  3. 15
    45
      sdk/python/ragflow/ragflow.py
  4. 1
    1
      sdk/python/test/common.py
  5. 156
    47
      sdk/python/test/test_dataset.py

+ 107
- 30
api/apps/dataset_api.py Bestand weergeven

from datetime import datetime, timedelta from datetime import datetime, timedelta
from flask import request, Response from flask import request, Response
from flask_login import login_required, current_user from flask_login import login_required, current_user
from httpx import HTTPError


from api.db import FileType, ParserType, FileSource, StatusEnum from api.db import FileType, ParserType, FileSource, StatusEnum
from api.db.db_models import APIToken, API4Conversation, Task, File from api.db.db_models import APIToken, API4Conversation, Task, File
from api.contants import NAME_LENGTH_LIMIT from api.contants import NAME_LENGTH_LIMIT


# ------------------------------ create a dataset --------------------------------------- # ------------------------------ create a dataset ---------------------------------------

@manager.route('/', methods=['POST']) @manager.route('/', methods=['POST'])
@login_required # use login @login_required # use login
@validate_request("name") # check name key @validate_request("name") # check name key
request_body["id"] = get_uuid() request_body["id"] = get_uuid()
request_body["tenant_id"] = tenant_id request_body["tenant_id"] = tenant_id
request_body["created_by"] = tenant_id request_body["created_by"] = tenant_id
e, t = TenantService.get_by_id(tenant_id)
if not e:
exist, t = TenantService.get_by_id(tenant_id)
if not exist:
return construct_result(code=RetCode.AUTHENTICATION_ERROR, message="Tenant not found.") return construct_result(code=RetCode.AUTHENTICATION_ERROR, message="Tenant not found.")
request_body["embd_id"] = t.embd_id request_body["embd_id"] = t.embd_id
if not KnowledgebaseService.save(**request_body): if not KnowledgebaseService.save(**request_body):
# failed to create new dataset # failed to create new dataset
return construct_result() return construct_result()
return construct_json_result(data={"dataset_name": request_body["name"]})
return construct_json_result(code=RetCode.SUCCESS,
data={"dataset_name": request_body["name"], "dataset_id": request_body["id"]})
except Exception as e: except Exception as e:
return construct_error_response(e) return construct_error_response(e)


# -----------------------------list datasets------------------------------------------------------- # -----------------------------list datasets-------------------------------------------------------

@manager.route('/', methods=['GET']) @manager.route('/', methods=['GET'])
@login_required @login_required
def list_datasets(): def list_datasets():
desc = request.args.get("desc", True) desc = request.args.get("desc", True)
try: try:
tenants = TenantService.get_joined_tenants_by_user_id(current_user.id) tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
kbs = KnowledgebaseService.get_by_tenant_ids_by_offset(
datasets = KnowledgebaseService.get_by_tenant_ids_by_offset(
[m["tenant_id"] for m in tenants], current_user.id, int(offset), int(count), orderby, desc) [m["tenant_id"] for m in tenants], current_user.id, int(offset), int(count), orderby, desc)
return construct_json_result(data=kbs, code=RetCode.DATA_ERROR, message=f"attempt to list datasets")
return construct_json_result(data=datasets, code=RetCode.SUCCESS, message=f"List datasets successfully!")
except Exception as e: except Exception as e:
return construct_error_response(e) return construct_error_response(e)
except HTTPError as http_err:
return construct_json_result(http_err)


# ---------------------------------delete a dataset ---------------------------- # ---------------------------------delete a dataset ----------------------------


@manager.route('/<dataset_id>', methods=['DELETE'])
@login_required
def remove_dataset(dataset_id):
    """Delete the dataset identified by the path parameter *dataset_id*.

    Only the owner (creator) of the dataset may delete it. All documents
    inside the dataset, and their backing file records, are removed first;
    only then is the knowledgebase row itself deleted.

    Returns a JSON result with RetCode.SUCCESS on success, or an error
    result (OPERATING_ERROR / DATA_ERROR) describing what failed.
    """
    try:
        # Ownership check doubles as the existence check: the query is
        # filtered by created_by == current_user.id.
        datasets = KnowledgebaseService.query(created_by=current_user.id, id=dataset_id)
        if not datasets:
            return construct_json_result(message=f'The dataset cannot be found for your current account.',
                                         code=RetCode.OPERATING_ERROR)

        # Remove every document inside the dataset before the dataset itself.
        for doc in DocumentService.query(kb_id=dataset_id):
            if not DocumentService.remove_document(doc, datasets[0].tenant_id):
                # the process of deleting failed
                return construct_json_result(code=RetCode.DATA_ERROR,
                                            message="There was an error during the document removal process. "
                                                    "Please check the status of the RAGFlow server and try the removal again.")
            # Delete the file records linked to this document.
            f2d = File2DocumentService.get_by_document_id(doc.id)
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
            File2DocumentService.delete_by_document_id(doc.id)

        # Finally delete the dataset (knowledgebase) row itself.
        if not KnowledgebaseService.delete_by_id(dataset_id):
            return construct_json_result(code=RetCode.DATA_ERROR,
                                        message="There was an error during the dataset removal process. "
                                                "Please check the status of the RAGFlow server and try the removal again.")
        # success
        return construct_json_result(code=RetCode.SUCCESS, message=f"Remove dataset: {dataset_id} successfully")
    except Exception as e:
        return construct_error_response(e)


# ------------------------------ get details of a dataset ---------------------------------------- # ------------------------------ get details of a dataset ----------------------------------------

@manager.route('/<dataset_id>', methods=['GET'])
@login_required
def get_dataset(dataset_id):
    """Return the detail record of the dataset identified by *dataset_id*.

    Responds with RetCode.SUCCESS and the dataset detail on success, or
    RetCode.DATA_ERROR when no dataset with that id exists.
    """
    try:
        dataset = KnowledgebaseService.get_detail(dataset_id)
        if not dataset:
            return construct_json_result(code=RetCode.DATA_ERROR, message="Can't find this dataset!")
        return construct_json_result(data=dataset, code=RetCode.SUCCESS)
    except Exception as e:
        # BUGFIX: previously called construct_json_result(e), which is not an
        # error-response builder; every sibling endpoint uses
        # construct_error_response for unexpected exceptions.
        return construct_error_response(e)


# ------------------------------ update a dataset -------------------------------------------- # ------------------------------ update a dataset --------------------------------------------

@manager.route('/<dataset_id>', methods=['PUT'])
@login_required
@validate_request("name")
def update_dataset(dataset_id):
    """Update fields of the dataset identified by *dataset_id*.

    The JSON body may carry: name, description, permission, language, id,
    token_num, photo (stored as avatar), layout_recognize (stored inside
    parser_config), embedding_model_id (stored as embd_id) and chunk_method
    (stored as parser_id). The embedding model and chunk method may only be
    changed while no document has been parsed yet (chunk_num == 0).

    Returns the updated dataset as JSON on success, or an error result.
    """
    req = request.json
    try:
        # the request cannot be empty
        if not req:
            return construct_json_result(code=RetCode.DATA_ERROR, message="Please input at least one parameter that "
                                                                          "you want to update!")
        # Only the owner may update; the filtered query is the ownership check.
        if not KnowledgebaseService.query(created_by=current_user.id, id=dataset_id):
            return construct_json_result(message=f'Only the owner of knowledgebase is authorized for this operation!',
                                         code=RetCode.OPERATING_ERROR)

        exist, dataset = KnowledgebaseService.get_by_id(dataset_id)
        # check whether there is this dataset
        if not exist:
            return construct_json_result(code=RetCode.DATA_ERROR, message="This dataset cannot be found!")

        if 'name' in req:
            name = req["name"].strip()
            # NOTE(review): "> 1" tolerates exactly one other dataset with the
            # same name; "> 0" would reject any duplicate — confirm which
            # semantics is intended.
            if name.lower() != dataset.name.lower() \
                    and len(KnowledgebaseService.query(name=name, tenant_id=current_user.id,
                                                       status=StatusEnum.VALID.value)) > 1:
                return construct_json_result(code=RetCode.DATA_ERROR,
                                             message=f"The name: {name.lower()} is already used by other "
                                                     f"datasets. Please choose a different name.")

        dataset_updating_data = {}
        chunk_num = req.get("chunk_num")

        # The embedding model may only change before any document is parsed.
        if req.get('embedding_model_id'):
            if chunk_num == 0:
                dataset_updating_data['embd_id'] = req['embedding_model_id']
            else:
                # BUGFIX: the error result was built but never returned, so the
                # update silently continued past this check.
                return construct_json_result(code=RetCode.DATA_ERROR,
                                             message="You have already parsed the document in this "
                                                     "dataset, so you cannot change the embedding "
                                                     "model.")
        # Likewise, the chunk method may only change before any parsing.
        if req.get("chunk_method"):
            if chunk_num == 0:
                dataset_updating_data['parser_id'] = req["chunk_method"]
            else:
                # BUGFIX: same missing return as above.
                return construct_json_result(code=RetCode.DATA_ERROR,
                                             message="You have already parsed the document "
                                                     "in this dataset, so you cannot "
                                                     "change the chunk method.")
        # convert the photo parameter to avatar
        if req.get("photo"):
            dataset_updating_data['avatar'] = req["photo"]

        # layout_recognize lives inside the nested parser_config dict.
        if 'layout_recognize' in req:
            if 'parser_config' not in dataset_updating_data:
                dataset_updating_data['parser_config'] = {}
            dataset_updating_data['parser_config']['layout_recognize'] = req['layout_recognize']

        # TODO: updating use_raptor needs to construct a class

        # Scalar fields copied through unchanged.
        # NOTE(review): allowing 'id' here lets a client rewrite the primary
        # key — confirm this is intended.
        for key in ['name', 'language', 'description', 'permission', 'id', 'token_num']:
            if key in req:
                dataset_updating_data[key] = req.get(key)

        # update
        if not KnowledgebaseService.update_by_id(dataset.id, dataset_updating_data):
            return construct_json_result(code=RetCode.OPERATING_ERROR, message="Failed to update! "
                                                                               "Please check the status of RAGFlow "
                                                                               "server and try again!")

        # Re-read so the response reflects the persisted state.
        exist, dataset = KnowledgebaseService.get_by_id(dataset.id)
        if not exist:
            return construct_json_result(code=RetCode.DATA_ERROR, message="Failed to get the dataset "
                                                                          "using the dataset ID.")

        return construct_json_result(data=dataset.to_json(), code=RetCode.SUCCESS)
    except Exception as e:
        return construct_error_response(e)

+ 2
- 1
docs/references/ragflow_api.md Bestand weergeven

{ {
"code": 0, "code": 0,
"data": { "data": {
"dataset_name": "kb1"
"dataset_name": "kb1",
"dataset_id": "375e8ada2d3c11ef98f93043d7ee537e"
}, },
"message": "success" "message": "success"
} }

+ 15
- 45
sdk/python/ragflow/ragflow.py Bestand weergeven

import requests import requests
import json import json


from httpx import HTTPError



class RAGFlow: class RAGFlow:
def __init__(self, user_key, base_url, version = 'v1'):
def __init__(self, user_key, base_url, version='v1'):
''' '''
api_url: http://<host_address>/api/v1 api_url: http://<host_address>/api/v1
dataset_url: http://<host_address>/api/v1/dataset dataset_url: http://<host_address>/api/v1/dataset


def delete_dataset(self, dataset_name):
    """Delete the dataset named *dataset_name* and return the server's JSON reply.

    The name is resolved to an id via find_dataset_id_by_name; when no dataset
    matches, the server responds with its own "cannot be found" error, which is
    passed through unchanged.
    """
    dataset_id = self.find_dataset_id_by_name(dataset_name)
    endpoint = f"{self.dataset_url}/{dataset_id}"
    res = requests.delete(endpoint, headers=self.authorization_header)
    return res.json()


def find_dataset_id_by_name(self, dataset_name): def find_dataset_id_by_name(self, dataset_name):
res = requests.get(self.dataset_url, headers=self.authorization_header) res = requests.get(self.dataset_url, headers=self.authorization_header)
"orderby": orderby, "orderby": orderby,
"desc": desc "desc": desc
} }
try:
response = requests.get(url=self.dataset_url, params=params, headers=self.authorization_header)
response.raise_for_status() # if it is not 200
original_data = response.json()
# TODO: format the data
# print(original_data)
# # Process the original data into the desired format
# formatted_data = {
# "datasets": [
# {
# "id": dataset["id"],
# "created": dataset["create_time"], # Adjust the key based on the actual response
# "fileCount": dataset["doc_num"], # Adjust the key based on the actual response
# "name": dataset["name"]
# }
# for dataset in original_data
# ]
# }
return response.status_code, original_data
except HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
except Exception as err:
print(f"An error occurred: {err}")
response = requests.get(url=self.dataset_url, params=params, headers=self.authorization_header)
return response.json()


def get_dataset(self, dataset_name):
    """Fetch the detail of the dataset named *dataset_name* as parsed JSON.

    Resolves the name to an id first; the authorization header is sent so the
    server can enforce ownership.
    """
    dataset_id = self.find_dataset_id_by_name(dataset_name)
    endpoint = f"{self.dataset_url}/{dataset_id}"
    response = requests.get(endpoint, headers=self.authorization_header)
    return response.json()

def update_dataset(self, dataset_name, **params):
    """Update the dataset named *dataset_name* with the given keyword fields.

    *params* is sent as the JSON request body (e.g. name, description,
    permission, language). Returns the server's JSON reply.
    """
    dataset_id = self.find_dataset_id_by_name(dataset_name)
    endpoint = f"{self.dataset_url}/{dataset_id}"
    response = requests.put(endpoint, json=params, headers=self.authorization_header)
    return response.json()

+ 1
- 1
sdk/python/test/common.py Bestand weergeven





API_KEY = 'ImFmNWQ3YTY0Mjg5NjExZWZhNTdjMzA0M2Q3ZWU1MzdlIg.ZmldwA.9oP9pVtuEQSpg-Z18A2eOkWO-3E'
API_KEY = 'ImFhMmJhZmUwMmQxNzExZWZhZDdmMzA0M2Q3ZWU1MzdlIg.ZnDsIQ.u-0-_qCRU6a4WICxyAPsjaafyOo'
HOST_ADDRESS = 'http://127.0.0.1:9380' HOST_ADDRESS = 'http://127.0.0.1:9380'

+ 156
- 47
sdk/python/test/test_dataset.py Bestand weergeven

from api.settings import RetCode
from test_sdkbase import TestSdk from test_sdkbase import TestSdk
from ragflow import RAGFlow from ragflow import RAGFlow
import pytest import pytest
4. update the kb 4. update the kb
5. delete the kb 5. delete the kb
""" """

def setup_method(self):
    """Delete every dataset in the account so each test starts clean."""
    client = RAGFlow(API_KEY, HOST_ADDRESS)
    datasets = client.list_dataset()['data']
    for dataset_name in {item['name'] for item in datasets}:
        client.delete_dataset(dataset_name)

# -----------------------create_dataset--------------------------------- # -----------------------create_dataset---------------------------------
def test_create_dataset_with_success(self):
    """
    Test creating a dataset with a valid name succeeds.
    """
    ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
    # create a kb
    res = ragflow.create_dataset("kb1")
    assert res['code'] == RetCode.SUCCESS and res['message'] == 'success'


def test_create_dataset_with_empty_name(self): def test_create_dataset_with_empty_name(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset("") res = ragflow.create_dataset("")
assert res['message'] == 'Empty dataset name' and res['code'] == 102
assert res['message'] == 'Empty dataset name' and res['code'] == RetCode.DATA_ERROR


def test_create_dataset_with_name_exceeding_limit(self): def test_create_dataset_with_name_exceeding_limit(self):
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['message'] == f"Dataset name: {name} with length {len(name)} exceeds {NAME_LENGTH_LIMIT}!" assert (res['message'] == f"Dataset name: {name} with length {len(name)} exceeds {NAME_LENGTH_LIMIT}!"
and res['code'] == 102)
and res['code'] == RetCode.DATA_ERROR)


def test_create_dataset_name_with_space_in_the_middle(self): def test_create_dataset_name_with_space_in_the_middle(self):
""" """
name = "k b" name = "k b"
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_name_with_space_in_the_head(self): def test_create_dataset_name_with_space_in_the_head(self):
""" """
name = " kb" name = " kb"
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_name_with_space_in_the_tail(self): def test_create_dataset_name_with_space_in_the_tail(self):
""" """
name = "kb " name = "kb "
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_name_with_space_in_the_head_and_tail_and_length_exceed_limit(self): def test_create_dataset_name_with_space_in_the_head_and_tail_and_length_exceed_limit(self):
""" """
name = " " + "k" * NAME_LENGTH_LIMIT + " " name = " " + "k" * NAME_LENGTH_LIMIT + " "
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_with_two_same_name(self): def test_create_dataset_with_two_same_name(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset("kb") res = ragflow.create_dataset("kb")
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')
res = ragflow.create_dataset("kb") res = ragflow.create_dataset("kb")
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_with_only_space_in_the_name(self): def test_create_dataset_with_only_space_in_the_name(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset(" ") res = ragflow.create_dataset(" ")
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_with_space_number_exceeding_limit(self): def test_create_dataset_with_space_number_exceeding_limit(self):
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
name = " " * NAME_LENGTH_LIMIT name = " " * NAME_LENGTH_LIMIT
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_with_name_having_return(self): def test_create_dataset_with_name_having_return(self):
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
name = "kb\n" name = "kb\n"
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


def test_create_dataset_with_name_having_the_null_character(self): def test_create_dataset_with_name_having_the_null_character(self):
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
name = "kb\0" name = "kb\0"
res = ragflow.create_dataset(name) res = ragflow.create_dataset(name)
assert (res['code'] == 0 and res['message'] == 'success')
assert (res['code'] == RetCode.SUCCESS and res['message'] == 'success')


# -----------------------list_dataset--------------------------------- # -----------------------list_dataset---------------------------------
def test_list_dataset_success(self): def test_list_dataset_success(self):
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
# Call the list_datasets method # Call the list_datasets method
response = ragflow.list_dataset() response = ragflow.list_dataset()

code, datasets = response

assert code == 200
assert response['code'] == RetCode.SUCCESS


def test_list_dataset_with_checking_size_and_name(self): def test_list_dataset_with_checking_size_and_name(self):
""" """
dataset_name = response['data']['dataset_name'] dataset_name = response['data']['dataset_name']
real_name_to_create.add(dataset_name) real_name_to_create.add(dataset_name)


status_code, listed_data = ragflow.list_dataset(0, 3)
listed_data = listed_data['data']
response = ragflow.list_dataset(0, 3)
listed_data = response['data']


listed_names = {d['name'] for d in listed_data} listed_names = {d['name'] for d in listed_data}
assert listed_names == real_name_to_create assert listed_names == real_name_to_create
assert status_code == 200
assert response['code'] == RetCode.SUCCESS
assert len(listed_data) == len(datasets_to_create) assert len(listed_data) == len(datasets_to_create)


def test_list_dataset_with_getting_empty_result(self): def test_list_dataset_with_getting_empty_result(self):
dataset_name = response['data']['dataset_name'] dataset_name = response['data']['dataset_name']
real_name_to_create.add(dataset_name) real_name_to_create.add(dataset_name)


status_code, listed_data = ragflow.list_dataset(0, 0)
listed_data = listed_data['data']
response = ragflow.list_dataset(0, 0)
listed_data = response['data']


listed_names = {d['name'] for d in listed_data} listed_names = {d['name'] for d in listed_data}

assert listed_names == real_name_to_create assert listed_names == real_name_to_create
assert status_code == 200
assert response['code'] == RetCode.SUCCESS
assert len(listed_data) == 0 assert len(listed_data) == 0


def test_list_dataset_with_creating_100_knowledge_bases(self): def test_list_dataset_with_creating_100_knowledge_bases(self):
dataset_name = response['data']['dataset_name'] dataset_name = response['data']['dataset_name']
real_name_to_create.add(dataset_name) real_name_to_create.add(dataset_name)


status_code, listed_data = ragflow.list_dataset(0, 100)
listed_data = listed_data['data']
res = ragflow.list_dataset(0, 100)
listed_data = res['data']


listed_names = {d['name'] for d in listed_data} listed_names = {d['name'] for d in listed_data}
assert listed_names == real_name_to_create assert listed_names == real_name_to_create
assert status_code == 200
assert res['code'] == RetCode.SUCCESS
assert len(listed_data) == 100 assert len(listed_data) == 100


def test_list_dataset_with_showing_one_dataset(self): def test_list_dataset_with_showing_one_dataset(self):
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
response = ragflow.list_dataset(0, 1) response = ragflow.list_dataset(0, 1)
code, response = response
datasets = response['data'] datasets = response['data']
assert len(datasets) == 1
assert len(datasets) == 1 and response['code'] == RetCode.SUCCESS


def test_list_dataset_failure(self): def test_list_dataset_failure(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
response = ragflow.list_dataset(-1, -1) response = ragflow.list_dataset(-1, -1)
_, res = response
assert "IndexError" in res['message']
assert "IndexError" in response['message'] and response['code'] == RetCode.EXCEPTION_ERROR


def test_list_dataset_for_empty_datasets(self): def test_list_dataset_for_empty_datasets(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
response = ragflow.list_dataset() response = ragflow.list_dataset()
code, response = response
datasets = response['data'] datasets = response['data']
assert len(datasets) == 0
assert len(datasets) == 0 and response['code'] == RetCode.SUCCESS


# TODO: have to set the limitation of the number of datasets # TODO: have to set the limitation of the number of datasets


res = ragflow.create_dataset("kb0") res = ragflow.create_dataset("kb0")
real_dataset_name = res['data']['dataset_name'] real_dataset_name = res['data']['dataset_name']
# delete this dataset # delete this dataset
result = ragflow.delete_dataset(real_dataset_name)
assert result["success"] is True
res = ragflow.delete_dataset(real_dataset_name)
assert res['code'] == RetCode.SUCCESS and 'successfully' in res['message']


def test_delete_dataset_with_not_existing_dataset(self): def test_delete_dataset_with_not_existing_dataset(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.delete_dataset("weird_dataset") res = ragflow.delete_dataset("weird_dataset")
assert res["success"] is False
assert res['code'] == RetCode.OPERATING_ERROR and res['message'] == 'The dataset cannot be found for your current account.'


def test_delete_dataset_with_creating_100_datasets_and_deleting_100_datasets(self): def test_delete_dataset_with_creating_100_datasets_and_deleting_100_datasets(self):
""" """


for name in real_name_to_create: for name in real_name_to_create:
res = ragflow.delete_dataset(name) res = ragflow.delete_dataset(name)
assert res["success"] is True
assert res['code'] == RetCode.SUCCESS and 'successfully' in res['message']


def test_delete_dataset_with_space_in_the_middle_of_the_name(self): def test_delete_dataset_with_space_in_the_middle_of_the_name(self):
""" """
Test deleting a dataset when its name has space in the middle. Test deleting a dataset when its name has space in the middle.
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("k b")
res = ragflow.delete_dataset("k b") res = ragflow.delete_dataset("k b")
print(res)
assert res["success"] is True
assert res['code'] == RetCode.SUCCESS and 'successfully' in res['message']


def test_delete_dataset_with_space_in_the_head_of_the_name(self): def test_delete_dataset_with_space_in_the_head_of_the_name(self):
""" """
Test deleting a dataset when its name has space in the head. Test deleting a dataset when its name has space in the head.
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset(" kb")
res = ragflow.delete_dataset(" kb") res = ragflow.delete_dataset(" kb")
assert res["success"] is False
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'The dataset cannot be found for your current account.')


def test_delete_dataset_with_space_in_the_tail_of_the_name(self): def test_delete_dataset_with_space_in_the_tail_of_the_name(self):
""" """
Test deleting a dataset when its name has space in the tail. Test deleting a dataset when its name has space in the tail.
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("kb ")
res = ragflow.delete_dataset("kb ") res = ragflow.delete_dataset("kb ")
assert res["success"] is False
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'The dataset cannot be found for your current account.')


def test_delete_dataset_with_only_space_in_the_name(self): def test_delete_dataset_with_only_space_in_the_name(self):
""" """
Test deleting a dataset when its name only has space. Test deleting a dataset when its name only has space.
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset(" ")
res = ragflow.delete_dataset(" ") res = ragflow.delete_dataset(" ")
assert res["success"] is False
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'The dataset cannot be found for your current account.')


def test_delete_dataset_with_only_exceeding_limit_space_in_the_name(self): def test_delete_dataset_with_only_exceeding_limit_space_in_the_name(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
name = " " * (NAME_LENGTH_LIMIT + 1) name = " " * (NAME_LENGTH_LIMIT + 1)
ragflow.create_dataset(name)
res = ragflow.delete_dataset(name) res = ragflow.delete_dataset(name)
assert res["success"] is False
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'The dataset cannot be found for your current account.')


def test_delete_dataset_with_name_with_space_in_the_head_and_tail_and_length_exceed_limit(self): def test_delete_dataset_with_name_with_space_in_the_head_and_tail_and_length_exceed_limit(self):
""" """
""" """
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
name = " " + "k" * NAME_LENGTH_LIMIT + " " name = " " + "k" * NAME_LENGTH_LIMIT + " "
ragflow.create_dataset(name)
res = ragflow.delete_dataset(name) res = ragflow.delete_dataset(name)
assert res["success"] is False
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'The dataset cannot be found for your current account.')

# ---------------------------------get_dataset-----------------------------------------

def test_get_dataset_with_success(self):
"""
Test getting a dataset which exists.
"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
response = ragflow.create_dataset("test")
dataset_name = response['data']['dataset_name']
res = ragflow.get_dataset(dataset_name)
assert res['code'] == RetCode.SUCCESS and res['data']['name'] == dataset_name

def test_get_dataset_with_failure(self):
"""
Test getting a dataset which does not exist.
"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.get_dataset("weird_dataset")
assert res['code'] == RetCode.DATA_ERROR and res['message'] == "Can't find this dataset!"

# ---------------------------------update a dataset-----------------------------------

def test_update_dataset_without_existing_dataset(self):
"""
Test updating a dataset which does not exist.
"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
params = {
'name': 'new_name3',
'description': 'new_description',
"permission": 'me',
"parser_id": 'naive',
"language": 'English'
}
res = ragflow.update_dataset("weird_dataset", **params)
assert (res['code'] == RetCode.OPERATING_ERROR
and res['message'] == 'Only the owner of knowledgebase is authorized for this operation!')

def test_update_dataset_with_updating_six_parameters(self):
"""
Test updating a dataset when updating six parameters.
"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("new_name1")
params = {
'name': 'new_name',
'description': 'new_description1',
"permission": 'me',
"parser_id": 'naive',
"language": 'English'
}
res = ragflow.update_dataset("new_name1", **params)
assert res['code'] == RetCode.SUCCESS
assert (res['data']['description'] == 'new_description1'
and res['data']['name'] == 'new_name' and res['data']['permission'] == 'me'
and res['data']['language'] == 'English' and res['data']['parser_id'] == 'naive')

def test_update_dataset_with_updating_two_parameters(self):
"""
Test updating a dataset when updating two parameters.
"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("new_name2")
params = {
"name": "new_name3",
"language": 'English'
}
res = ragflow.update_dataset("new_name2", **params)
assert (res['code'] == RetCode.SUCCESS and res['data']['name'] == "new_name3"
and res['data']['language'] == 'English')

def test_update_dataset_with_updating_layout_recognize(self):
"""Test updating a dataset with only updating the layout_recognize"""
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("test_update_dataset_with_updating_layout_recognize")
params = {
"layout_recognize": False
}
res = ragflow.update_dataset("test_update_dataset_with_updating_layout_recognize", **params)
assert res['code'] == RetCode.SUCCESS and res['data']['parser_config']['layout_recognize'] is False

def test_update_dataset_with_empty_parameter(self):
ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
ragflow.create_dataset("test_update_dataset_with_empty_parameter")
params = {}
res = ragflow.update_dataset("test_update_dataset_with_empty_parameter", **params)
assert (res['code'] == RetCode.DATA_ERROR
and res['message'] == 'Please input at least one parameter that you want to update!')

# ---------------------------------mix the different methods--------------------------


# ---------------------------------mix the different methods--------------------
def test_create_and_delete_dataset_together(self): def test_create_and_delete_dataset_together(self):
""" """
Test creating 1 dataset, and then deleting 1 dataset. Test creating 1 dataset, and then deleting 1 dataset.
# create 1 dataset # create 1 dataset
ragflow = RAGFlow(API_KEY, HOST_ADDRESS) ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
res = ragflow.create_dataset("ddd") res = ragflow.create_dataset("ddd")
assert res['code'] == 0 and res['message'] == 'success'
assert res['code'] == RetCode.SUCCESS and res['message'] == 'success'


# delete 1 dataset # delete 1 dataset
res = ragflow.delete_dataset("ddd") res = ragflow.delete_dataset("ddd")
assert res["success"] is True
assert res["code"] == RetCode.SUCCESS


# create 10 datasets # create 10 datasets
datasets_to_create = ["dataset1"] * 10 datasets_to_create = ["dataset1"] * 10
# delete 10 datasets # delete 10 datasets
for name in real_name_to_create: for name in real_name_to_create:
res = ragflow.delete_dataset(name) res = ragflow.delete_dataset(name)
assert res["success"] is True
assert res["code"] == RetCode.SUCCESS



Laden…
Annuleren
Opslaan