選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

utils.py 5.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import json
  2. from collections.abc import Generator
  3. from contextlib import AbstractContextManager
  4. import httpx
  5. import httpx_sse
  6. from httpx_sse import connect_sse
  7. from configs import dify_config
  8. from core.mcp.types import ErrorData, JSONRPCError
  9. from core.model_runtime.utils.encoders import jsonable_encoder
  10. HTTP_REQUEST_NODE_SSL_VERIFY = dify_config.HTTP_REQUEST_NODE_SSL_VERIFY
  11. STATUS_FORCELIST = [429, 500, 502, 503, 504]
  12. def create_ssrf_proxy_mcp_http_client(
  13. headers: dict[str, str] | None = None,
  14. timeout: httpx.Timeout | None = None,
  15. ) -> httpx.Client:
  16. """Create an HTTPX client with SSRF proxy configuration for MCP connections.
  17. Args:
  18. headers: Optional headers to include in the client
  19. timeout: Optional timeout configuration
  20. Returns:
  21. Configured httpx.Client with proxy settings
  22. """
  23. if dify_config.SSRF_PROXY_ALL_URL:
  24. return httpx.Client(
  25. verify=HTTP_REQUEST_NODE_SSL_VERIFY,
  26. headers=headers or {},
  27. timeout=timeout,
  28. follow_redirects=True,
  29. proxy=dify_config.SSRF_PROXY_ALL_URL,
  30. )
  31. elif dify_config.SSRF_PROXY_HTTP_URL and dify_config.SSRF_PROXY_HTTPS_URL:
  32. proxy_mounts = {
  33. "http://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTP_URL, verify=HTTP_REQUEST_NODE_SSL_VERIFY),
  34. "https://": httpx.HTTPTransport(
  35. proxy=dify_config.SSRF_PROXY_HTTPS_URL, verify=HTTP_REQUEST_NODE_SSL_VERIFY
  36. ),
  37. }
  38. return httpx.Client(
  39. verify=HTTP_REQUEST_NODE_SSL_VERIFY,
  40. headers=headers or {},
  41. timeout=timeout,
  42. follow_redirects=True,
  43. mounts=proxy_mounts,
  44. )
  45. else:
  46. return httpx.Client(
  47. verify=HTTP_REQUEST_NODE_SSL_VERIFY,
  48. headers=headers or {},
  49. timeout=timeout,
  50. follow_redirects=True,
  51. )
  52. def ssrf_proxy_sse_connect(url: str, **kwargs) -> AbstractContextManager[httpx_sse.EventSource]:
  53. """Connect to SSE endpoint with SSRF proxy protection.
  54. This function creates an SSE connection using the configured proxy settings
  55. to prevent SSRF attacks when connecting to external endpoints. It returns
  56. a context manager that yields an EventSource object for SSE streaming.
  57. The function handles HTTP client creation and cleanup automatically, but
  58. also accepts a pre-configured client via kwargs.
  59. Args:
  60. url (str): The SSE endpoint URL to connect to
  61. **kwargs: Additional arguments passed to the SSE connection, including:
  62. - client (httpx.Client, optional): Pre-configured HTTP client.
  63. If not provided, one will be created with SSRF protection.
  64. - method (str, optional): HTTP method to use, defaults to "GET"
  65. - headers (dict, optional): HTTP headers to include in the request
  66. - timeout (httpx.Timeout, optional): Timeout configuration for the connection
  67. Returns:
  68. AbstractContextManager[httpx_sse.EventSource]: A context manager that yields an EventSource
  69. object for SSE streaming. The EventSource provides access to server-sent events.
  70. Example:
  71. ```python
  72. with ssrf_proxy_sse_connect(url, headers=headers) as event_source:
  73. for sse in event_source.iter_sse():
  74. print(sse.event, sse.data)
  75. ```
  76. Note:
  77. If a client is not provided in kwargs, one will be automatically created
  78. with SSRF protection based on the application's configuration. If an
  79. exception occurs during connection, any automatically created client
  80. will be cleaned up automatically.
  81. """
  82. # Extract client if provided, otherwise create one
  83. client = kwargs.pop("client", None)
  84. if client is None:
  85. # Create client with SSRF proxy configuration
  86. timeout = kwargs.pop(
  87. "timeout",
  88. httpx.Timeout(
  89. timeout=dify_config.SSRF_DEFAULT_TIME_OUT,
  90. connect=dify_config.SSRF_DEFAULT_CONNECT_TIME_OUT,
  91. read=dify_config.SSRF_DEFAULT_READ_TIME_OUT,
  92. write=dify_config.SSRF_DEFAULT_WRITE_TIME_OUT,
  93. ),
  94. )
  95. headers = kwargs.pop("headers", {})
  96. client = create_ssrf_proxy_mcp_http_client(headers=headers, timeout=timeout)
  97. client_provided = False
  98. else:
  99. client_provided = True
  100. # Extract method if provided, default to GET
  101. method = kwargs.pop("method", "GET")
  102. try:
  103. return connect_sse(client, method, url, **kwargs)
  104. except Exception:
  105. # If we created the client, we need to clean it up on error
  106. if not client_provided:
  107. client.close()
  108. raise
  109. def create_mcp_error_response(
  110. request_id: int | str | None, code: int, message: str, data=None
  111. ) -> Generator[bytes, None, None]:
  112. """Create MCP error response"""
  113. error_data = ErrorData(code=code, message=message, data=data)
  114. json_response = JSONRPCError(
  115. jsonrpc="2.0",
  116. id=request_id or 1,
  117. error=error_data,
  118. )
  119. json_data = json.dumps(jsonable_encoder(json_response))
  120. sse_content = json_data.encode()
  121. yield sse_content