|
|
|
@@ -262,26 +262,6 @@ def test_sse_client_queue_cleanup(): |
|
|
|
# Note: In real implementation, cleanup should put None to signal shutdown |
|
|
|
|
|
|
|
|
|
|
|
def test_sse_client_url_processing(): |
|
|
|
"""Test SSE client URL processing functions.""" |
|
|
|
from core.mcp.client.sse_client import remove_request_params |
|
|
|
|
|
|
|
# Test URL with parameters |
|
|
|
url_with_params = "http://example.com/sse?param1=value1¶m2=value2" |
|
|
|
cleaned_url = remove_request_params(url_with_params) |
|
|
|
assert cleaned_url == "http://example.com/sse" |
|
|
|
|
|
|
|
# Test URL without parameters |
|
|
|
url_without_params = "http://example.com/sse" |
|
|
|
cleaned_url = remove_request_params(url_without_params) |
|
|
|
assert cleaned_url == "http://example.com/sse" |
|
|
|
|
|
|
|
# Test URL with path and parameters |
|
|
|
complex_url = "http://example.com/path/to/sse?session=123&token=abc" |
|
|
|
cleaned_url = remove_request_params(complex_url) |
|
|
|
assert cleaned_url == "http://example.com/path/to/sse" |
|
|
|
|
|
|
|
|
|
|
|
def test_sse_client_headers_propagation(): |
|
|
|
"""Test that custom headers are properly propagated in SSE client.""" |
|
|
|
test_url = "http://test.example/sse" |