浏览代码

more assert (#24996)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
tags/2.0.0-beta.2^2
Asuka Minato 1 个月前
父节点
当前提交
16a3e21410
没有帐户链接到提交者的电子邮件

+ 6
- 3
api/controllers/console/billing/billing.py 查看文件

from flask_login import current_user
from flask_restx import Resource, reqparse from flask_restx import Resource, reqparse


from controllers.console import api from controllers.console import api
from controllers.console.wraps import account_initialization_required, only_edition_cloud, setup_required from controllers.console.wraps import account_initialization_required, only_edition_cloud, setup_required
from libs.login import login_required
from libs.login import current_user, login_required
from models.model import Account
from services.billing_service import BillingService from services.billing_service import BillingService




parser.add_argument("plan", type=str, required=True, location="args", choices=["professional", "team"]) parser.add_argument("plan", type=str, required=True, location="args", choices=["professional", "team"])
parser.add_argument("interval", type=str, required=True, location="args", choices=["month", "year"]) parser.add_argument("interval", type=str, required=True, location="args", choices=["month", "year"])
args = parser.parse_args() args = parser.parse_args()
assert isinstance(current_user, Account)


BillingService.is_tenant_owner_or_admin(current_user) BillingService.is_tenant_owner_or_admin(current_user)
assert current_user.current_tenant_id is not None
return BillingService.get_subscription( return BillingService.get_subscription(
args["plan"], args["interval"], current_user.email, current_user.current_tenant_id args["plan"], args["interval"], current_user.email, current_user.current_tenant_id
) )
@account_initialization_required @account_initialization_required
@only_edition_cloud @only_edition_cloud
def get(self): def get(self):
assert isinstance(current_user, Account)
BillingService.is_tenant_owner_or_admin(current_user) BillingService.is_tenant_owner_or_admin(current_user)
assert current_user.current_tenant_id is not None
return BillingService.get_invoices(current_user.email, current_user.current_tenant_id) return BillingService.get_invoices(current_user.email, current_user.current_tenant_id)





+ 3
- 2
api/services/agent_service.py 查看文件

from typing import Any, Optional from typing import Any, Optional


import pytz import pytz
from flask_login import current_user


import contexts import contexts
from core.app.app_config.easy_ui_based_app.agent.manager import AgentConfigManager from core.app.app_config.easy_ui_based_app.agent.manager import AgentConfigManager
from core.plugin.impl.exc import PluginDaemonClientSideError from core.plugin.impl.exc import PluginDaemonClientSideError
from core.tools.tool_manager import ToolManager from core.tools.tool_manager import ToolManager
from extensions.ext_database import db from extensions.ext_database import db
from libs.login import current_user
from models.account import Account from models.account import Account
from models.model import App, Conversation, EndUser, Message, MessageAgentThought from models.model import App, Conversation, EndUser, Message, MessageAgentThought


executor = executor.name executor = executor.name
else: else:
executor = "Unknown" executor = "Unknown"

assert isinstance(current_user, Account)
assert current_user.timezone is not None
timezone = pytz.timezone(current_user.timezone) timezone = pytz.timezone(current_user.timezone)


app_model_config = app_model.app_model_config app_model_config = app_model.app_model_config

+ 30
- 1
api/services/annotation_service.py 查看文件

from typing import Optional from typing import Optional


import pandas as pd import pandas as pd
from flask_login import current_user
from sqlalchemy import or_, select from sqlalchemy import or_, select
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import NotFound from werkzeug.exceptions import NotFound
from extensions.ext_database import db from extensions.ext_database import db
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
from libs.datetime_utils import naive_utc_now from libs.datetime_utils import naive_utc_now
from libs.login import current_user
from models.account import Account
from models.model import App, AppAnnotationHitHistory, AppAnnotationSetting, Message, MessageAnnotation from models.model import App, AppAnnotationHitHistory, AppAnnotationSetting, Message, MessageAnnotation
from services.feature_service import FeatureService from services.feature_service import FeatureService
from tasks.annotation.add_annotation_to_index_task import add_annotation_to_index_task from tasks.annotation.add_annotation_to_index_task import add_annotation_to_index_task
@classmethod @classmethod
def up_insert_app_annotation_from_message(cls, args: dict, app_id: str) -> MessageAnnotation: def up_insert_app_annotation_from_message(cls, args: dict, app_id: str) -> MessageAnnotation:
# get app info # get app info
assert isinstance(current_user, Account)
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
db.session.commit() db.session.commit()
# if annotation reply is enabled , add annotation to index # if annotation reply is enabled , add annotation to index
annotation_setting = db.session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app_id).first() annotation_setting = db.session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app_id).first()
assert current_user.current_tenant_id is not None
if annotation_setting: if annotation_setting:
add_annotation_to_index_task.delay( add_annotation_to_index_task.delay(
annotation.id, annotation.id,
enable_app_annotation_job_key = f"enable_app_annotation_job_{str(job_id)}" enable_app_annotation_job_key = f"enable_app_annotation_job_{str(job_id)}"
# send batch add segments task # send batch add segments task
redis_client.setnx(enable_app_annotation_job_key, "waiting") redis_client.setnx(enable_app_annotation_job_key, "waiting")
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
enable_annotation_reply_task.delay( enable_annotation_reply_task.delay(
str(job_id), str(job_id),
app_id, app_id,


@classmethod @classmethod
def disable_app_annotation(cls, app_id: str): def disable_app_annotation(cls, app_id: str):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
disable_app_annotation_key = f"disable_app_annotation_{str(app_id)}" disable_app_annotation_key = f"disable_app_annotation_{str(app_id)}"
cache_result = redis_client.get(disable_app_annotation_key) cache_result = redis_client.get(disable_app_annotation_key)
if cache_result is not None: if cache_result is not None:
@classmethod @classmethod
def get_annotation_list_by_app_id(cls, app_id: str, page: int, limit: int, keyword: str): def get_annotation_list_by_app_id(cls, app_id: str, page: int, limit: int, keyword: str):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def export_annotation_list_by_app_id(cls, app_id: str): def export_annotation_list_by_app_id(cls, app_id: str):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def insert_app_annotation_directly(cls, args: dict, app_id: str) -> MessageAnnotation: def insert_app_annotation_directly(cls, args: dict, app_id: str) -> MessageAnnotation:
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def update_app_annotation_directly(cls, args: dict, app_id: str, annotation_id: str): def update_app_annotation_directly(cls, args: dict, app_id: str, annotation_id: str):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def delete_app_annotation(cls, app_id: str, annotation_id: str): def delete_app_annotation(cls, app_id: str, annotation_id: str):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def delete_app_annotations_in_batch(cls, app_id: str, annotation_ids: list[str]): def delete_app_annotations_in_batch(cls, app_id: str, annotation_ids: list[str]):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
@classmethod @classmethod
def batch_import_app_annotations(cls, app_id, file: FileStorage): def batch_import_app_annotations(cls, app_id, file: FileStorage):
# get app info # get app info
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")


@classmethod @classmethod
def get_annotation_hit_histories(cls, app_id: str, annotation_id: str, page, limit): def get_annotation_hit_histories(cls, app_id: str, annotation_id: str, page, limit):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
# get app info # get app info
app = ( app = (
db.session.query(App) db.session.query(App)


@classmethod @classmethod
def get_app_annotation_setting_by_app_id(cls, app_id: str): def get_app_annotation_setting_by_app_id(cls, app_id: str):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
# get app info # get app info
app = ( app = (
db.session.query(App) db.session.query(App)


@classmethod @classmethod
def update_app_annotation_setting(cls, app_id: str, annotation_setting_id: str, args: dict): def update_app_annotation_setting(cls, app_id: str, annotation_setting_id: str, args: dict):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
# get app info # get app info
app = ( app = (
db.session.query(App) db.session.query(App)


@classmethod @classmethod
def clear_all_annotations(cls, app_id: str): def clear_all_annotations(cls, app_id: str):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
app = ( app = (
db.session.query(App) db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") .where(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")

+ 8
- 2
api/services/app_service.py 查看文件

import logging import logging
from typing import Optional, TypedDict, cast from typing import Optional, TypedDict, cast


from flask_login import current_user
from flask_sqlalchemy.pagination import Pagination from flask_sqlalchemy.pagination import Pagination


from configs import dify_config from configs import dify_config
from events.app_event import app_was_created from events.app_event import app_was_created
from extensions.ext_database import db from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now from libs.datetime_utils import naive_utc_now
from libs.login import current_user
from models.account import Account from models.account import Account
from models.model import App, AppMode, AppModelConfig, Site from models.model import App, AppMode, AppModelConfig, Site
from models.tools import ApiToolProvider from models.tools import ApiToolProvider
""" """
Get App Get App
""" """
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
# get original app model config # get original app model config
if app.mode == AppMode.AGENT_CHAT.value or app.is_agent: if app.mode == AppMode.AGENT_CHAT.value or app.is_agent:
model_config = app.app_model_config model_config = app.app_model_config
:param args: request args :param args: request args
:return: App instance :return: App instance
""" """
assert current_user is not None
app.name = args["name"] app.name = args["name"]
app.description = args["description"] app.description = args["description"]
app.icon_type = args["icon_type"] app.icon_type = args["icon_type"]
:param name: new name :param name: new name
:return: App instance :return: App instance
""" """
assert current_user is not None
app.name = name app.name = name
app.updated_by = current_user.id app.updated_by = current_user.id
app.updated_at = naive_utc_now() app.updated_at = naive_utc_now()
:param icon_background: new icon_background :param icon_background: new icon_background
:return: App instance :return: App instance
""" """
assert current_user is not None
app.icon = icon app.icon = icon
app.icon_background = icon_background app.icon_background = icon_background
app.updated_by = current_user.id app.updated_by = current_user.id
""" """
if enable_site == app.enable_site: if enable_site == app.enable_site:
return app return app
assert current_user is not None
app.enable_site = enable_site app.enable_site = enable_site
app.updated_by = current_user.id app.updated_by = current_user.id
app.updated_at = naive_utc_now() app.updated_at = naive_utc_now()
""" """
if enable_api == app.enable_api: if enable_api == app.enable_api:
return app return app
assert current_user is not None


app.enable_api = enable_api app.enable_api = enable_api
app.updated_by = current_user.id app.updated_by = current_user.id

+ 1
- 1
api/services/billing_service.py 查看文件

return response.json() return response.json()


@staticmethod @staticmethod
def is_tenant_owner_or_admin(current_user):
def is_tenant_owner_or_admin(current_user: Account):
tenant_id = current_user.current_tenant_id tenant_id = current_user.current_tenant_id


join: Optional[TenantAccountJoin] = ( join: Optional[TenantAccountJoin] = (

+ 47
- 2
api/services/dataset_service.py 查看文件

from collections import Counter from collections import Counter
from typing import Any, Literal, Optional from typing import Any, Literal, Optional


from flask_login import current_user
import sqlalchemy as sa
from sqlalchemy import exists, func, select from sqlalchemy import exists, func, select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from werkzeug.exceptions import NotFound from werkzeug.exceptions import NotFound
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
from libs import helper from libs import helper
from libs.datetime_utils import naive_utc_now from libs.datetime_utils import naive_utc_now
from libs.login import current_user
from models.account import Account, TenantAccountRole from models.account import Account, TenantAccountRole
from models.dataset import ( from models.dataset import (
AppDatasetJoin, AppDatasetJoin,
data: Update data dictionary data: Update data dictionary
filtered_data: Filtered update data to modify filtered_data: Filtered update data to modify
""" """
# assert isinstance(current_user, Account) and current_user.current_tenant_id is not None
try: try:
model_manager = ModelManager() model_manager = ModelManager()
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
embedding_model = model_manager.get_model_instance( embedding_model = model_manager.get_model_instance(
tenant_id=current_user.current_tenant_id, tenant_id=current_user.current_tenant_id,
provider=data["embedding_model_provider"], provider=data["embedding_model_provider"],
data: Update data dictionary data: Update data dictionary
filtered_data: Filtered update data to modify filtered_data: Filtered update data to modify
""" """
# assert isinstance(current_user, Account) and current_user.current_tenant_id is not None

model_manager = ModelManager() model_manager = ModelManager()
try: try:
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
embedding_model = model_manager.get_model_instance( embedding_model = model_manager.get_model_instance(
tenant_id=current_user.current_tenant_id, tenant_id=current_user.current_tenant_id,
provider=data["embedding_model_provider"], provider=data["embedding_model_provider"],


@staticmethod @staticmethod
def get_dataset_auto_disable_logs(dataset_id: str): def get_dataset_auto_disable_logs(dataset_id: str):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None
features = FeatureService.get_features(current_user.current_tenant_id) features = FeatureService.get_features(current_user.current_tenant_id)
if not features.billing.enabled or features.billing.subscription.plan == "sandbox": if not features.billing.enabled or features.billing.subscription.plan == "sandbox":
return { return {


@staticmethod @staticmethod
def get_batch_documents(dataset_id: str, batch: str) -> list[Document]: def get_batch_documents(dataset_id: str, batch: str) -> list[Document]:
assert isinstance(current_user, Account)

documents = ( documents = (
db.session.query(Document) db.session.query(Document)
.where( .where(


@staticmethod @staticmethod
def rename_document(dataset_id: str, document_id: str, name: str) -> Document: def rename_document(dataset_id: str, document_id: str, name: str) -> Document:
assert isinstance(current_user, Account)

dataset = DatasetService.get_dataset(dataset_id) dataset = DatasetService.get_dataset(dataset_id)
if not dataset: if not dataset:
raise ValueError("Dataset not found.") raise ValueError("Dataset not found.")
if document.indexing_status not in {"waiting", "parsing", "cleaning", "splitting", "indexing"}: if document.indexing_status not in {"waiting", "parsing", "cleaning", "splitting", "indexing"}:
raise DocumentIndexingError() raise DocumentIndexingError()
# update document to be paused # update document to be paused
assert current_user is not None
document.is_paused = True document.is_paused = True
document.paused_by = current_user.id document.paused_by = current_user.id
document.paused_at = naive_utc_now() document.paused_at = naive_utc_now()
# check doc_form # check doc_form
DatasetService.check_doc_form(dataset, knowledge_config.doc_form) DatasetService.check_doc_form(dataset, knowledge_config.doc_form)
# check document limit # check document limit
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

features = FeatureService.get_features(current_user.current_tenant_id) features = FeatureService.get_features(current_user.current_tenant_id)


if features.billing.enabled: if features.billing.enabled:


@staticmethod @staticmethod
def get_tenant_documents_count(): def get_tenant_documents_count():
assert isinstance(current_user, Account)

documents_count = ( documents_count = (
db.session.query(Document) db.session.query(Document)
.where( .where(
dataset_process_rule: Optional[DatasetProcessRule] = None, dataset_process_rule: Optional[DatasetProcessRule] = None,
created_from: str = "web", created_from: str = "web",
): ):
assert isinstance(current_user, Account)

DatasetService.check_dataset_model_setting(dataset) DatasetService.check_dataset_model_setting(dataset)
document = DocumentService.get_document(dataset.id, document_data.original_document_id) document = DocumentService.get_document(dataset.id, document_data.original_document_id)
if document is None: if document is None:
data_source_binding = ( data_source_binding = (
db.session.query(DataSourceOauthBinding) db.session.query(DataSourceOauthBinding)
.where( .where(
db.and_(
sa.and_(
DataSourceOauthBinding.tenant_id == current_user.current_tenant_id, DataSourceOauthBinding.tenant_id == current_user.current_tenant_id,
DataSourceOauthBinding.provider == "notion", DataSourceOauthBinding.provider == "notion",
DataSourceOauthBinding.disabled == False, DataSourceOauthBinding.disabled == False,


@staticmethod @staticmethod
def save_document_without_dataset_id(tenant_id: str, knowledge_config: KnowledgeConfig, account: Account): def save_document_without_dataset_id(tenant_id: str, knowledge_config: KnowledgeConfig, account: Account):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

features = FeatureService.get_features(current_user.current_tenant_id) features = FeatureService.get_features(current_user.current_tenant_id)


if features.billing.enabled: if features.billing.enabled:


@classmethod @classmethod
def create_segment(cls, args: dict, document: Document, dataset: Dataset): def create_segment(cls, args: dict, document: Document, dataset: Dataset):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

content = args["content"] content = args["content"]
doc_id = str(uuid.uuid4()) doc_id = str(uuid.uuid4())
segment_hash = helper.generate_text_hash(content) segment_hash = helper.generate_text_hash(content)


@classmethod @classmethod
def multi_create_segment(cls, segments: list, document: Document, dataset: Dataset): def multi_create_segment(cls, segments: list, document: Document, dataset: Dataset):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

lock_name = f"multi_add_segment_lock_document_id_{document.id}" lock_name = f"multi_add_segment_lock_document_id_{document.id}"
increment_word_count = 0 increment_word_count = 0
with redis_client.lock(lock_name, timeout=600): with redis_client.lock(lock_name, timeout=600):


@classmethod @classmethod
def update_segment(cls, args: SegmentUpdateArgs, segment: DocumentSegment, document: Document, dataset: Dataset): def update_segment(cls, args: SegmentUpdateArgs, segment: DocumentSegment, document: Document, dataset: Dataset):
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

indexing_cache_key = f"segment_{segment.id}_indexing" indexing_cache_key = f"segment_{segment.id}_indexing"
cache_result = redis_client.get(indexing_cache_key) cache_result = redis_client.get(indexing_cache_key)
if cache_result is not None: if cache_result is not None:


@classmethod @classmethod
def delete_segments(cls, segment_ids: list, document: Document, dataset: Dataset): def delete_segments(cls, segment_ids: list, document: Document, dataset: Dataset):
assert isinstance(current_user, Account)
segments = ( segments = (
db.session.query(DocumentSegment.index_node_id, DocumentSegment.word_count) db.session.query(DocumentSegment.index_node_id, DocumentSegment.word_count)
.where( .where(
def update_segments_status( def update_segments_status(
cls, segment_ids: list, action: Literal["enable", "disable"], dataset: Dataset, document: Document cls, segment_ids: list, action: Literal["enable", "disable"], dataset: Dataset, document: Document
): ):
assert current_user is not None

# Check if segment_ids is not empty to avoid WHERE false condition # Check if segment_ids is not empty to avoid WHERE false condition
if not segment_ids or len(segment_ids) == 0: if not segment_ids or len(segment_ids) == 0:
return return
def create_child_chunk( def create_child_chunk(
cls, content: str, segment: DocumentSegment, document: Document, dataset: Dataset cls, content: str, segment: DocumentSegment, document: Document, dataset: Dataset
) -> ChildChunk: ) -> ChildChunk:
assert isinstance(current_user, Account)

lock_name = f"add_child_lock_{segment.id}" lock_name = f"add_child_lock_{segment.id}"
with redis_client.lock(lock_name, timeout=20): with redis_client.lock(lock_name, timeout=20):
index_node_id = str(uuid.uuid4()) index_node_id = str(uuid.uuid4())
document: Document, document: Document,
dataset: Dataset, dataset: Dataset,
) -> list[ChildChunk]: ) -> list[ChildChunk]:
assert isinstance(current_user, Account)

child_chunks = ( child_chunks = (
db.session.query(ChildChunk) db.session.query(ChildChunk)
.where( .where(
document: Document, document: Document,
dataset: Dataset, dataset: Dataset,
) -> ChildChunk: ) -> ChildChunk:
assert current_user is not None

try: try:
child_chunk.content = content child_chunk.content = content
child_chunk.word_count = len(content) child_chunk.word_count = len(content)
def get_child_chunks( def get_child_chunks(
cls, segment_id: str, document_id: str, dataset_id: str, page: int, limit: int, keyword: Optional[str] = None cls, segment_id: str, document_id: str, dataset_id: str, page: int, limit: int, keyword: Optional[str] = None
): ):
assert isinstance(current_user, Account)

query = ( query = (
select(ChildChunk) select(ChildChunk)
.filter_by( .filter_by(

+ 4
- 1
api/services/file_service.py 查看文件

import uuid import uuid
from typing import Any, Literal, Union from typing import Any, Literal, Union


from flask_login import current_user
from werkzeug.exceptions import NotFound from werkzeug.exceptions import NotFound


from configs import dify_config from configs import dify_config
from extensions.ext_storage import storage from extensions.ext_storage import storage
from libs.datetime_utils import naive_utc_now from libs.datetime_utils import naive_utc_now
from libs.helper import extract_tenant_id from libs.helper import extract_tenant_id
from libs.login import current_user
from models.account import Account from models.account import Account
from models.enums import CreatorUserRole from models.enums import CreatorUserRole
from models.model import EndUser, UploadFile from models.model import EndUser, UploadFile


@staticmethod @staticmethod
def upload_text(text: str, text_name: str) -> UploadFile: def upload_text(text: str, text_name: str) -> UploadFile:
assert isinstance(current_user, Account)
assert current_user.current_tenant_id is not None

if len(text_name) > 200: if len(text_name) > 200:
text_name = text_name[:200] text_name = text_name[:200]
# user uuid as file name # user uuid as file name

+ 3
- 2
api/tests/test_containers_integration_tests/services/test_agent_service.py 查看文件

import json import json
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker


from core.plugin.impl.exc import PluginDaemonClientSideError from core.plugin.impl.exc import PluginDaemonClientSideError
from models.account import Account
from models.model import AppModelConfig, Conversation, EndUser, Message, MessageAgentThought from models.model import AppModelConfig, Conversation, EndUser, Message, MessageAgentThought
from services.account_service import AccountService, TenantService from services.account_service import AccountService, TenantService
from services.agent_service import AgentService from services.agent_service import AgentService
patch("services.agent_service.PluginAgentClient") as mock_plugin_agent_client, patch("services.agent_service.PluginAgentClient") as mock_plugin_agent_client,
patch("services.agent_service.ToolManager") as mock_tool_manager, patch("services.agent_service.ToolManager") as mock_tool_manager,
patch("services.agent_service.AgentConfigManager") as mock_agent_config_manager, patch("services.agent_service.AgentConfigManager") as mock_agent_config_manager,
patch("services.agent_service.current_user") as mock_current_user,
patch("services.agent_service.current_user", create_autospec(Account, instance=True)) as mock_current_user,
patch("services.app_service.FeatureService") as mock_feature_service, patch("services.app_service.FeatureService") as mock_feature_service,
patch("services.app_service.EnterpriseService") as mock_enterprise_service, patch("services.app_service.EnterpriseService") as mock_enterprise_service,
patch("services.app_service.ModelManager") as mock_model_manager, patch("services.app_service.ModelManager") as mock_model_manager,

+ 5
- 2
api/tests/test_containers_integration_tests/services/test_annotation_service.py 查看文件

from unittest.mock import patch
from unittest.mock import create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker
from werkzeug.exceptions import NotFound from werkzeug.exceptions import NotFound


from models.account import Account
from models.model import MessageAnnotation from models.model import MessageAnnotation
from services.annotation_service import AppAnnotationService from services.annotation_service import AppAnnotationService
from services.app_service import AppService from services.app_service import AppService
patch("services.annotation_service.enable_annotation_reply_task") as mock_enable_task, patch("services.annotation_service.enable_annotation_reply_task") as mock_enable_task,
patch("services.annotation_service.disable_annotation_reply_task") as mock_disable_task, patch("services.annotation_service.disable_annotation_reply_task") as mock_disable_task,
patch("services.annotation_service.batch_import_annotations_task") as mock_batch_import_task, patch("services.annotation_service.batch_import_annotations_task") as mock_batch_import_task,
patch("services.annotation_service.current_user") as mock_current_user,
patch(
"services.annotation_service.current_user", create_autospec(Account, instance=True)
) as mock_current_user,
): ):
# Setup default mock returns # Setup default mock returns
mock_account_feature_service.get_features.return_value.billing.enabled = False mock_account_feature_service.get_features.return_value.billing.enabled = False

+ 36
- 10
api/tests/test_containers_integration_tests/services/test_app_service.py 查看文件

from unittest.mock import patch
from unittest.mock import create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker


from constants.model_template import default_app_templates from constants.model_template import default_app_templates
from models.account import Account
from models.model import App, Site from models.model import App, Site
from services.account_service import AccountService, TenantService from services.account_service import AccountService, TenantService
from services.app_service import AppService from services.app_service import AppService
app_service = AppService() app_service = AppService()
created_app = app_service.create_app(tenant.id, app_args, account) created_app = app_service.create_app(tenant.id, app_args, account)


# Get app using the service
retrieved_app = app_service.get_app(created_app)
# Get app using the service - needs current_user mock
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
retrieved_app = app_service.get_app(created_app)


# Verify retrieved app matches created app # Verify retrieved app matches created app
assert retrieved_app.id == created_app.id assert retrieved_app.id == created_app.id
"use_icon_as_answer_icon": True, "use_icon_as_answer_icon": True,
} }


with patch("flask_login.utils._get_user", return_value=account):
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app(app, update_args) updated_app = app_service.update_app(app, update_args)


# Verify updated fields # Verify updated fields


# Update app name # Update app name
new_name = "New App Name" new_name = "New App Name"
with patch("flask_login.utils._get_user", return_value=account):
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_name(app, new_name) updated_app = app_service.update_app_name(app, new_name)


assert updated_app.name == new_name assert updated_app.name == new_name
# Update app icon # Update app icon
new_icon = "🌟" new_icon = "🌟"
new_icon_background = "#FFD93D" new_icon_background = "#FFD93D"
with patch("flask_login.utils._get_user", return_value=account):
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_icon(app, new_icon, new_icon_background) updated_app = app_service.update_app_icon(app, new_icon, new_icon_background)


assert updated_app.icon == new_icon assert updated_app.icon == new_icon
original_site_status = app.enable_site original_site_status = app.enable_site


# Update site status to disabled # Update site status to disabled
with patch("flask_login.utils._get_user", return_value=account):
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_site_status(app, False) updated_app = app_service.update_app_site_status(app, False)
assert updated_app.enable_site is False assert updated_app.enable_site is False
assert updated_app.updated_by == account.id assert updated_app.updated_by == account.id


# Update site status back to enabled # Update site status back to enabled
with patch("flask_login.utils._get_user", return_value=account):
with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_site_status(updated_app, True) updated_app = app_service.update_app_site_status(updated_app, True)
assert updated_app.enable_site is True assert updated_app.enable_site is True
assert updated_app.updated_by == account.id assert updated_app.updated_by == account.id
original_api_status = app.enable_api original_api_status = app.enable_api


# Update API status to disabled # Update API status to disabled
with patch("flask_login.utils._get_user", return_value=account):
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.id = account.id
mock_current_user.current_tenant_id = account.current_tenant_id

with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_api_status(app, False) updated_app = app_service.update_app_api_status(app, False)
assert updated_app.enable_api is False assert updated_app.enable_api is False
assert updated_app.updated_by == account.id assert updated_app.updated_by == account.id


# Update API status back to enabled # Update API status back to enabled
with patch("flask_login.utils._get_user", return_value=account):
with patch("services.app_service.current_user", mock_current_user):
updated_app = app_service.update_app_api_status(updated_app, True) updated_app = app_service.update_app_api_status(updated_app, True)
assert updated_app.enable_api is True assert updated_app.enable_api is True
assert updated_app.updated_by == account.id assert updated_app.updated_by == account.id

+ 16
- 13
api/tests/test_containers_integration_tests/services/test_file_service.py 查看文件

import hashlib import hashlib
from io import BytesIO from io import BytesIO
from unittest.mock import patch
from unittest.mock import create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker
text = "This is a test text content" text = "This is a test text content"
text_name = "test_text.txt" text_name = "test_text.txt"


# Mock current_user
with patch("services.file_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())
# Mock current_user using create_autospec
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())


with patch("services.file_service.current_user", mock_current_user):
upload_file = FileService.upload_text(text=text, text_name=text_name) upload_file = FileService.upload_text(text=text, text_name=text_name)


assert upload_file is not None assert upload_file is not None
text = "test content" text = "test content"
long_name = "a" * 250 # Longer than 200 characters long_name = "a" * 250 # Longer than 200 characters


# Mock current_user
with patch("services.file_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())
# Mock current_user using create_autospec
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())


with patch("services.file_service.current_user", mock_current_user):
upload_file = FileService.upload_text(text=text, text_name=long_name) upload_file = FileService.upload_text(text=text, text_name=long_name)


# Verify name was truncated # Verify name was truncated
text = "" text = ""
text_name = "empty.txt" text_name = "empty.txt"


# Mock current_user
with patch("services.file_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())
# Mock current_user using create_autospec
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = str(fake.uuid4())
mock_current_user.id = str(fake.uuid4())


with patch("services.file_service.current_user", mock_current_user):
upload_file = FileService.upload_text(text=text, text_name=text_name) upload_file = FileService.upload_text(text=text, text_name=text_name)


assert upload_file is not None assert upload_file is not None

+ 4
- 2
api/tests/test_containers_integration_tests/services/test_metadata_service.py 查看文件

from unittest.mock import patch
from unittest.mock import create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker
def mock_external_service_dependencies(self): def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies.""" """Mock setup for external service dependencies."""
with ( with (
patch("services.metadata_service.current_user") as mock_current_user,
patch(
"services.metadata_service.current_user", create_autospec(Account, instance=True)
) as mock_current_user,
patch("services.metadata_service.redis_client") as mock_redis_client, patch("services.metadata_service.redis_client") as mock_redis_client,
patch("services.dataset_service.DocumentService") as mock_document_service, patch("services.dataset_service.DocumentService") as mock_document_service,
): ):

+ 2
- 2
api/tests/test_containers_integration_tests/services/test_tag_service.py 查看文件

from unittest.mock import patch
from unittest.mock import create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker
def mock_external_service_dependencies(self): def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies.""" """Mock setup for external service dependencies."""
with ( with (
patch("services.tag_service.current_user") as mock_current_user,
patch("services.tag_service.current_user", create_autospec(Account, instance=True)) as mock_current_user,
): ):
# Setup default mock returns # Setup default mock returns
mock_current_user.current_tenant_id = "test-tenant-id" mock_current_user.current_tenant_id = "test-tenant-id"

+ 40
- 27
api/tests/test_containers_integration_tests/services/test_website_service.py 查看文件

from datetime import datetime from datetime import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, create_autospec, patch


import pytest import pytest
from faker import Faker from faker import Faker
fake = Faker() fake = Faker()


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="firecrawl", provider="firecrawl",
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="watercrawl", provider="watercrawl",
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request for single page crawling # Create API request for single page crawling
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="jinareader", provider="jinareader",
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request with invalid provider # Create API request with invalid provider
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="invalid_provider", provider="invalid_provider",
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlStatusApiRequest(provider="firecrawl", job_id="test_job_id_123") api_request = WebsiteCrawlStatusApiRequest(provider="firecrawl", job_id="test_job_id_123")


account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlStatusApiRequest(provider="watercrawl", job_id="watercrawl_job_123") api_request = WebsiteCrawlStatusApiRequest(provider="watercrawl", job_id="watercrawl_job_123")


account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlStatusApiRequest(provider="jinareader", job_id="jina_job_123") api_request = WebsiteCrawlStatusApiRequest(provider="jinareader", job_id="jina_job_123")


account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request with invalid provider # Create API request with invalid provider
api_request = WebsiteCrawlStatusApiRequest(provider="invalid_provider", job_id="test_job_id_123") api_request = WebsiteCrawlStatusApiRequest(provider="invalid_provider", job_id="test_job_id_123")


account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Mock missing credentials # Mock missing credentials
mock_external_service_dependencies["api_key_auth_service"].get_auth_credentials.return_value = None mock_external_service_dependencies["api_key_auth_service"].get_auth_credentials.return_value = None


account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Mock missing API key in config # Mock missing API key in config
mock_external_service_dependencies["api_key_auth_service"].get_auth_credentials.return_value = { mock_external_service_dependencies["api_key_auth_service"].get_auth_credentials.return_value = {
"config": {"base_url": "https://api.example.com"} "config": {"base_url": "https://api.example.com"}
account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies) account = self._create_test_account(db_session_with_containers, mock_external_service_dependencies)


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request for sub-page crawling # Create API request for sub-page crawling
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="jinareader", provider="jinareader",
mock_external_service_dependencies["requests"].get.return_value = mock_failed_response mock_external_service_dependencies["requests"].get.return_value = mock_failed_response


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlApiRequest( api_request = WebsiteCrawlApiRequest(
provider="jinareader", provider="jinareader",
mock_external_service_dependencies["firecrawl_app"].return_value = mock_firecrawl_instance mock_external_service_dependencies["firecrawl_app"].return_value = mock_firecrawl_instance


# Mock current_user for the test # Mock current_user for the test
with patch("services.website_service.current_user") as mock_current_user:
mock_current_user.current_tenant_id = account.current_tenant.id
mock_current_user = create_autospec(Account, instance=True)
mock_current_user.current_tenant_id = account.current_tenant.id


with patch("services.website_service.current_user", mock_current_user):
# Create API request # Create API request
api_request = WebsiteCrawlStatusApiRequest(provider="firecrawl", job_id="active_job_123") api_request = WebsiteCrawlStatusApiRequest(provider="firecrawl", job_id="active_job_123")



+ 6
- 3
api/tests/unit_tests/services/test_dataset_service_update_dataset.py 查看文件

from typing import Any, Optional from typing import Any, Optional


# Mock redis_client before importing dataset_service # Mock redis_client before importing dataset_service
from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec, patch


import pytest import pytest


from core.model_runtime.entities.model_entities import ModelType from core.model_runtime.entities.model_entities import ModelType
from models.account import Account
from models.dataset import Dataset, ExternalKnowledgeBindings from models.dataset import Dataset, ExternalKnowledgeBindings
from services.dataset_service import DatasetService from services.dataset_service import DatasetService
from services.errors.account import NoPermissionError from services.errors.account import NoPermissionError
@staticmethod @staticmethod
def create_current_user_mock(tenant_id: str = "tenant-123") -> Mock: def create_current_user_mock(tenant_id: str = "tenant-123") -> Mock:
"""Create a mock current user.""" """Create a mock current user."""
current_user = Mock()
current_user = create_autospec(Account, instance=True)
current_user.current_tenant_id = tenant_id current_user.current_tenant_id = tenant_id
return current_user return current_user


"services.dataset_service.DatasetCollectionBindingService.get_dataset_collection_binding" "services.dataset_service.DatasetCollectionBindingService.get_dataset_collection_binding"
) as mock_get_binding, ) as mock_get_binding,
patch("services.dataset_service.deal_dataset_vector_index_task") as mock_task, patch("services.dataset_service.deal_dataset_vector_index_task") as mock_task,
patch("services.dataset_service.current_user") as mock_current_user,
patch(
"services.dataset_service.current_user", create_autospec(Account, instance=True)
) as mock_current_user,
): ):
mock_current_user.current_tenant_id = "tenant-123" mock_current_user.current_tenant_id = "tenant-123"
yield { yield {

+ 10
- 7
api/tests/unit_tests/services/test_metadata_bug_complete.py 查看文件

from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec, patch


import pytest import pytest
from flask_restx import reqparse from flask_restx import reqparse
from werkzeug.exceptions import BadRequest from werkzeug.exceptions import BadRequest


from models.account import Account
from services.entities.knowledge_entities.knowledge_entities import MetadataArgs from services.entities.knowledge_entities.knowledge_entities import MetadataArgs
from services.metadata_service import MetadataService from services.metadata_service import MetadataService


mock_metadata_args.name = None mock_metadata_args.name = None
mock_metadata_args.type = "string" mock_metadata_args.type = "string"


with patch("services.metadata_service.current_user") as mock_user:
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"
mock_user = create_autospec(Account, instance=True)
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"


with patch("services.metadata_service.current_user", mock_user):
# Should crash with TypeError # Should crash with TypeError
with pytest.raises(TypeError, match="object of type 'NoneType' has no len"): with pytest.raises(TypeError, match="object of type 'NoneType' has no len"):
MetadataService.create_metadata("dataset-123", mock_metadata_args) MetadataService.create_metadata("dataset-123", mock_metadata_args)


# Test update method as well # Test update method as well
with patch("services.metadata_service.current_user") as mock_user:
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"
mock_user = create_autospec(Account, instance=True)
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"


with patch("services.metadata_service.current_user", mock_user):
with pytest.raises(TypeError, match="object of type 'NoneType' has no len"): with pytest.raises(TypeError, match="object of type 'NoneType' has no len"):
MetadataService.update_metadata_name("dataset-123", "metadata-456", None) MetadataService.update_metadata_name("dataset-123", "metadata-456", None)



+ 14
- 10
api/tests/unit_tests/services/test_metadata_nullable_bug.py 查看文件

from unittest.mock import Mock, patch
from unittest.mock import Mock, create_autospec, patch


import pytest import pytest
from flask_restx import reqparse from flask_restx import reqparse


from models.account import Account
from services.entities.knowledge_entities.knowledge_entities import MetadataArgs from services.entities.knowledge_entities.knowledge_entities import MetadataArgs
from services.metadata_service import MetadataService from services.metadata_service import MetadataService


mock_metadata_args.name = None # This will cause len() to crash mock_metadata_args.name = None # This will cause len() to crash
mock_metadata_args.type = "string" mock_metadata_args.type = "string"


with patch("services.metadata_service.current_user") as mock_user:
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"
mock_user = create_autospec(Account, instance=True)
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"


with patch("services.metadata_service.current_user", mock_user):
# This should crash with TypeError when calling len(None) # This should crash with TypeError when calling len(None)
with pytest.raises(TypeError, match="object of type 'NoneType' has no len"): with pytest.raises(TypeError, match="object of type 'NoneType' has no len"):
MetadataService.create_metadata("dataset-123", mock_metadata_args) MetadataService.create_metadata("dataset-123", mock_metadata_args)


def test_metadata_service_update_with_none_name_crashes(self): def test_metadata_service_update_with_none_name_crashes(self):
"""Test that MetadataService.update_metadata_name crashes when name is None.""" """Test that MetadataService.update_metadata_name crashes when name is None."""
with patch("services.metadata_service.current_user") as mock_user:
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"
mock_user = create_autospec(Account, instance=True)
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"


with patch("services.metadata_service.current_user", mock_user):
# This should crash with TypeError when calling len(None) # This should crash with TypeError when calling len(None)
with pytest.raises(TypeError, match="object of type 'NoneType' has no len"): with pytest.raises(TypeError, match="object of type 'NoneType' has no len"):
MetadataService.update_metadata_name("dataset-123", "metadata-456", None) MetadataService.update_metadata_name("dataset-123", "metadata-456", None)
mock_metadata_args.name = None # From args["name"] mock_metadata_args.name = None # From args["name"]
mock_metadata_args.type = None # From args["type"] mock_metadata_args.type = None # From args["type"]


with patch("services.metadata_service.current_user") as mock_user:
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"
mock_user = create_autospec(Account, instance=True)
mock_user.current_tenant_id = "tenant-123"
mock_user.id = "user-456"


with patch("services.metadata_service.current_user", mock_user):
# Step 4: Service layer crashes on len(None) # Step 4: Service layer crashes on len(None)
with pytest.raises(TypeError, match="object of type 'NoneType' has no len"): with pytest.raises(TypeError, match="object of type 'NoneType' has no len"):
MetadataService.create_metadata("dataset-123", mock_metadata_args) MetadataService.create_metadata("dataset-123", mock_metadata_args)

正在加载...
取消
保存