| 
                        123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 | 
                        - from datetime import UTC, datetime, timedelta
 - from unittest.mock import patch
 - 
 - import jwt
 - import pytest
 - from werkzeug.exceptions import Unauthorized
 - 
 - from libs.passport import PassportService
 - 
 - 
 - class TestPassportService:
 -     """Test PassportService JWT operations"""
 - 
 -     @pytest.fixture
 -     def passport_service(self):
 -         """Create PassportService instance with test secret key"""
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = "test-secret-key-for-testing"
 -             return PassportService()
 - 
 -     @pytest.fixture
 -     def another_passport_service(self):
 -         """Create another PassportService instance with different secret key"""
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = "another-secret-key-for-testing"
 -             return PassportService()
 - 
 -     # Core functionality tests
 -     def test_should_issue_and_verify_token(self, passport_service):
 -         """Test complete JWT lifecycle: issue and verify"""
 -         payload = {"user_id": "123", "app_code": "test-app"}
 -         token = passport_service.issue(payload)
 - 
 -         # Verify token format
 -         assert isinstance(token, str)
 -         assert len(token.split(".")) == 3  # JWT format: header.payload.signature
 - 
 -         # Verify token content
 -         decoded = passport_service.verify(token)
 -         assert decoded == payload
 - 
 -     def test_should_handle_different_payload_types(self, passport_service):
 -         """Test issuing and verifying tokens with different payload types"""
 -         test_cases = [
 -             {"string": "value"},
 -             {"number": 42},
 -             {"float": 3.14},
 -             {"boolean": True},
 -             {"null": None},
 -             {"array": [1, 2, 3]},
 -             {"nested": {"key": "value"}},
 -             {"unicode": "中文测试"},
 -             {"emoji": "🔐"},
 -             {},  # Empty payload
 -         ]
 - 
 -         for payload in test_cases:
 -             token = passport_service.issue(payload)
 -             decoded = passport_service.verify(token)
 -             assert decoded == payload
 - 
 -     # Security tests
 -     def test_should_reject_modified_token(self, passport_service):
 -         """Test that any modification to token invalidates it"""
 -         token = passport_service.issue({"user": "test"})
 - 
 -         # Test multiple modification points
 -         test_positions = [0, len(token) // 3, len(token) // 2, len(token) - 1]
 - 
 -         for pos in test_positions:
 -             if pos < len(token) and token[pos] != ".":
 -                 # Change one character
 -                 tampered = token[:pos] + ("X" if token[pos] != "X" else "Y") + token[pos + 1 :]
 -                 with pytest.raises(Unauthorized):
 -                     passport_service.verify(tampered)
 - 
 -     def test_should_reject_token_with_different_secret_key(self, passport_service, another_passport_service):
 -         """Test key isolation - token from one service should not work with another"""
 -         payload = {"user_id": "123", "app_code": "test-app"}
 -         token = passport_service.issue(payload)
 - 
 -         with pytest.raises(Unauthorized) as exc_info:
 -             another_passport_service.verify(token)
 -         assert str(exc_info.value) == "401 Unauthorized: Invalid token signature."
 - 
 -     def test_should_use_hs256_algorithm(self, passport_service):
 -         """Test that HS256 algorithm is used for signing"""
 -         payload = {"test": "data"}
 -         token = passport_service.issue(payload)
 - 
 -         # Decode header without relying on JWT internals
 -         # Use jwt.get_unverified_header which is a public API
 -         header = jwt.get_unverified_header(token)
 -         assert header["alg"] == "HS256"
 - 
 -     def test_should_reject_token_with_wrong_algorithm(self, passport_service):
 -         """Test rejection of token signed with different algorithm"""
 -         payload = {"user_id": "123"}
 - 
 -         # Create token with different algorithm
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = "test-secret-key-for-testing"
 -             # Create token with HS512 instead of HS256
 -             wrong_alg_token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS512")
 - 
 -         # Should fail because service expects HS256
 -         # InvalidAlgorithmError is now caught by PyJWTError handler
 -         with pytest.raises(Unauthorized) as exc_info:
 -             passport_service.verify(wrong_alg_token)
 -         assert str(exc_info.value) == "401 Unauthorized: Invalid token."
 - 
 -     # Exception handling tests
 -     def test_should_handle_invalid_tokens(self, passport_service):
 -         """Test handling of various invalid token formats"""
 -         invalid_tokens = [
 -             ("not.a.token", "Invalid token."),
 -             ("invalid-jwt-format", "Invalid token."),
 -             ("xxx.yyy.zzz", "Invalid token."),
 -             ("a.b", "Invalid token."),  # Missing signature
 -             ("", "Invalid token."),  # Empty string
 -             ("   ", "Invalid token."),  # Whitespace
 -             (None, "Invalid token."),  # None value
 -             # Malformed base64
 -             ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.INVALID_BASE64!@#$.signature", "Invalid token."),
 -         ]
 - 
 -         for invalid_token, expected_message in invalid_tokens:
 -             with pytest.raises(Unauthorized) as exc_info:
 -                 passport_service.verify(invalid_token)
 -             assert expected_message in str(exc_info.value)
 - 
 -     def test_should_reject_expired_token(self, passport_service):
 -         """Test rejection of expired token"""
 -         past_time = datetime.now(UTC) - timedelta(hours=1)
 -         payload = {"user_id": "123", "exp": past_time.timestamp()}
 - 
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = "test-secret-key-for-testing"
 -             token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS256")
 - 
 -         with pytest.raises(Unauthorized) as exc_info:
 -             passport_service.verify(token)
 -         assert str(exc_info.value) == "401 Unauthorized: Token has expired."
 - 
 -     # Configuration tests
 -     def test_should_handle_empty_secret_key(self):
 -         """Test behavior when SECRET_KEY is empty"""
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = ""
 -             service = PassportService()
 - 
 -             # Empty secret key should still work but is insecure
 -             payload = {"test": "data"}
 -             token = service.issue(payload)
 -             decoded = service.verify(token)
 -             assert decoded == payload
 - 
 -     def test_should_handle_none_secret_key(self):
 -         """Test behavior when SECRET_KEY is None"""
 -         with patch("libs.passport.dify_config") as mock_config:
 -             mock_config.SECRET_KEY = None
 -             service = PassportService()
 - 
 -             payload = {"test": "data"}
 -             # JWT library will raise TypeError when secret is None
 -             with pytest.raises((TypeError, jwt.exceptions.InvalidKeyError)):
 -                 service.issue(payload)
 - 
 -     # Boundary condition tests
 -     def test_should_handle_large_payload(self, passport_service):
 -         """Test handling of large payload"""
 -         # Test with 100KB instead of 1MB for faster tests
 -         large_data = "x" * (100 * 1024)
 -         payload = {"data": large_data}
 - 
 -         token = passport_service.issue(payload)
 -         decoded = passport_service.verify(token)
 - 
 -         assert decoded["data"] == large_data
 - 
 -     def test_should_handle_special_characters_in_payload(self, passport_service):
 -         """Test handling of special characters in payload"""
 -         special_payloads = [
 -             {"special": "!@#$%^&*()"},
 -             {"quotes": 'He said "Hello"'},
 -             {"backslash": "path\\to\\file"},
 -             {"newline": "line1\nline2"},
 -             {"unicode": "🔐🔑🛡️"},
 -             {"mixed": "Test123!@#中文🔐"},
 -         ]
 - 
 -         for payload in special_payloads:
 -             token = passport_service.issue(payload)
 -             decoded = passport_service.verify(token)
 -             assert decoded == payload
 - 
 -     def test_should_catch_generic_pyjwt_errors(self, passport_service):
 -         """Test that generic PyJWTError exceptions are caught and converted to Unauthorized"""
 -         # Mock jwt.decode to raise a generic PyJWTError
 -         with patch("libs.passport.jwt.decode") as mock_decode:
 -             mock_decode.side_effect = jwt.exceptions.PyJWTError("Generic JWT error")
 - 
 -             with pytest.raises(Unauthorized) as exc_info:
 -                 passport_service.verify("some-token")
 -             assert str(exc_info.value) == "401 Unauthorized: Invalid token."
 
 
  |