The `ChatMessageApi` (`POST /console/api/apps/{app_id}/chat-messages`) and
`ModelConfigResource` (`POST /console/api/apps/{app_id}/model-config`)
endpoints do not properly validate user permissions, allowing users without `editor`
permission to access restricted functionality.
This PR addresses this issue by adding proper permission check.
tags/1.9.0
| from flask import request | from flask import request | ||||
| from flask_restx import Resource, reqparse | from flask_restx import Resource, reqparse | ||||
| from werkzeug.exceptions import InternalServerError, NotFound | |||||
| from werkzeug.exceptions import Forbidden, InternalServerError, NotFound | |||||
| import services | import services | ||||
| from controllers.console import api | from controllers.console import api | ||||
| @account_initialization_required | @account_initialization_required | ||||
| @get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT]) | @get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT]) | ||||
| def post(self, app_model): | def post(self, app_model): | ||||
| if not isinstance(current_user, Account): | |||||
| raise Forbidden() | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | |||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| parser.add_argument("inputs", type=dict, required=True, location="json") | parser.add_argument("inputs", type=dict, required=True, location="json") | ||||
| parser.add_argument("query", type=str, required=True, location="json") | parser.add_argument("query", type=str, required=True, location="json") |
| def post(self, app_model): | def post(self, app_model): | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() |
| from typing import cast | from typing import cast | ||||
| from flask import request | from flask import request | ||||
| from flask_login import current_user | |||||
| from flask_restx import Resource | from flask_restx import Resource | ||||
| from werkzeug.exceptions import Forbidden | |||||
| from controllers.console import api | from controllers.console import api | ||||
| from controllers.console.app.wraps import get_app_model | from controllers.console.app.wraps import get_app_model | ||||
| from core.tools.utils.configuration import ToolParameterConfigurationManager | from core.tools.utils.configuration import ToolParameterConfigurationManager | ||||
| from events.app_event import app_model_config_was_updated | from events.app_event import app_model_config_was_updated | ||||
| from extensions.ext_database import db | from extensions.ext_database import db | ||||
| from libs.login import login_required | |||||
| from libs.login import current_user, login_required | |||||
| from models.account import Account | |||||
| from models.model import AppMode, AppModelConfig | from models.model import AppMode, AppModelConfig | ||||
| from services.app_model_config_service import AppModelConfigService | from services.app_model_config_service import AppModelConfigService | ||||
| @get_app_model(mode=[AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]) | @get_app_model(mode=[AppMode.AGENT_CHAT, AppMode.CHAT, AppMode.COMPLETION]) | ||||
| def post(self, app_model): | def post(self, app_model): | ||||
| """Modify app model config""" | """Modify app model config""" | ||||
| if not isinstance(current_user, Account): | |||||
| raise Forbidden() | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | |||||
| assert current_user.current_tenant_id is not None, "The tenant information should be loaded." | |||||
| # validate config | # validate config | ||||
| model_configuration = AppModelConfigService.validate_configuration( | model_configuration = AppModelConfigService.validate_configuration( | ||||
| tenant_id=current_user.current_tenant_id, | tenant_id=current_user.current_tenant_id, |
| """ | """ | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| # fetch draft workflow by app_model | # fetch draft workflow by app_model | ||||
| """ | """ | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| content_type = request.headers.get("Content-Type", "") | content_type = request.headers.get("Content-Type", "") | ||||
| """ | """ | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id) | AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id) | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| # fetch published workflow by app_model | # fetch published workflow by app_model | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| # Get default block configs | # Get default block configs | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # The role of the current user in the ta table must be admin, owner, or editor | # The role of the current user in the ta table must be admin, owner, or editor | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| if request.data: | if request.data: | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # Check permission | # Check permission | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| parser = reqparse.RequestParser() | parser = reqparse.RequestParser() | ||||
| if not isinstance(current_user, Account): | if not isinstance(current_user, Account): | ||||
| raise Forbidden() | raise Forbidden() | ||||
| # Check permission | # Check permission | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| workflow_service = WorkflowService() | workflow_service = WorkflowService() |
| @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) | @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) | ||||
| def wrapper(*args, **kwargs): | def wrapper(*args, **kwargs): | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| return f(*args, **kwargs) | return f(*args, **kwargs) | ||||
| def put(self, app_model: App, annotation_id): | def put(self, app_model: App, annotation_id): | ||||
| """Update an existing annotation.""" | """Update an existing annotation.""" | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| annotation_id = str(annotation_id) | annotation_id = str(annotation_id) | ||||
| """Delete an annotation.""" | """Delete an annotation.""" | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| annotation_id = str(annotation_id) | annotation_id = str(annotation_id) |
| def post(self, _, dataset_id): | def post(self, _, dataset_id): | ||||
| """Add a knowledge type tag.""" | """Add a knowledge type tag.""" | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not (current_user.is_editor or current_user.is_dataset_editor): | |||||
| if not (current_user.has_edit_permission or current_user.is_dataset_editor): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| args = tag_create_parser.parse_args() | args = tag_create_parser.parse_args() | ||||
| @validate_dataset_token | @validate_dataset_token | ||||
| def patch(self, _, dataset_id): | def patch(self, _, dataset_id): | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not (current_user.is_editor or current_user.is_dataset_editor): | |||||
| if not (current_user.has_edit_permission or current_user.is_dataset_editor): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| args = tag_update_parser.parse_args() | args = tag_update_parser.parse_args() | ||||
| def delete(self, _, dataset_id): | def delete(self, _, dataset_id): | ||||
| """Delete a knowledge type tag.""" | """Delete a knowledge type tag.""" | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not current_user.is_editor: | |||||
| if not current_user.has_edit_permission: | |||||
| raise Forbidden() | raise Forbidden() | ||||
| args = tag_delete_parser.parse_args() | args = tag_delete_parser.parse_args() | ||||
| TagService.delete_tag(args.get("tag_id")) | TagService.delete_tag(args.get("tag_id")) | ||||
| def post(self, _, dataset_id): | def post(self, _, dataset_id): | ||||
| # The role of the current user in the ta table must be admin, owner, editor, or dataset_operator | # The role of the current user in the ta table must be admin, owner, editor, or dataset_operator | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not (current_user.is_editor or current_user.is_dataset_editor): | |||||
| if not (current_user.has_edit_permission or current_user.is_dataset_editor): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| args = tag_binding_parser.parse_args() | args = tag_binding_parser.parse_args() | ||||
| def post(self, _, dataset_id): | def post(self, _, dataset_id): | ||||
| # The role of the current user in the ta table must be admin, owner, editor, or dataset_operator | # The role of the current user in the ta table must be admin, owner, editor, or dataset_operator | ||||
| assert isinstance(current_user, Account) | assert isinstance(current_user, Account) | ||||
| if not (current_user.is_editor or current_user.is_dataset_editor): | |||||
| if not (current_user.has_edit_permission or current_user.is_dataset_editor): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| args = tag_unbinding_parser.parse_args() | args = tag_unbinding_parser.parse_args() |
| from flask_login import UserMixin # type: ignore[import-untyped] | from flask_login import UserMixin # type: ignore[import-untyped] | ||||
| from sqlalchemy import DateTime, String, func, select | from sqlalchemy import DateTime, String, func, select | ||||
| from sqlalchemy.orm import Mapped, Session, mapped_column, reconstructor | from sqlalchemy.orm import Mapped, Session, mapped_column, reconstructor | ||||
| from typing_extensions import deprecated | |||||
| from models.base import Base | from models.base import Base | ||||
| return TenantAccountRole.is_admin_role(self.role) | return TenantAccountRole.is_admin_role(self.role) | ||||
| @property | @property | ||||
| @deprecated("Use has_edit_permission instead.") | |||||
| def is_editor(self): | def is_editor(self): | ||||
| """Determines if the account has edit permissions in their current tenant (workspace). | |||||
| This property checks if the current role has editing privileges, which includes: | |||||
| - `OWNER` | |||||
| - `ADMIN` | |||||
| - `EDITOR` | |||||
| Note: This checks for any role with editing permission, not just the 'EDITOR' role specifically. | |||||
| """ | |||||
| return self.has_edit_permission | |||||
| @property | |||||
| def has_edit_permission(self): | |||||
| """Determines if the account has editing permissions in their current tenant (workspace). | |||||
| This property checks if the current role has editing privileges, which includes: | |||||
| - `OWNER` | |||||
| - `ADMIN` | |||||
| - `EDITOR` | |||||
| """ | |||||
| return TenantAccountRole.is_editing_role(self.role) | return TenantAccountRole.is_editing_role(self.role) | ||||
| @property | @property |
| """Integration tests for ChatMessageApi permission verification.""" | |||||
| import uuid | |||||
| from unittest import mock | |||||
| import pytest | |||||
| from flask.testing import FlaskClient | |||||
| from controllers.console.app import completion as completion_api | |||||
| from controllers.console.app import wraps | |||||
| from libs.datetime_utils import naive_utc_now | |||||
| from models import Account, App, Tenant | |||||
| from models.account import TenantAccountRole | |||||
| from models.model import AppMode | |||||
| from services.app_generate_service import AppGenerateService | |||||
| class TestChatMessageApiPermissions: | |||||
| """Test permission verification for ChatMessageApi endpoint.""" | |||||
| @pytest.fixture | |||||
| def mock_app_model(self): | |||||
| """Create a mock App model for testing.""" | |||||
| app = App() | |||||
| app.id = str(uuid.uuid4()) | |||||
| app.mode = AppMode.CHAT.value | |||||
| app.tenant_id = str(uuid.uuid4()) | |||||
| app.status = "normal" | |||||
| return app | |||||
| @pytest.fixture | |||||
| def mock_account(self): | |||||
| """Create a mock Account for testing.""" | |||||
| account = Account() | |||||
| account.id = str(uuid.uuid4()) | |||||
| account.name = "Test User" | |||||
| account.email = "test@example.com" | |||||
| account.last_active_at = naive_utc_now() | |||||
| account.created_at = naive_utc_now() | |||||
| account.updated_at = naive_utc_now() | |||||
| # Create mock tenant | |||||
| tenant = Tenant() | |||||
| tenant.id = str(uuid.uuid4()) | |||||
| tenant.name = "Test Tenant" | |||||
| account._current_tenant = tenant | |||||
| return account | |||||
| @pytest.mark.parametrize( | |||||
| ("role", "status"), | |||||
| [ | |||||
| (TenantAccountRole.OWNER, 200), | |||||
| (TenantAccountRole.ADMIN, 200), | |||||
| (TenantAccountRole.EDITOR, 200), | |||||
| (TenantAccountRole.NORMAL, 403), | |||||
| (TenantAccountRole.DATASET_OPERATOR, 403), | |||||
| ], | |||||
| ) | |||||
| def test_post_with_owner_role_succeeds( | |||||
| self, | |||||
| test_client: FlaskClient, | |||||
| auth_header, | |||||
| monkeypatch, | |||||
| mock_app_model, | |||||
| mock_account, | |||||
| role: TenantAccountRole, | |||||
| status: int, | |||||
| ): | |||||
| """Test that OWNER role can access chat-messages endpoint.""" | |||||
| """Setup common mocks for testing.""" | |||||
| # Mock app loading | |||||
| mock_load_app_model = mock.Mock(return_value=mock_app_model) | |||||
| monkeypatch.setattr(wraps, "_load_app_model", mock_load_app_model) | |||||
| # Mock current user | |||||
| monkeypatch.setattr(completion_api, "current_user", mock_account) | |||||
| mock_generate = mock.Mock(return_value={"message": "Test response"}) | |||||
| monkeypatch.setattr(AppGenerateService, "generate", mock_generate) | |||||
| # Set user role to OWNER | |||||
| mock_account.role = role | |||||
| response = test_client.post( | |||||
| f"/console/api/apps/{mock_app_model.id}/chat-messages", | |||||
| headers=auth_header, | |||||
| json={ | |||||
| "inputs": {}, | |||||
| "query": "Hello, world!", | |||||
| "model_config": { | |||||
| "model": {"provider": "openai", "name": "gpt-4", "mode": "chat", "completion_params": {}} | |||||
| }, | |||||
| "response_mode": "blocking", | |||||
| }, | |||||
| ) | |||||
| assert response.status_code == status |
| """Integration tests for ModelConfigResource permission verification.""" | |||||
| import uuid | |||||
| from unittest import mock | |||||
| import pytest | |||||
| from flask.testing import FlaskClient | |||||
| from controllers.console.app import model_config as model_config_api | |||||
| from controllers.console.app import wraps | |||||
| from libs.datetime_utils import naive_utc_now | |||||
| from models import Account, App, Tenant | |||||
| from models.account import TenantAccountRole | |||||
| from models.model import AppMode | |||||
| from services.app_model_config_service import AppModelConfigService | |||||
| class TestModelConfigResourcePermissions: | |||||
| """Test permission verification for ModelConfigResource endpoint.""" | |||||
| @pytest.fixture | |||||
| def mock_app_model(self): | |||||
| """Create a mock App model for testing.""" | |||||
| app = App() | |||||
| app.id = str(uuid.uuid4()) | |||||
| app.mode = AppMode.CHAT.value | |||||
| app.tenant_id = str(uuid.uuid4()) | |||||
| app.status = "normal" | |||||
| app.app_model_config_id = str(uuid.uuid4()) | |||||
| return app | |||||
| @pytest.fixture | |||||
| def mock_account(self): | |||||
| """Create a mock Account for testing.""" | |||||
| account = Account() | |||||
| account.id = str(uuid.uuid4()) | |||||
| account.name = "Test User" | |||||
| account.email = "test@example.com" | |||||
| account.last_active_at = naive_utc_now() | |||||
| account.created_at = naive_utc_now() | |||||
| account.updated_at = naive_utc_now() | |||||
| # Create mock tenant | |||||
| tenant = Tenant() | |||||
| tenant.id = str(uuid.uuid4()) | |||||
| tenant.name = "Test Tenant" | |||||
| account._current_tenant = tenant | |||||
| return account | |||||
| @pytest.mark.parametrize( | |||||
| ("role", "status"), | |||||
| [ | |||||
| (TenantAccountRole.OWNER, 200), | |||||
| (TenantAccountRole.ADMIN, 200), | |||||
| (TenantAccountRole.EDITOR, 200), | |||||
| (TenantAccountRole.NORMAL, 403), | |||||
| (TenantAccountRole.DATASET_OPERATOR, 403), | |||||
| ], | |||||
| ) | |||||
| def test_post_with_owner_role_succeeds( | |||||
| self, | |||||
| test_client: FlaskClient, | |||||
| auth_header, | |||||
| monkeypatch, | |||||
| mock_app_model, | |||||
| mock_account, | |||||
| role: TenantAccountRole, | |||||
| status: int, | |||||
| ): | |||||
| """Test that OWNER role can access model-config endpoint.""" | |||||
| # Set user role to OWNER | |||||
| mock_account.role = role | |||||
| # Mock app loading | |||||
| mock_load_app_model = mock.Mock(return_value=mock_app_model) | |||||
| monkeypatch.setattr(wraps, "_load_app_model", mock_load_app_model) | |||||
| # Mock current user | |||||
| monkeypatch.setattr(model_config_api, "current_user", mock_account) | |||||
| # Mock AccountService.load_user to prevent authentication issues | |||||
| from services.account_service import AccountService | |||||
| mock_load_user = mock.Mock(return_value=mock_account) | |||||
| monkeypatch.setattr(AccountService, "load_user", mock_load_user) | |||||
| mock_validate_config = mock.Mock( | |||||
| return_value={ | |||||
| "model": {"provider": "openai", "name": "gpt-4", "mode": "chat", "completion_params": {}}, | |||||
| "pre_prompt": "You are a helpful assistant.", | |||||
| "user_input_form": [], | |||||
| "dataset_query_variable": "", | |||||
| "agent_mode": {"enabled": False, "tools": []}, | |||||
| } | |||||
| ) | |||||
| monkeypatch.setattr(AppModelConfigService, "validate_configuration", mock_validate_config) | |||||
| # Mock database operations | |||||
| mock_db_session = mock.Mock() | |||||
| mock_db_session.add = mock.Mock() | |||||
| mock_db_session.flush = mock.Mock() | |||||
| mock_db_session.commit = mock.Mock() | |||||
| monkeypatch.setattr(model_config_api.db, "session", mock_db_session) | |||||
| # Mock app_model_config_was_updated event | |||||
| mock_event = mock.Mock() | |||||
| mock_event.send = mock.Mock() | |||||
| monkeypatch.setattr(model_config_api, "app_model_config_was_updated", mock_event) | |||||
| response = test_client.post( | |||||
| f"/console/api/apps/{mock_app_model.id}/model-config", | |||||
| headers=auth_header, | |||||
| json={ | |||||
| "model": { | |||||
| "provider": "openai", | |||||
| "name": "gpt-4", | |||||
| "mode": "chat", | |||||
| "completion_params": {"temperature": 0.7, "max_tokens": 1000}, | |||||
| }, | |||||
| "user_input_form": [], | |||||
| "dataset_query_variable": "", | |||||
| "pre_prompt": "You are a helpful assistant.", | |||||
| "agent_mode": {"enabled": False, "tools": []}, | |||||
| }, | |||||
| ) | |||||
| assert response.status_code == status |
| Test getting user through non-existent email. | Test getting user through non-existent email. | ||||
| """ | """ | ||||
| fake = Faker() | fake = Faker() | ||||
| non_existent_email = fake.email() | |||||
| domain = f"test-{fake.random_letters(10)}.com" | |||||
| non_existent_email = fake.email(domain=domain) | |||||
| found_user = AccountService.get_user_through_email(non_existent_email) | found_user = AccountService.get_user_through_email(non_existent_email) | ||||
| assert found_user is None | assert found_user is None | ||||