#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from flask import request
from peewee import OperationalError

from api import settings
from api.db import FileSource, StatusEnum
from api.db.db_models import File
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.user_service import TenantService
from api.utils import get_uuid
from api.utils.api_utils import (
    deep_merge,
    get_error_argument_result,
    get_error_data_result,
    get_error_operating_result,
    get_error_permission_result,
    get_parser_config,
    get_result,
    remap_dictionary_keys,
    token_required,
    verify_embedding_availability,
)
from api.utils.validation_utils import (
    CreateDatasetReq,
    DeleteDatasetReq,
    ListDatasetReq,
    UpdateDatasetReq,
    validate_and_parse_json_request,
    validate_and_parse_request_args,
)
from rag.nlp import search
from rag.settings import PAGERANK_FLD

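# NOTE: `manager` is intentionally not imported here; the application's route
# loader is expected to inject it into this module's namespace before the
# decorators run, which is why each route line carries `# noqa: F821`.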
@manager.route("/datasets", methods=["POST"])  # noqa: F821
@token_required
def create(tenant_id):
    """
    Create a new dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset creation parameters.
        required: true
        schema:
          type: object
          required:
            - name
          properties:
            name:
              type: string
              description: Name of the dataset.
            avatar:
              type: string
              description: Base64 encoding of the avatar.
            description:
              type: string
              description: Description of the dataset.
            embedding_model:
              type: string
              description: Embedding model name.
            permission:
              type: string
              enum: ['me', 'team']
              description: Dataset permission.
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"]
              description: Chunking method.
            parser_config:
              type: object
              description: Parser configuration.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
          properties:
            data:
              type: object
    """
    # Field name transformations during model dump:
    # | Original        | Dump Output |
    # |-----------------|-------------|
    # | embedding_model | embd_id     |
    # | chunk_method    | parser_id   |
    req, err = validate_and_parse_json_request(request, CreateDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    try:
        if KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value):
            return get_error_operating_result(message=f"Dataset name '{req['name']}' already exists")

        req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"])
        req["id"] = get_uuid()
        req["tenant_id"] = tenant_id
        req["created_by"] = tenant_id

        ok, t = TenantService.get_by_id(tenant_id)
        if not ok:
            return get_error_permission_result(message="Tenant not found")

        # Fall back to the tenant's default embedding model when the request
        # does not specify one; otherwise verify the requested model is usable.
        if not req.get("embd_id"):
            req["embd_id"] = t.embd_id
        else:
            ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
            if not ok:
                return err

        if not KnowledgebaseService.save(**req):
            return get_error_data_result(message="Create dataset error. (Database error)")

        ok, k = KnowledgebaseService.get_by_id(req["id"])
        if not ok:
            return get_error_data_result(message="Dataset creation failed")

        response_data = remap_dictionary_keys(k.to_dict())
        return get_result(data=response_data)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
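# A minimal usage sketch for the endpoint above (assumption: the default
# deployment serves this blueprint at http://localhost:9380/api/v1; adjust
# the host, port, and API key to your setup):
#
#   curl -X POST http://localhost:9380/api/v1/datasets \
#        -H "Authorization: Bearer <YOUR_API_KEY>" \
#        -H "Content-Type: application/json" \
#        -d '{"name": "kb_1", "chunk_method": "naive"}'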


@manager.route("/datasets", methods=["DELETE"])  # noqa: F821
@token_required
def delete(tenant_id):
    """
    Delete datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset deletion parameters.
        required: true
        schema:
          type: object
          required:
            - ids
          properties:
            ids:
              type: array
              x-nullable: true
              items:
                type: string
              description: |
                Specifies the datasets to delete:
                - If `null`, all datasets will be deleted.
                - If an array of IDs, only the specified datasets will be deleted.
                - If an empty array, no datasets will be deleted.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    req, err = validate_and_parse_json_request(request, DeleteDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    try:
        kb_id_instance_pairs = []
        if req["ids"] is None:
            # A null `ids` means "delete every dataset owned by this tenant".
            kbs = KnowledgebaseService.query(tenant_id=tenant_id)
            for kb in kbs:
                kb_id_instance_pairs.append((kb.id, kb))
        else:
            error_kb_ids = []
            for kb_id in req["ids"]:
                kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id)
                if kb is None:
                    error_kb_ids.append(kb_id)
                    continue
                kb_id_instance_pairs.append((kb_id, kb))
            if len(error_kb_ids) > 0:
                return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""")

        errors = []
        success_count = 0
        for kb_id, kb in kb_id_instance_pairs:
            # Cascade: remove each document, its backing file, and the
            # file-to-document link before dropping the dataset itself.
            for doc in DocumentService.query(kb_id=kb_id):
                if not DocumentService.remove_document(doc, tenant_id):
                    errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'")
                    continue
                f2d = File2DocumentService.get_by_document_id(doc.id)
                FileService.filter_delete(
                    [
                        File.source_type == FileSource.KNOWLEDGEBASE,
                        File.id == f2d[0].file_id,
                    ]
                )
                File2DocumentService.delete_by_document_id(doc.id)
            FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.type == "folder", File.name == kb.name])
            if not KnowledgebaseService.delete_by_id(kb_id):
                errors.append(f"Delete dataset error for {kb_id}")
                continue
            success_count += 1

        if not errors:
            return get_result()

        error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..."
        if success_count == 0:
            return get_error_data_result(message=error_message)

        return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
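# A minimal usage sketch for the `delete` endpoint above (same assumptions as
# the POST example; note that `"ids": null` deletes every dataset the tenant owns):
#
#   curl -X DELETE http://localhost:9380/api/v1/datasets \
#        -H "Authorization: Bearer <YOUR_API_KEY>" \
#        -H "Content-Type: application/json" \
#        -d '{"ids": ["<DATASET_ID>"]}'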


@manager.route("/datasets/<dataset_id>", methods=["PUT"])  # noqa: F821
@token_required
def update(tenant_id, dataset_id):
    """
    Update a dataset.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: path
        name: dataset_id
        type: string
        required: true
        description: ID of the dataset to update.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
      - in: body
        name: body
        description: Dataset update parameters.
        required: true
        schema:
          type: object
          properties:
            name:
              type: string
              description: New name of the dataset.
            avatar:
              type: string
              description: Updated base64 encoding of the avatar.
            description:
              type: string
              description: Updated description of the dataset.
            embedding_model:
              type: string
              description: Updated embedding model name.
            permission:
              type: string
              enum: ['me', 'team']
              description: Updated dataset permission.
            chunk_method:
              type: string
              enum: ["naive", "book", "email", "laws", "manual", "one", "paper",
                     "picture", "presentation", "qa", "table", "tag"]
              description: Updated chunking method.
            pagerank:
              type: integer
              description: Updated page rank.
            parser_config:
              type: object
              description: Updated parser configuration.
    responses:
      200:
        description: Successful operation.
        schema:
          type: object
    """
    # Field name transformations during model dump:
    # | Original        | Dump Output |
    # |-----------------|-------------|
    # | embedding_model | embd_id     |
    # | chunk_method    | parser_id   |
    extras = {"dataset_id": dataset_id}
    req, err = validate_and_parse_json_request(request, UpdateDatasetReq, extras=extras, exclude_unset=True)
    if err is not None:
        return get_error_argument_result(err)

    if not req:
        return get_error_argument_result(message="No properties were modified")

    try:
        kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id)
        if kb is None:
            return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'")

        # Merge the incoming parser_config into the stored one rather than
        # replacing it wholesale, so unspecified keys keep their values.
        if req.get("parser_config"):
            req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"])

        if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id:
            if not req.get("parser_config"):
                req["parser_config"] = get_parser_config(chunk_method, None)
        elif "parser_config" in req and not req["parser_config"]:
            del req["parser_config"]

        if "name" in req and req["name"].lower() != kb.name.lower():
            exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value)
            if exists:
                return get_error_data_result(message=f"Dataset name '{req['name']}' already exists")

        if "embd_id" in req:
            # The embedding model cannot change once chunks exist, because the
            # stored vectors were produced by the current model.
            if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id:
                return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}")
            ok, err = verify_embedding_availability(req["embd_id"], tenant_id)
            if not ok:
                return err

        if "pagerank" in req and req["pagerank"] != kb.pagerank:
            if os.environ.get("DOC_ENGINE", "elasticsearch") == "infinity":
                return get_error_argument_result(message="'pagerank' can only be set when doc_engine is elasticsearch")

            if req["pagerank"] > 0:
                settings.docStoreConn.update({"kb_id": kb.id}, {PAGERANK_FLD: req["pagerank"]}, search.index_name(kb.tenant_id), kb.id)
            else:
                # Elasticsearch requires PAGERANK_FLD to be non-zero, so a zero
                # value is applied by removing the field from existing chunks.
                settings.docStoreConn.update({"exists": PAGERANK_FLD}, {"remove": PAGERANK_FLD}, search.index_name(kb.tenant_id), kb.id)

        if not KnowledgebaseService.update_by_id(kb.id, req):
            return get_error_data_result(message="Update dataset error. (Database error)")

        ok, k = KnowledgebaseService.get_by_id(kb.id)
        if not ok:
            return get_error_data_result(message="Dataset update failed")

        response_data = remap_dictionary_keys(k.to_dict())
        return get_result(data=response_data)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
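# A minimal usage sketch for the `update` endpoint above (same assumptions as
# the POST example):
#
#   curl -X PUT http://localhost:9380/api/v1/datasets/<DATASET_ID> \
#        -H "Authorization: Bearer <YOUR_API_KEY>" \
#        -H "Content-Type: application/json" \
#        -d '{"name": "kb_renamed", "pagerank": 10}'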


@manager.route("/datasets", methods=["GET"])  # noqa: F821
@token_required
def list_datasets(tenant_id):
    """
    List datasets.
    ---
    tags:
      - Datasets
    security:
      - ApiKeyAuth: []
    parameters:
      - in: query
        name: id
        type: string
        required: false
        description: Dataset ID to filter by.
      - in: query
        name: name
        type: string
        required: false
        description: Dataset name to filter by.
      - in: query
        name: page
        type: integer
        required: false
        default: 1
        description: Page number.
      - in: query
        name: page_size
        type: integer
        required: false
        default: 30
        description: Number of items per page.
      - in: query
        name: orderby
        type: string
        required: false
        default: "create_time"
        description: Field to order by.
      - in: query
        name: desc
        type: boolean
        required: false
        default: true
        description: Sort in descending order.
      - in: header
        name: Authorization
        type: string
        required: true
        description: Bearer token for authentication.
    responses:
      200:
        description: Successful operation.
        schema:
          type: array
          items:
            type: object
    """
    args, err = validate_and_parse_request_args(request, ListDatasetReq)
    if err is not None:
        return get_error_argument_result(err)

    try:
        kb_id = request.args.get("id")
        name = args.get("name")
        if kb_id:
            kbs = KnowledgebaseService.get_kb_by_id(kb_id, tenant_id)
            if not kbs:
                return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'")
        if name:
            kbs = KnowledgebaseService.get_kb_by_name(name, tenant_id)
            if not kbs:
                return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'")

        tenants = TenantService.get_joined_tenants_by_user_id(tenant_id)
        kbs = KnowledgebaseService.get_list(
            [m["tenant_id"] for m in tenants],
            tenant_id,
            args["page"],
            args["page_size"],
            args["orderby"],
            args["desc"],
            kb_id,
            name,
        )

        response_data_list = []
        for kb in kbs:
            response_data_list.append(remap_dictionary_keys(kb))
        return get_result(data=response_data_list)
    except OperationalError as e:
        logging.exception(e)
        return get_error_data_result(message="Database operation failed")
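# A minimal usage sketch for the `list_datasets` endpoint above (same
# assumptions as the POST example):
#
#   curl "http://localhost:9380/api/v1/datasets?page=1&page_size=30&orderby=create_time&desc=true" \
#        -H "Authorization: Bearer <YOUR_API_KEY>"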