浏览代码

Refa: dataset operations to simplify error handling (#8132)

### What problem does this PR solve?

- Consolidate database operations within single try-except blocks in the
methods

### Type of change

- [x] Refactoring
tags/v0.19.1
Liu An 4 个月前
父节点
当前提交
968ffc7ef3
没有帐户链接到提交者的电子邮件
共有 1 个文件被更改,包括 68 次插入106 次删除
  1. 68
    106
      api/apps/sdk/dataset.py

+ 68
- 106
api/apps/sdk/dataset.py 查看文件

try: try:
if KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value): if KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value):
return get_error_operating_result(message=f"Dataset name '{req['name']}' already exists") return get_error_operating_result(message=f"Dataset name '{req['name']}' already exists")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"])
req["id"] = get_uuid()
req["tenant_id"] = tenant_id
req["created_by"] = tenant_id
req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"])
req["id"] = get_uuid()
req["tenant_id"] = tenant_id
req["created_by"] = tenant_id


try:
ok, t = TenantService.get_by_id(tenant_id) ok, t = TenantService.get_by_id(tenant_id)
if not ok: if not ok:
return get_error_permission_result(message="Tenant not found") return get_error_permission_result(message="Tenant not found")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


if not req.get("embd_id"):
req["embd_id"] = t.embd_id
else:
ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
if not ok:
return err
if not req.get("embd_id"):
req["embd_id"] = t.embd_id
else:
ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
if not ok:
return err


try:
if not KnowledgebaseService.save(**req): if not KnowledgebaseService.save(**req):
return get_error_data_result(message="Create dataset error.(Database error)") return get_error_data_result(message="Create dataset error.(Database error)")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


try:
ok, k = KnowledgebaseService.get_by_id(req["id"]) ok, k = KnowledgebaseService.get_by_id(req["id"])
if not ok: if not ok:
return get_error_data_result(message="Dataset created failed") return get_error_data_result(message="Dataset created failed")

response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)
except OperationalError as e: except OperationalError as e:
logging.exception(e) logging.exception(e)
return get_error_data_result(message="Database operation failed") return get_error_data_result(message="Database operation failed")


response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)



@manager.route("/datasets", methods=["DELETE"]) # noqa: F821 @manager.route("/datasets", methods=["DELETE"]) # noqa: F821
@token_required @token_required
if err is not None: if err is not None:
return get_error_argument_result(err) return get_error_argument_result(err)


kb_id_instance_pairs = []
if req["ids"] is None:
try:
try:
kb_id_instance_pairs = []
if req["ids"] is None:
kbs = KnowledgebaseService.query(tenant_id=tenant_id) kbs = KnowledgebaseService.query(tenant_id=tenant_id)
for kb in kbs: for kb in kbs:
kb_id_instance_pairs.append((kb.id, kb)) kb_id_instance_pairs.append((kb.id, kb))
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")
else:
error_kb_ids = []
for kb_id in req["ids"]:
try:

else:
error_kb_ids = []
for kb_id in req["ids"]:
kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id) kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id)
if kb is None: if kb is None:
error_kb_ids.append(kb_id) error_kb_ids.append(kb_id)
continue continue
kb_id_instance_pairs.append((kb_id, kb)) kb_id_instance_pairs.append((kb_id, kb))
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")
if len(error_kb_ids) > 0:
return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""")

errors = []
success_count = 0
for kb_id, kb in kb_id_instance_pairs:
try:
if len(error_kb_ids) > 0:
return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""")

errors = []
success_count = 0
for kb_id, kb in kb_id_instance_pairs:
for doc in DocumentService.query(kb_id=kb_id): for doc in DocumentService.query(kb_id=kb_id):
if not DocumentService.remove_document(doc, tenant_id): if not DocumentService.remove_document(doc, tenant_id):
errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'") errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
errors.append(f"Delete dataset error for {kb_id}") errors.append(f"Delete dataset error for {kb_id}")
continue continue
success_count += 1 success_count += 1
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


if not errors:
return get_result()
if not errors:
return get_result()


