Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>tags/1.9.0
| @@ -246,6 +246,43 @@ class TestEmailI18nService: | |||
| sent_email = mock_sender.sent_emails[0] | |||
| assert sent_email["subject"] == "Reset Your Dify Password" | |||
| def test_subject_format_keyerror_fallback_path( | |||
| self, | |||
| mock_renderer: MockEmailRenderer, | |||
| mock_sender: MockEmailSender, | |||
| ): | |||
| """Trigger subject KeyError and cover except branch.""" | |||
| # Config with subject that references an unknown key (no {application_title} to avoid second format) | |||
| config = EmailI18nConfig( | |||
| templates={ | |||
| EmailType.INVITE_MEMBER: { | |||
| EmailLanguage.EN_US: EmailTemplate( | |||
| subject="Invite: {unknown_placeholder}", | |||
| template_path="invite_member_en.html", | |||
| branded_template_path="branded/invite_member_en.html", | |||
| ), | |||
| } | |||
| } | |||
| ) | |||
| branding_service = MockBrandingService(enabled=False) | |||
| service = EmailI18nService( | |||
| config=config, | |||
| renderer=mock_renderer, | |||
| branding_service=branding_service, | |||
| sender=mock_sender, | |||
| ) | |||
| # Will raise KeyError on subject.format(**full_context), then hit except branch and skip fallback | |||
| service.send_email( | |||
| email_type=EmailType.INVITE_MEMBER, | |||
| language_code="en-US", | |||
| to="test@example.com", | |||
| ) | |||
| assert len(mock_sender.sent_emails) == 1 | |||
| # Subject is left unformatted due to KeyError fallback path without application_title | |||
| assert mock_sender.sent_emails[0]["subject"] == "Invite: {unknown_placeholder}" | |||
| def test_send_change_email_old_phase( | |||
| self, | |||
| email_config: EmailI18nConfig, | |||
| @@ -0,0 +1,122 @@ | |||
| from flask import Blueprint, Flask | |||
| from flask_restx import Resource | |||
| from werkzeug.exceptions import BadRequest, Unauthorized | |||
| from core.errors.error import AppInvokeQuotaExceededError | |||
| from libs.external_api import ExternalApi | |||
| def _create_api_app(): | |||
| app = Flask(__name__) | |||
| bp = Blueprint("t", __name__) | |||
| api = ExternalApi(bp) | |||
| @api.route("/bad-request") | |||
| class Bad(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| raise BadRequest("invalid input") | |||
| @api.route("/unauth") | |||
| class Unauth(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| raise Unauthorized("auth required") | |||
| @api.route("/value-error") | |||
| class ValErr(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| raise ValueError("boom") | |||
| @api.route("/quota") | |||
| class Quota(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| raise AppInvokeQuotaExceededError("quota exceeded") | |||
| @api.route("/general") | |||
| class Gen(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| raise RuntimeError("oops") | |||
| # Note: We avoid altering default_mediatype to keep normal error paths | |||
| # Special 400 message rewrite | |||
| @api.route("/json-empty") | |||
| class JsonEmpty(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| e = BadRequest() | |||
| # Force the specific message the handler rewrites | |||
| e.description = "Failed to decode JSON object: Expecting value: line 1 column 1 (char 0)" | |||
| raise e | |||
| # 400 mapping payload path | |||
| @api.route("/param-errors") | |||
| class ParamErrors(Resource): # type: ignore | |||
| def get(self): # type: ignore | |||
| e = BadRequest() | |||
| # Coerce a mapping description to trigger param error shaping | |||
| e.description = {"field": "is required"} # type: ignore[assignment] | |||
| raise e | |||
| app.register_blueprint(bp, url_prefix="/api") | |||
| return app | |||
| def test_external_api_error_handlers_basic_paths(): | |||
| app = _create_api_app() | |||
| client = app.test_client() | |||
| # 400 | |||
| res = client.get("/api/bad-request") | |||
| assert res.status_code == 400 | |||
| data = res.get_json() | |||
| assert data["code"] == "bad_request" | |||
| assert data["status"] == 400 | |||
| # 401 | |||
| res = client.get("/api/unauth") | |||
| assert res.status_code == 401 | |||
| assert "WWW-Authenticate" in res.headers | |||
| # 400 ValueError | |||
| res = client.get("/api/value-error") | |||
| assert res.status_code == 400 | |||
| assert res.get_json()["code"] == "invalid_param" | |||
| # 500 general | |||
| res = client.get("/api/general") | |||
| assert res.status_code == 500 | |||
| assert res.get_json()["status"] == 500 | |||
| def test_external_api_json_message_and_bad_request_rewrite(): | |||
| app = _create_api_app() | |||
| client = app.test_client() | |||
| # JSON empty special rewrite | |||
| res = client.get("/api/json-empty") | |||
| assert res.status_code == 400 | |||
| assert res.get_json()["message"] == "Invalid JSON payload received or JSON payload is empty." | |||
| def test_external_api_param_mapping_and_quota_and_exc_info_none(): | |||
| # Force exc_info() to return (None,None,None) only during request | |||
| import libs.external_api as ext | |||
| orig_exc_info = ext.sys.exc_info | |||
| try: | |||
| ext.sys.exc_info = lambda: (None, None, None) # type: ignore[assignment] | |||
| app = _create_api_app() | |||
| client = app.test_client() | |||
| # Param errors mapping payload path | |||
| res = client.get("/api/param-errors") | |||
| assert res.status_code == 400 | |||
| data = res.get_json() | |||
| assert data["code"] == "invalid_param" | |||
| assert data["params"] == "field" | |||
| # Quota path — depending on Flask-RESTX internals it may be handled | |||
| res = client.get("/api/quota") | |||
| assert res.status_code in (400, 429) | |||
| finally: | |||
| ext.sys.exc_info = orig_exc_info # type: ignore[assignment] | |||
| @@ -0,0 +1,19 @@ | |||
| import pytest | |||
| from libs.oauth import OAuth | |||
| def test_oauth_base_methods_raise_not_implemented(): | |||
| oauth = OAuth(client_id="id", client_secret="sec", redirect_uri="uri") | |||
| with pytest.raises(NotImplementedError): | |||
| oauth.get_authorization_url() | |||
| with pytest.raises(NotImplementedError): | |||
| oauth.get_access_token("code") | |||
| with pytest.raises(NotImplementedError): | |||
| oauth.get_raw_user_info("token") | |||
| with pytest.raises(NotImplementedError): | |||
| oauth._transform_user_info({}) # type: ignore[name-defined] | |||
| @@ -0,0 +1,53 @@ | |||
| from unittest.mock import MagicMock, patch | |||
| import pytest | |||
| from python_http_client.exceptions import UnauthorizedError | |||
| from libs.sendgrid import SendGridClient | |||
| def _mail(to: str = "user@example.com") -> dict: | |||
| return {"to": to, "subject": "Hi", "html": "<b>Hi</b>"} | |||
| @patch("libs.sendgrid.sendgrid.SendGridAPIClient") | |||
| def test_sendgrid_success(mock_client_cls: MagicMock): | |||
| mock_client = MagicMock() | |||
| mock_client_cls.return_value = mock_client | |||
| # nested attribute access: client.mail.send.post | |||
| mock_client.client.mail.send.post.return_value = MagicMock(status_code=202, body=b"", headers={}) | |||
| sg = SendGridClient(sendgrid_api_key="key", _from="noreply@example.com") | |||
| sg.send(_mail()) | |||
| mock_client_cls.assert_called_once() | |||
| mock_client.client.mail.send.post.assert_called_once() | |||
| @patch("libs.sendgrid.sendgrid.SendGridAPIClient") | |||
| def test_sendgrid_missing_to_raises(mock_client_cls: MagicMock): | |||
| sg = SendGridClient(sendgrid_api_key="key", _from="noreply@example.com") | |||
| with pytest.raises(ValueError): | |||
| sg.send(_mail(to="")) | |||
| @patch("libs.sendgrid.sendgrid.SendGridAPIClient") | |||
| def test_sendgrid_auth_errors_reraise(mock_client_cls: MagicMock): | |||
| mock_client = MagicMock() | |||
| mock_client_cls.return_value = mock_client | |||
| mock_client.client.mail.send.post.side_effect = UnauthorizedError(401, "Unauthorized", b"{}", {}) | |||
| sg = SendGridClient(sendgrid_api_key="key", _from="noreply@example.com") | |||
| with pytest.raises(UnauthorizedError): | |||
| sg.send(_mail()) | |||
| @patch("libs.sendgrid.sendgrid.SendGridAPIClient") | |||
| def test_sendgrid_timeout_reraise(mock_client_cls: MagicMock): | |||
| mock_client = MagicMock() | |||
| mock_client_cls.return_value = mock_client | |||
| mock_client.client.mail.send.post.side_effect = TimeoutError("timeout") | |||
| sg = SendGridClient(sendgrid_api_key="key", _from="noreply@example.com") | |||
| with pytest.raises(TimeoutError): | |||
| sg.send(_mail()) | |||
| @@ -0,0 +1,100 @@ | |||
| from unittest.mock import MagicMock, patch | |||
| import pytest | |||
| from libs.smtp import SMTPClient | |||
| def _mail() -> dict: | |||
| return {"to": "user@example.com", "subject": "Hi", "html": "<b>Hi</b>"} | |||
| @patch("libs.smtp.smtplib.SMTP") | |||
| def test_smtp_plain_success(mock_smtp_cls: MagicMock): | |||
| mock_smtp = MagicMock() | |||
| mock_smtp_cls.return_value = mock_smtp | |||
| client = SMTPClient(server="smtp.example.com", port=25, username="", password="", _from="noreply@example.com") | |||
| client.send(_mail()) | |||
| mock_smtp_cls.assert_called_once_with("smtp.example.com", 25, timeout=10) | |||
| mock_smtp.sendmail.assert_called_once() | |||
| mock_smtp.quit.assert_called_once() | |||
| @patch("libs.smtp.smtplib.SMTP") | |||
| def test_smtp_tls_opportunistic_success(mock_smtp_cls: MagicMock): | |||
| mock_smtp = MagicMock() | |||
| mock_smtp_cls.return_value = mock_smtp | |||
| client = SMTPClient( | |||
| server="smtp.example.com", | |||
| port=587, | |||
| username="user", | |||
| password="pass", | |||
| _from="noreply@example.com", | |||
| use_tls=True, | |||
| opportunistic_tls=True, | |||
| ) | |||
| client.send(_mail()) | |||
| mock_smtp_cls.assert_called_once_with("smtp.example.com", 587, timeout=10) | |||
| assert mock_smtp.ehlo.call_count == 2 | |||
| mock_smtp.starttls.assert_called_once() | |||
| mock_smtp.login.assert_called_once_with("user", "pass") | |||
| mock_smtp.sendmail.assert_called_once() | |||
| mock_smtp.quit.assert_called_once() | |||
| @patch("libs.smtp.smtplib.SMTP_SSL") | |||
| def test_smtp_tls_ssl_branch_and_timeout(mock_smtp_ssl_cls: MagicMock): | |||
| # Cover SMTP_SSL branch and TimeoutError handling | |||
| mock_smtp = MagicMock() | |||
| mock_smtp.sendmail.side_effect = TimeoutError("timeout") | |||
| mock_smtp_ssl_cls.return_value = mock_smtp | |||
| client = SMTPClient( | |||
| server="smtp.example.com", | |||
| port=465, | |||
| username="", | |||
| password="", | |||
| _from="noreply@example.com", | |||
| use_tls=True, | |||
| opportunistic_tls=False, | |||
| ) | |||
| with pytest.raises(TimeoutError): | |||
| client.send(_mail()) | |||
| mock_smtp.quit.assert_called_once() | |||
| @patch("libs.smtp.smtplib.SMTP") | |||
| def test_smtp_generic_exception_propagates(mock_smtp_cls: MagicMock): | |||
| mock_smtp = MagicMock() | |||
| mock_smtp.sendmail.side_effect = RuntimeError("oops") | |||
| mock_smtp_cls.return_value = mock_smtp | |||
| client = SMTPClient(server="smtp.example.com", port=25, username="", password="", _from="noreply@example.com") | |||
| with pytest.raises(RuntimeError): | |||
| client.send(_mail()) | |||
| mock_smtp.quit.assert_called_once() | |||
| @patch("libs.smtp.smtplib.SMTP") | |||
| def test_smtp_smtplib_exception_in_login(mock_smtp_cls: MagicMock): | |||
| # Ensure we hit the specific SMTPException except branch | |||
| import smtplib | |||
| mock_smtp = MagicMock() | |||
| mock_smtp.login.side_effect = smtplib.SMTPException("login-fail") | |||
| mock_smtp_cls.return_value = mock_smtp | |||
| client = SMTPClient( | |||
| server="smtp.example.com", | |||
| port=25, | |||
| username="user", # non-empty to trigger login | |||
| password="pass", | |||
| _from="noreply@example.com", | |||
| ) | |||
| with pytest.raises(smtplib.SMTPException): | |||
| client.send(_mail()) | |||
| mock_smtp.quit.assert_called_once() | |||