| from extensions.ext_database import db | from extensions.ext_database import db | ||||
| from fields.member_fields import account_with_role_list_fields | from fields.member_fields import account_with_role_list_fields | ||||
| from libs.login import login_required | from libs.login import login_required | ||||
| from models.account import Account | |||||
| from models.account import Account, TenantAccountRole | |||||
| from services.account_service import RegisterService, TenantService | from services.account_service import RegisterService, TenantService | ||||
| from services.errors.account import AccountAlreadyInTenantError | from services.errors.account import AccountAlreadyInTenantError | ||||
| invitee_emails = args['emails'] | invitee_emails = args['emails'] | ||||
| invitee_role = args['role'] | invitee_role = args['role'] | ||||
| interface_language = args['language'] | interface_language = args['language'] | ||||
| if invitee_role not in ['admin', 'normal']: | |||||
| if invitee_role not in [TenantAccountRole.ADMIN, TenantAccountRole.NORMAL]: | |||||
| return {'code': 'invalid-role', 'message': 'Invalid role'}, 400 | return {'code': 'invalid-role', 'message': 'Invalid role'}, 400 | ||||
| inviter = current_user | inviter = current_user |
| from core.model_runtime.errors.validate import CredentialsValidateFailedError | from core.model_runtime.errors.validate import CredentialsValidateFailedError | ||||
| from core.model_runtime.utils.encoders import jsonable_encoder | from core.model_runtime.utils.encoders import jsonable_encoder | ||||
| from libs.login import login_required | from libs.login import login_required | ||||
| from models.account import TenantAccountRole | |||||
| from services.model_provider_service import ModelProviderService | from services.model_provider_service import ModelProviderService | ||||
| @login_required | @login_required | ||||
| @account_initialization_required | @account_initialization_required | ||||
| def post(self, provider: str): | def post(self, provider: str): | ||||
| if current_user.current_tenant.current_role not in ['admin', 'owner']: | |||||
| if not TenantAccountRole.is_privileged_role(current_user.current_tenant.current_role): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| tenant_id = current_user.current_tenant_id | tenant_id = current_user.current_tenant_id | ||||
| @login_required | @login_required | ||||
| @account_initialization_required | @account_initialization_required | ||||
| def delete(self, provider: str): | def delete(self, provider: str): | ||||
| if current_user.current_tenant.current_role not in ['admin', 'owner']: | |||||
| if not TenantAccountRole.is_privileged_role(current_user.current_tenant.current_role): | |||||
| raise Forbidden() | raise Forbidden() | ||||
| tenant_id = current_user.current_tenant_id | tenant_id = current_user.current_tenant_id |
| return db.session.query(ai).filter( | return db.session.query(ai).filter( | ||||
| ai.account_id == self.id | ai.account_id == self.id | ||||
| ).all() | ).all() | ||||
| # check current_user.current_tenant.current_role in ['admin', 'owner'] | # check current_user.current_tenant.current_role in ['admin', 'owner'] | ||||
| @property | @property | ||||
| def is_admin_or_owner(self): | def is_admin_or_owner(self): | ||||
| return self._current_tenant.current_role in ['admin', 'owner'] | |||||
| return TenantAccountRole.is_privileged_role(self._current_tenant.current_role) | |||||
| class TenantStatus(str, enum.Enum): | class TenantStatus(str, enum.Enum): | ||||
| ARCHIVE = 'archive' | ARCHIVE = 'archive' | ||||
| class TenantAccountRole(str, enum.Enum): | |||||
| OWNER = 'owner' | |||||
| ADMIN = 'admin' | |||||
| NORMAL = 'normal' | |||||
| @staticmethod | |||||
| def is_privileged_role(role: str) -> bool: | |||||
| return role and role in {TenantAccountRole.ADMIN, TenantAccountRole.OWNER} | |||||
| class Tenant(db.Model): | class Tenant(db.Model): | ||||
| __tablename__ = 'tenants' | __tablename__ = 'tenants' | ||||
| __table_args__ = ( | __table_args__ = ( | ||||
| Account.id == TenantAccountJoin.account_id, | Account.id == TenantAccountJoin.account_id, | ||||
| TenantAccountJoin.tenant_id == self.id | TenantAccountJoin.tenant_id == self.id | ||||
| ).all() | ).all() | ||||
| @property | @property | ||||
| def custom_config_dict(self) -> dict: | def custom_config_dict(self) -> dict: | ||||
| return json.loads(self.custom_config) if self.custom_config else {} | return json.loads(self.custom_config) if self.custom_config else {} | ||||
| @custom_config_dict.setter | @custom_config_dict.setter | ||||
| def custom_config_dict(self, value: dict): | def custom_config_dict(self, value: dict): | ||||
| self.custom_config = json.dumps(value) | self.custom_config = json.dumps(value) |
| def check_member_permission(tenant: Tenant, operator: Account, member: Account, action: str) -> None: | def check_member_permission(tenant: Tenant, operator: Account, member: Account, action: str) -> None: | ||||
| """Check member permission""" | """Check member permission""" | ||||
| perms = { | perms = { | ||||
| 'add': ['owner', 'admin'], | |||||
| 'remove': ['owner'], | |||||
| 'update': ['owner'] | |||||
| 'add': [TenantAccountRole.OWNER, TenantAccountRole.ADMIN], | |||||
| 'remove': [TenantAccountRole.OWNER], | |||||
| 'update': [TenantAccountRole.OWNER] | |||||
| } | } | ||||
| if action not in ['add', 'remove', 'update']: | if action not in ['add', 'remove', 'update']: | ||||
| raise InvalidActionError("Invalid action.") | raise InvalidActionError("Invalid action.") |
| import requests | import requests | ||||
| from extensions.ext_database import db | from extensions.ext_database import db | ||||
| from models.account import TenantAccountJoin | |||||
| from models.account import TenantAccountJoin, TenantAccountRole | |||||
| class BillingService: | class BillingService: | ||||
| TenantAccountJoin.account_id == current_user.id | TenantAccountJoin.account_id == current_user.id | ||||
| ).first() | ).first() | ||||
| if join.role not in ['owner', 'admin']: | |||||
| if TenantAccountRole.is_privileged_role(join.role): | |||||
| raise ValueError('Only team owner or team admin can perform this action') | raise ValueError('Only team owner or team admin can perform this action') |
| from models.account import TenantAccountRole | |||||
| def test_account_is_privileged_role() -> None: | |||||
| assert TenantAccountRole.ADMIN == 'admin' | |||||
| assert TenantAccountRole.OWNER == 'owner' | |||||
| assert TenantAccountRole.NORMAL == 'normal' | |||||
| assert TenantAccountRole.is_privileged_role(TenantAccountRole.ADMIN) | |||||
| assert TenantAccountRole.is_privileged_role(TenantAccountRole.OWNER) | |||||
| assert not TenantAccountRole.is_privileged_role(TenantAccountRole.NORMAL) | |||||
| assert not TenantAccountRole.is_privileged_role('') |