You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_passport.py 8.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from datetime import UTC, datetime, timedelta
  2. from unittest.mock import patch
  3. import jwt
  4. import pytest
  5. from werkzeug.exceptions import Unauthorized
  6. from libs.passport import PassportService
  7. class TestPassportService:
  8. """Test PassportService JWT operations"""
  9. @pytest.fixture
  10. def passport_service(self):
  11. """Create PassportService instance with test secret key"""
  12. with patch("libs.passport.dify_config") as mock_config:
  13. mock_config.SECRET_KEY = "test-secret-key-for-testing"
  14. return PassportService()
  15. @pytest.fixture
  16. def another_passport_service(self):
  17. """Create another PassportService instance with different secret key"""
  18. with patch("libs.passport.dify_config") as mock_config:
  19. mock_config.SECRET_KEY = "another-secret-key-for-testing"
  20. return PassportService()
  21. # Core functionality tests
  22. def test_should_issue_and_verify_token(self, passport_service):
  23. """Test complete JWT lifecycle: issue and verify"""
  24. payload = {"user_id": "123", "app_code": "test-app"}
  25. token = passport_service.issue(payload)
  26. # Verify token format
  27. assert isinstance(token, str)
  28. assert len(token.split(".")) == 3 # JWT format: header.payload.signature
  29. # Verify token content
  30. decoded = passport_service.verify(token)
  31. assert decoded == payload
  32. def test_should_handle_different_payload_types(self, passport_service):
  33. """Test issuing and verifying tokens with different payload types"""
  34. test_cases = [
  35. {"string": "value"},
  36. {"number": 42},
  37. {"float": 3.14},
  38. {"boolean": True},
  39. {"null": None},
  40. {"array": [1, 2, 3]},
  41. {"nested": {"key": "value"}},
  42. {"unicode": "中文测试"},
  43. {"emoji": "🔐"},
  44. {}, # Empty payload
  45. ]
  46. for payload in test_cases:
  47. token = passport_service.issue(payload)
  48. decoded = passport_service.verify(token)
  49. assert decoded == payload
  50. # Security tests
  51. def test_should_reject_modified_token(self, passport_service):
  52. """Test that any modification to token invalidates it"""
  53. token = passport_service.issue({"user": "test"})
  54. # Test multiple modification points
  55. test_positions = [0, len(token) // 3, len(token) // 2, len(token) - 1]
  56. for pos in test_positions:
  57. if pos < len(token) and token[pos] != ".":
  58. # Change one character
  59. tampered = token[:pos] + ("X" if token[pos] != "X" else "Y") + token[pos + 1 :]
  60. with pytest.raises(Unauthorized):
  61. passport_service.verify(tampered)
  62. def test_should_reject_token_with_different_secret_key(self, passport_service, another_passport_service):
  63. """Test key isolation - token from one service should not work with another"""
  64. payload = {"user_id": "123", "app_code": "test-app"}
  65. token = passport_service.issue(payload)
  66. with pytest.raises(Unauthorized) as exc_info:
  67. another_passport_service.verify(token)
  68. assert str(exc_info.value) == "401 Unauthorized: Invalid token signature."
  69. def test_should_use_hs256_algorithm(self, passport_service):
  70. """Test that HS256 algorithm is used for signing"""
  71. payload = {"test": "data"}
  72. token = passport_service.issue(payload)
  73. # Decode header without relying on JWT internals
  74. # Use jwt.get_unverified_header which is a public API
  75. header = jwt.get_unverified_header(token)
  76. assert header["alg"] == "HS256"
  77. def test_should_reject_token_with_wrong_algorithm(self, passport_service):
  78. """Test rejection of token signed with different algorithm"""
  79. payload = {"user_id": "123"}
  80. # Create token with different algorithm
  81. with patch("libs.passport.dify_config") as mock_config:
  82. mock_config.SECRET_KEY = "test-secret-key-for-testing"
  83. # Create token with HS512 instead of HS256
  84. wrong_alg_token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS512")
  85. # Should fail because service expects HS256
  86. # InvalidAlgorithmError is now caught by PyJWTError handler
  87. with pytest.raises(Unauthorized) as exc_info:
  88. passport_service.verify(wrong_alg_token)
  89. assert str(exc_info.value) == "401 Unauthorized: Invalid token."
  90. # Exception handling tests
  91. def test_should_handle_invalid_tokens(self, passport_service):
  92. """Test handling of various invalid token formats"""
  93. invalid_tokens = [
  94. ("not.a.token", "Invalid token."),
  95. ("invalid-jwt-format", "Invalid token."),
  96. ("xxx.yyy.zzz", "Invalid token."),
  97. ("a.b", "Invalid token."), # Missing signature
  98. ("", "Invalid token."), # Empty string
  99. (" ", "Invalid token."), # Whitespace
  100. (None, "Invalid token."), # None value
  101. # Malformed base64
  102. ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.INVALID_BASE64!@#$.signature", "Invalid token."),
  103. ]
  104. for invalid_token, expected_message in invalid_tokens:
  105. with pytest.raises(Unauthorized) as exc_info:
  106. passport_service.verify(invalid_token)
  107. assert expected_message in str(exc_info.value)
  108. def test_should_reject_expired_token(self, passport_service):
  109. """Test rejection of expired token"""
  110. past_time = datetime.now(UTC) - timedelta(hours=1)
  111. payload = {"user_id": "123", "exp": past_time.timestamp()}
  112. with patch("libs.passport.dify_config") as mock_config:
  113. mock_config.SECRET_KEY = "test-secret-key-for-testing"
  114. token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS256")
  115. with pytest.raises(Unauthorized) as exc_info:
  116. passport_service.verify(token)
  117. assert str(exc_info.value) == "401 Unauthorized: Token has expired."
  118. # Configuration tests
  119. def test_should_handle_empty_secret_key(self):
  120. """Test behavior when SECRET_KEY is empty"""
  121. with patch("libs.passport.dify_config") as mock_config:
  122. mock_config.SECRET_KEY = ""
  123. service = PassportService()
  124. # Empty secret key should still work but is insecure
  125. payload = {"test": "data"}
  126. token = service.issue(payload)
  127. decoded = service.verify(token)
  128. assert decoded == payload
  129. def test_should_handle_none_secret_key(self):
  130. """Test behavior when SECRET_KEY is None"""
  131. with patch("libs.passport.dify_config") as mock_config:
  132. mock_config.SECRET_KEY = None
  133. service = PassportService()
  134. payload = {"test": "data"}
  135. # JWT library will raise TypeError when secret is None
  136. with pytest.raises((TypeError, jwt.exceptions.InvalidKeyError)):
  137. service.issue(payload)
  138. # Boundary condition tests
  139. def test_should_handle_large_payload(self, passport_service):
  140. """Test handling of large payload"""
  141. # Test with 100KB instead of 1MB for faster tests
  142. large_data = "x" * (100 * 1024)
  143. payload = {"data": large_data}
  144. token = passport_service.issue(payload)
  145. decoded = passport_service.verify(token)
  146. assert decoded["data"] == large_data
  147. def test_should_handle_special_characters_in_payload(self, passport_service):
  148. """Test handling of special characters in payload"""
  149. special_payloads = [
  150. {"special": "!@#$%^&*()"},
  151. {"quotes": 'He said "Hello"'},
  152. {"backslash": "path\\to\\file"},
  153. {"newline": "line1\nline2"},
  154. {"unicode": "🔐🔑🛡️"},
  155. {"mixed": "Test123!@#中文🔐"},
  156. ]
  157. for payload in special_payloads:
  158. token = passport_service.issue(payload)
  159. decoded = passport_service.verify(token)
  160. assert decoded == payload
  161. def test_should_catch_generic_pyjwt_errors(self, passport_service):
  162. """Test that generic PyJWTError exceptions are caught and converted to Unauthorized"""
  163. # Mock jwt.decode to raise a generic PyJWTError
  164. with patch("libs.passport.jwt.decode") as mock_decode:
  165. mock_decode.side_effect = jwt.exceptions.PyJWTError("Generic JWT error")
  166. with pytest.raises(Unauthorized) as exc_info:
  167. passport_service.verify("some-token")
  168. assert str(exc_info.value) == "401 Unauthorized: Invalid token."