瀏覽代碼

fix: standardize authentication error messages to prevent user enumeration (#24324)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
tags/1.8.0
-LAN- 2 月之前
父節點
當前提交
cfb8d224da
沒有連結到貢獻者的電子郵件帳戶。

+ 6
- 0
api/controllers/console/auth/error.py 查看文件

code = 400 code = 400




class AuthenticationFailedError(BaseHTTPException):
error_code = "authentication_failed"
description = "Invalid email or password."
code = 401


class EmailPasswordLoginLimitError(BaseHTTPException): class EmailPasswordLoginLimitError(BaseHTTPException):
error_code = "email_code_login_limit" error_code = "email_code_login_limit"
description = "Too many incorrect password attempts. Please try again later." description = "Too many incorrect password attempts. Please try again later."

+ 3
- 2
api/controllers/console/auth/login.py 查看文件

from constants.languages import languages from constants.languages import languages
from controllers.console import api from controllers.console import api
from controllers.console.auth.error import ( from controllers.console.auth.error import (
AuthenticationFailedError,
EmailCodeError, EmailCodeError,
EmailOrPasswordMismatchError,
EmailPasswordLoginLimitError, EmailPasswordLoginLimitError,
InvalidEmailError, InvalidEmailError,
InvalidTokenError, InvalidTokenError,
raise AccountBannedError() raise AccountBannedError()
except services.errors.account.AccountPasswordError: except services.errors.account.AccountPasswordError:
AccountService.add_login_error_rate_limit(args["email"]) AccountService.add_login_error_rate_limit(args["email"])
raise EmailOrPasswordMismatchError()
raise AuthenticationFailedError()
except services.errors.account.AccountNotFoundError: except services.errors.account.AccountNotFoundError:
if FeatureService.get_system_features().is_allow_register: if FeatureService.get_system_features().is_allow_register:
token = AccountService.send_reset_password_email(email=args["email"], language=language) token = AccountService.send_reset_password_email(email=args["email"], language=language)
account = AccountService.get_user_through_email(args["email"]) account = AccountService.get_user_through_email(args["email"])
except AccountRegisterError as are: except AccountRegisterError as are:
raise AccountInFreezeError() raise AccountInFreezeError()

if account is None: if account is None:
if FeatureService.get_system_features().is_allow_register: if FeatureService.get_system_features().is_allow_register:
token = AccountService.send_reset_password_email(email=args["email"], language=language) token = AccountService.send_reset_password_email(email=args["email"], language=language)

+ 4
- 3
api/controllers/web/forgot_password.py 查看文件

from sqlalchemy.orm import Session from sqlalchemy.orm import Session


from controllers.console.auth.error import ( from controllers.console.auth.error import (
AuthenticationFailedError,
EmailCodeError, EmailCodeError,
EmailPasswordResetLimitError, EmailPasswordResetLimitError,
InvalidEmailError, InvalidEmailError,
InvalidTokenError, InvalidTokenError,
PasswordMismatchError, PasswordMismatchError,
) )
from controllers.console.error import AccountNotFound, EmailSendIpLimitError
from controllers.console.error import EmailSendIpLimitError
from controllers.console.wraps import email_password_login_enabled, only_edition_enterprise, setup_required from controllers.console.wraps import email_password_login_enabled, only_edition_enterprise, setup_required
from controllers.web import api from controllers.web import api
from extensions.ext_database import db from extensions.ext_database import db
account = session.execute(select(Account).filter_by(email=args["email"])).scalar_one_or_none() account = session.execute(select(Account).filter_by(email=args["email"])).scalar_one_or_none()
token = None token = None
if account is None: if account is None:
raise AccountNotFound()
raise AuthenticationFailedError()
else: else:
token = AccountService.send_reset_password_email(account=account, email=args["email"], language=language) token = AccountService.send_reset_password_email(account=account, email=args["email"], language=language)


if account: if account:
self._update_existing_account(account, password_hashed, salt, session) self._update_existing_account(account, password_hashed, salt, session)
else: else:
raise AccountNotFound()
raise AuthenticationFailedError()


return {"result": "success"} return {"result": "success"}



+ 10
- 6
api/controllers/web/login.py 查看文件

from jwt import InvalidTokenError # type: ignore from jwt import InvalidTokenError # type: ignore


import services import services
from controllers.console.auth.error import EmailCodeError, EmailOrPasswordMismatchError, InvalidEmailError
from controllers.console.error import AccountBannedError, AccountNotFound
from controllers.console.auth.error import (
AuthenticationFailedError,
EmailCodeError,
InvalidEmailError,
)
from controllers.console.error import AccountBannedError
from controllers.console.wraps import only_edition_enterprise, setup_required from controllers.console.wraps import only_edition_enterprise, setup_required
from controllers.web import api from controllers.web import api
from libs.helper import email from libs.helper import email
except services.errors.account.AccountLoginError: except services.errors.account.AccountLoginError:
raise AccountBannedError() raise AccountBannedError()
except services.errors.account.AccountPasswordError: except services.errors.account.AccountPasswordError:
raise EmailOrPasswordMismatchError()
raise AuthenticationFailedError()
except services.errors.account.AccountNotFoundError: except services.errors.account.AccountNotFoundError:
raise AccountNotFound()
raise AuthenticationFailedError()


token = WebAppAuthService.login(account=account) token = WebAppAuthService.login(account=account)
return {"result": "success", "data": {"access_token": token}} return {"result": "success", "data": {"access_token": token}}


account = WebAppAuthService.get_user_through_email(args["email"]) account = WebAppAuthService.get_user_through_email(args["email"])
if account is None: if account is None:
raise AccountNotFound()
raise AuthenticationFailedError()
else: else:
token = WebAppAuthService.send_email_code_login_email(account=account, language=language) token = WebAppAuthService.send_email_code_login_email(account=account, language=language)


WebAppAuthService.revoke_email_code_login_token(args["token"]) WebAppAuthService.revoke_email_code_login_token(args["token"])
account = WebAppAuthService.get_user_through_email(user_email) account = WebAppAuthService.get_user_through_email(user_email)
if not account: if not account:
raise AccountNotFound()
raise AuthenticationFailedError()


token = WebAppAuthService.login(account=account) token = WebAppAuthService.login(account=account)
AccountService.reset_login_error_rate_limit(args["email"]) AccountService.reset_login_error_rate_limit(args["email"])

+ 134
- 0
api/tests/unit_tests/controllers/console/auth/test_authentication_security.py 查看文件

"""Test authentication security to prevent user enumeration."""

from unittest.mock import MagicMock, patch

import pytest
from flask import Flask
from flask_restx import Api

import services.errors.account
from controllers.console.auth.error import AuthenticationFailedError
from controllers.console.auth.login import LoginApi
from controllers.console.error import AccountNotFound


class TestAuthenticationSecurity:
"""Test authentication endpoints for security against user enumeration."""

def setup_method(self):
"""Set up test fixtures."""
self.app = Flask(__name__)
self.api = Api(self.app)
self.api.add_resource(LoginApi, "/login")
self.client = self.app.test_client()
self.app.config["TESTING"] = True

@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.login.FeatureService.get_system_features")
@patch("controllers.console.auth.login.AccountService.is_login_error_rate_limit")
@patch("controllers.console.auth.login.AccountService.authenticate")
@patch("controllers.console.auth.login.AccountService.send_reset_password_email")
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
@patch("controllers.console.auth.login.RegisterService.get_invitation_if_token_valid")
def test_login_invalid_email_with_registration_allowed(
self, mock_get_invitation, mock_send_email, mock_authenticate, mock_is_rate_limit, mock_features, mock_db
):
"""Test that invalid email sends reset password email when registration is allowed."""
# Arrange
mock_is_rate_limit.return_value = False
mock_get_invitation.return_value = None
mock_authenticate.side_effect = services.errors.account.AccountNotFoundError("Account not found")
mock_db.session.query.return_value.first.return_value = MagicMock() # Mock setup exists
mock_features.return_value.is_allow_register = True
mock_send_email.return_value = "token123"

# Act
with self.app.test_request_context(
"/login", method="POST", json={"email": "nonexistent@example.com", "password": "WrongPass123!"}
):
login_api = LoginApi()
result = login_api.post()

# Assert
assert result == {"result": "fail", "data": "token123", "code": "account_not_found"}
mock_send_email.assert_called_once_with(email="nonexistent@example.com", language="en-US")

@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.login.AccountService.is_login_error_rate_limit")
@patch("controllers.console.auth.login.AccountService.authenticate")
@patch("controllers.console.auth.login.AccountService.add_login_error_rate_limit")
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
@patch("controllers.console.auth.login.RegisterService.get_invitation_if_token_valid")
def test_login_wrong_password_returns_error(
self, mock_get_invitation, mock_add_rate_limit, mock_authenticate, mock_is_rate_limit, mock_db
):
"""Test that wrong password returns AuthenticationFailedError."""
# Arrange
mock_is_rate_limit.return_value = False
mock_get_invitation.return_value = None
mock_authenticate.side_effect = services.errors.account.AccountPasswordError("Wrong password")
mock_db.session.query.return_value.first.return_value = MagicMock() # Mock setup exists

# Act
with self.app.test_request_context(
"/login", method="POST", json={"email": "existing@example.com", "password": "WrongPass123!"}
):
login_api = LoginApi()

# Assert
with pytest.raises(AuthenticationFailedError) as exc_info:
login_api.post()

assert exc_info.value.error_code == "authentication_failed"
assert exc_info.value.description == "Invalid email or password."
mock_add_rate_limit.assert_called_once_with("existing@example.com")

@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.login.FeatureService.get_system_features")
@patch("controllers.console.auth.login.AccountService.is_login_error_rate_limit")
@patch("controllers.console.auth.login.AccountService.authenticate")
@patch("controllers.console.auth.login.dify_config.BILLING_ENABLED", False)
@patch("controllers.console.auth.login.RegisterService.get_invitation_if_token_valid")
def test_login_invalid_email_with_registration_disabled(
self, mock_get_invitation, mock_authenticate, mock_is_rate_limit, mock_features, mock_db
):
"""Test that invalid email raises AccountNotFound when registration is disabled."""
# Arrange
mock_is_rate_limit.return_value = False
mock_get_invitation.return_value = None
mock_authenticate.side_effect = services.errors.account.AccountNotFoundError("Account not found")
mock_db.session.query.return_value.first.return_value = MagicMock() # Mock setup exists
mock_features.return_value.is_allow_register = False

# Act
with self.app.test_request_context(
"/login", method="POST", json={"email": "nonexistent@example.com", "password": "WrongPass123!"}
):
login_api = LoginApi()

# Assert
with pytest.raises(AccountNotFound) as exc_info:
login_api.post()

assert exc_info.value.error_code == "account_not_found"

@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.login.FeatureService.get_system_features")
@patch("controllers.console.auth.login.AccountService.get_user_through_email")
@patch("controllers.console.auth.login.AccountService.send_reset_password_email")
def test_reset_password_with_existing_account(self, mock_send_email, mock_get_user, mock_features, mock_db):
"""Test that reset password returns success with token for existing accounts."""
# Mock the setup check
mock_db.session.query.return_value.first.return_value = MagicMock() # Mock setup exists

# Test with existing account
mock_get_user.return_value = MagicMock(email="existing@example.com")
mock_send_email.return_value = "token123"

with self.app.test_request_context("/reset-password", method="POST", json={"email": "existing@example.com"}):
from controllers.console.auth.login import ResetPasswordSendEmailApi

api = ResetPasswordSendEmailApi()
result = api.post()

assert result == {"result": "success", "data": "token123"}

Loading…
取消
儲存