浏览代码

Fix KB list bugs and add web api test (#3649)

### What problem does this PR solve?

1. Read KB list path parameter, page_number and page_size, which type
isn't int
2. Add cases on create / list / delete datasets.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Test cases

Signed-off-by: jinhai <haijin.chn@gmail.com>
tags/v0.14.1
Jin Hai 11 个月前
父节点
当前提交
b6f3f15f0b
没有帐户链接到提交者的电子邮件

+ 2
- 2
api/apps/kb_app.py 查看文件

@@ -125,8 +125,8 @@ def detail():
@manager.route('/list', methods=['GET'])
@login_required
def list_kbs():
page_number = request.args.get("page", 1)
items_per_page = request.args.get("page_size", 150)
page_number = int(request.args.get("page", 1))
items_per_page = int(request.args.get("page_size", 150))
orderby = request.args.get("orderby", "create_time")
desc = request.args.get("desc", True)
try:

+ 10
- 4
sdk/python/test/conftest.py 查看文件

@@ -7,10 +7,13 @@ import requests

HOST_ADDRESS = os.getenv('HOST_ADDRESS', 'http://127.0.0.1:9380')

def generate_random_email():
return 'user_' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))+'@1.com'
# def generate_random_email():
# return 'user_' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))+'@1.com'

EMAIL = generate_random_email()
def generate_email():
return 'user_123@1.com'

EMAIL = generate_email()
# password is "123"
PASSWORD='''ctAseGvejiaSWWZ88T/m4FQVOpQyUvP+x7sXtdv3feqZACiQleuewkUi35E16wSd5C5QcnkkcV9cYc8TKPTRZlxappDuirxghxoOvFcJxFU4ixLsD
fN33jCHRoDUW81IH9zjij/vaw8IbVyb6vuwg6MX6inOEBRRzVbRYxXOu1wkWY6SsI8X70oF9aeLFp/PzQpjoe/YbSqpTq8qqrmHzn9vO+yvyYyvmDsphXe
@@ -49,7 +52,10 @@ def get_api_key_fixture():

@pytest.fixture(scope="session")
def get_auth():
register()
try:
register()
except Exception as e:
print(e)
auth = login()
return auth


+ 25
- 1
sdk/python/test/test_frontend_api/common.py 查看文件

@@ -1,2 +1,26 @@
import os
HOST_ADDRESS=os.getenv('HOST_ADDRESS', 'http://127.0.0.1:9380')
import requests

HOST_ADDRESS = os.getenv('HOST_ADDRESS', 'http://127.0.0.1:9380')

def create_dataset(auth, dataset_name):
authorization = {"Authorization": auth}
url = f"{HOST_ADDRESS}/v1/kb/create"
json = {"name": dataset_name}
res = requests.post(url=url, headers=authorization, json=json)
return res.json()


def list_dataset(auth, page_number):
authorization = {"Authorization": auth}
url = f"{HOST_ADDRESS}/v1/kb/list?page={page_number}"
res = requests.get(url=url, headers=authorization)
return res.json()


def rm_dataset(auth, dataset_id):
authorization = {"Authorization": auth}
url = f"{HOST_ADDRESS}/v1/kb/rm"
json = {"kb_id": dataset_id}
res = requests.post(url=url, headers=authorization, json=json)
return res.json()

+ 60
- 7
sdk/python/test/test_frontend_api/test_dataset.py 查看文件

@@ -1,10 +1,63 @@
from common import HOST_ADDRESS
from common import HOST_ADDRESS, create_dataset, list_dataset, rm_dataset
import requests
def test_create_dataset(get_auth):
authorization={"Authorization": get_auth}


def test_dataset(get_auth):
# create dataset
res = create_dataset(get_auth, "test_create_dataset")
assert res.get("code") == 0, f"{res.get('message')}"

# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, page_number)
data = res.get("data")
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1

print(f"found {len(dataset_list)} datasets")
# delete dataset
for dataset_id in dataset_list:
res = rm_dataset(get_auth, dataset_id)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")


def test_dataset_1k_dataset(get_auth):
# create dataset
authorization = {"Authorization": get_auth}
url = f"{HOST_ADDRESS}/v1/kb/create"
json = {"name":"test_create_dataset"}
res = requests.post(url=url,headers=authorization,json=json)
res = res.json()
assert res.get("code") == 0,f"{res.get('message')}"
for i in range(1000):
res = create_dataset(get_auth, f"test_create_dataset_{i}")
assert res.get("code") == 0, f"{res.get('message')}"

# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, page_number)
data = res.get("data")
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1

print(f"found {len(dataset_list)} datasets")
# delete dataset
for dataset_id in dataset_list:
res = rm_dataset(get_auth, dataset_id)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")

# delete dataset
# create invalid name dataset
# update dataset with different parameters
# create duplicated name dataset
#

正在加载...
取消
保存