error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..."
if success_count == 0:
return get_error_data_result(message=error_message)
error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..."
if success_count == 0:
return get_error_data_result(message=error_message)


return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message)
return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message)
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")




@manager.route("/datasets/<dataset_id>", methods=["PUT"]) # noqa: F821 @manager.route("/datasets/<dataset_id>", methods=["PUT"]) # noqa: F821
kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id) kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
if kb is None: if kb is None:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'") return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


if req.get("parser_config"):
req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])
if req.get("parser_config"):
req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])


if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id:
if not req.get("parser_config"):
req["parser_config"] = get_parser_config(chunk_method, None)
elif "parser_config" in req and not req["parser_config"]:
del req["parser_config"]
if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id:
if not req.get("parser_config"):
req["parser_config"] = get_parser_config(chunk_method, None)
elif "parser_config" in req and not req["parser_config"]:
del req["parser_config"]


if "name" in req and req["name"].lower() != kb.name.lower():
try:
if "name" in req and req["name"].lower() != kb.name.lower():
exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value) exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value)
if exists: if exists:
return get_error_data_result(message=f"Dataset name '{req['name']}' already exists") return get_error_data_result(message=f"Dataset name '{req['name']}' already exists")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")

if "embd_id" in req:
if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}")
ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
if not ok:
return err


try:
if "embd_id" in req:
if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}")
ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
if not ok:
return err

if not KnowledgebaseService.update_by_id(kb.id, req): if not KnowledgebaseService.update_by_id(kb.id, req):
return get_error_data_result(message="Update dataset error.(Database error)") return get_error_data_result(message="Update dataset error.(Database error)")
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")


try:
ok, k = KnowledgebaseService.get_by_id(kb.id) ok, k = KnowledgebaseService.get_by_id(kb.id)
if not ok: if not ok:
return get_error_data_result(message="Dataset created failed") return get_error_data_result(message="Dataset created failed")

response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)
except OperationalError as e: except OperationalError as e:
logging.exception(e) logging.exception(e)
return get_error_data_result(message="Database operation failed") return get_error_data_result(message="Database operation failed")


response_data = remap_dictionary_keys(k.to_dict())
return get_result(data=response_data)



@manager.route("/datasets", methods=["GET"]) # noqa: F821 @manager.route("/datasets", methods=["GET"]) # noqa: F821
@token_required @token_required
if err is not None: if err is not None:
return get_error_argument_result(err) return get_error_argument_result(err)


kb_id = request.args.get("id")
name = args.get("name")
if kb_id:
try:
try:
kb_id = request.args.get("id")
name = args.get("name")
if kb_id:
kbs = KnowledgebaseService.get_kb_by_id(kb_id, tenant_id) kbs = KnowledgebaseService.get_kb_by_id(kb_id, tenant_id)
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")
if not kbs:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'")
if name:
try:

if not kbs:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'")
if name:
kbs = KnowledgebaseService.get_kb_by_name(name, tenant_id) kbs = KnowledgebaseService.get_kb_by_name(name, tenant_id)
except OperationalError as e:
logging.exception(e)
return get_error_data_result(message="Database operation failed")
if not kbs:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'")
if not kbs:
return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'")


try:
tenants = TenantService.get_joined_tenants_by_user_id(tenant_id) tenants = TenantService.get_joined_tenants_by_user_id(tenant_id)
kbs = KnowledgebaseService.get_list( kbs = KnowledgebaseService.get_list(
[m["tenant_id"] for m in tenants], [m["tenant_id"] for m in tenants],
kb_id, kb_id,
name, name,
) )

response_data_list = []
for kb in kbs:
response_data_list.append(remap_dictionary_keys(kb))
return get_result(data=response_data_list)
except OperationalError as e: except OperationalError as e:
logging.exception(e) logging.exception(e)
return get_error_data_result(message="Database operation failed") return get_error_data_result(message="Database operation failed")

response_data_list = []
for kb in kbs:
response_data_list.append(remap_dictionary_keys(kb))
return get_result(data=response_data_list)

正在加载...
取消
保存