| @@ -10,6 +10,7 @@ import httpx | |||
| from configs import dify_config | |||
| from core.file import file_manager | |||
| from core.helper import ssrf_proxy | |||
| from core.variables.segments import ArrayFileSegment, FileSegment | |||
| from core.workflow.entities.variable_pool import VariablePool | |||
| from .entities import ( | |||
| @@ -57,7 +58,7 @@ class Executor: | |||
| params: list[tuple[str, str]] | None | |||
| content: str | bytes | None | |||
| data: Mapping[str, Any] | None | |||
| files: Mapping[str, tuple[str | None, bytes, str]] | None | |||
| files: list[tuple[str, tuple[str | None, bytes, str]]] | None | |||
| json: Any | |||
| headers: dict[str, str] | |||
| auth: HttpRequestNodeAuthorization | |||
| @@ -207,17 +208,38 @@ class Executor: | |||
| self.variable_pool.convert_template(item.key).text: item.file | |||
| for item in filter(lambda item: item.type == "file", data) | |||
| } | |||
| files: dict[str, Any] = {} | |||
| files = {k: self.variable_pool.get_file(selector) for k, selector in file_selectors.items()} | |||
| files = {k: v for k, v in files.items() if v is not None} | |||
| files = {k: variable.value for k, variable in files.items() if variable is not None} | |||
| files = { | |||
| k: (v.filename, file_manager.download(v), v.mime_type or "application/octet-stream") | |||
| for k, v in files.items() | |||
| if v.related_id is not None | |||
| } | |||
| # get files from file_selectors, add support for array file variables | |||
| files_list = [] | |||
| for key, selector in file_selectors.items(): | |||
| segment = self.variable_pool.get(selector) | |||
| if isinstance(segment, FileSegment): | |||
| files_list.append((key, [segment.value])) | |||
| elif isinstance(segment, ArrayFileSegment): | |||
| files_list.append((key, list(segment.value))) | |||
| # get files from file_manager | |||
| files: dict[str, list[tuple[str | None, bytes, str]]] = {} | |||
| for key, files_in_segment in files_list: | |||
| for file in files_in_segment: | |||
| if file.related_id is not None: | |||
| file_tuple = ( | |||
| file.filename, | |||
| file_manager.download(file), | |||
| file.mime_type or "application/octet-stream", | |||
| ) | |||
| if key not in files: | |||
| files[key] = [] | |||
| files[key].append(file_tuple) | |||
| # convert files to list for httpx request | |||
| if files: | |||
| self.files = [] | |||
| for key, file_tuples in files.items(): | |||
| for file_tuple in file_tuples: | |||
| self.files.append((key, file_tuple)) | |||
| self.data = form_data | |||
| self.files = files or None | |||
| def _assembling_headers(self) -> dict[str, Any]: | |||
| authorization = deepcopy(self.auth) | |||
| @@ -344,10 +366,16 @@ class Executor: | |||
| body_string = "" | |||
| if self.files: | |||
| for k, v in self.files.items(): | |||
| for key, (filename, content, mime_type) in self.files: | |||
| body_string += f"--{boundary}\r\n" | |||
| body_string += f'Content-Disposition: form-data; name="{k}"\r\n\r\n' | |||
| body_string += f"{v[1]}\r\n" | |||
| body_string += f'Content-Disposition: form-data; name="{key}"\r\n\r\n' | |||
| # decode content | |||
| try: | |||
| body_string += content.decode("utf-8") | |||
| except UnicodeDecodeError: | |||
| # fix: decode binary content | |||
| pass | |||
| body_string += "\r\n" | |||
| body_string += f"--{boundary}--\r\n" | |||
| elif self.node_data.body: | |||
| if self.content: | |||
| @@ -2,7 +2,7 @@ import httpx | |||
| from core.app.entities.app_invoke_entities import InvokeFrom | |||
| from core.file import File, FileTransferMethod, FileType | |||
| from core.variables import FileVariable | |||
| from core.variables import ArrayFileVariable, FileVariable | |||
| from core.workflow.entities.variable_pool import VariablePool | |||
| from core.workflow.graph_engine import Graph, GraphInitParams, GraphRuntimeState | |||
| from core.workflow.nodes.answer import AnswerStreamGenerateRoute | |||
| @@ -183,7 +183,7 @@ def test_http_request_node_form_with_file(monkeypatch): | |||
| def attr_checker(*args, **kwargs): | |||
| assert kwargs["data"] == {"name": "test"} | |||
| assert kwargs["files"] == {"file": (None, b"test", "application/octet-stream")} | |||
| assert kwargs["files"] == [("file", (None, b"test", "application/octet-stream"))] | |||
| return httpx.Response(200, content=b"") | |||
| monkeypatch.setattr( | |||
| @@ -194,3 +194,131 @@ def test_http_request_node_form_with_file(monkeypatch): | |||
| assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED | |||
| assert result.outputs is not None | |||
| assert result.outputs["body"] == "" | |||
| def test_http_request_node_form_with_multiple_files(monkeypatch): | |||
| data = HttpRequestNodeData( | |||
| title="test", | |||
| method="post", | |||
| url="http://example.org/upload", | |||
| authorization=HttpRequestNodeAuthorization(type="no-auth"), | |||
| headers="", | |||
| params="", | |||
| body=HttpRequestNodeBody( | |||
| type="form-data", | |||
| data=[ | |||
| BodyData( | |||
| key="files", | |||
| type="file", | |||
| file=["1111", "files"], | |||
| ), | |||
| BodyData( | |||
| key="name", | |||
| type="text", | |||
| value="test", | |||
| ), | |||
| ], | |||
| ), | |||
| ) | |||
| variable_pool = VariablePool( | |||
| system_variables={}, | |||
| user_inputs={}, | |||
| ) | |||
| files = [ | |||
| File( | |||
| tenant_id="1", | |||
| type=FileType.IMAGE, | |||
| transfer_method=FileTransferMethod.LOCAL_FILE, | |||
| related_id="file1", | |||
| filename="image1.jpg", | |||
| mime_type="image/jpeg", | |||
| storage_key="", | |||
| ), | |||
| File( | |||
| tenant_id="1", | |||
| type=FileType.DOCUMENT, | |||
| transfer_method=FileTransferMethod.LOCAL_FILE, | |||
| related_id="file2", | |||
| filename="document.pdf", | |||
| mime_type="application/pdf", | |||
| storage_key="", | |||
| ), | |||
| ] | |||
| variable_pool.add( | |||
| ["1111", "files"], | |||
| ArrayFileVariable( | |||
| name="files", | |||
| value=files, | |||
| ), | |||
| ) | |||
| node = HttpRequestNode( | |||
| id="1", | |||
| config={ | |||
| "id": "1", | |||
| "data": data.model_dump(), | |||
| }, | |||
| graph_init_params=GraphInitParams( | |||
| tenant_id="1", | |||
| app_id="1", | |||
| workflow_type=WorkflowType.WORKFLOW, | |||
| workflow_id="1", | |||
| graph_config={}, | |||
| user_id="1", | |||
| user_from=UserFrom.ACCOUNT, | |||
| invoke_from=InvokeFrom.SERVICE_API, | |||
| call_depth=0, | |||
| ), | |||
| graph=Graph( | |||
| root_node_id="1", | |||
| answer_stream_generate_routes=AnswerStreamGenerateRoute( | |||
| answer_dependencies={}, | |||
| answer_generate_route={}, | |||
| ), | |||
| end_stream_param=EndStreamParam( | |||
| end_dependencies={}, | |||
| end_stream_variable_selector_mapping={}, | |||
| ), | |||
| ), | |||
| graph_runtime_state=GraphRuntimeState( | |||
| variable_pool=variable_pool, | |||
| start_at=0, | |||
| ), | |||
| ) | |||
| monkeypatch.setattr( | |||
| "core.workflow.nodes.http_request.executor.file_manager.download", | |||
| lambda file: b"test_image_data" if file.mime_type == "image/jpeg" else b"test_pdf_data", | |||
| ) | |||
| def attr_checker(*args, **kwargs): | |||
| assert kwargs["data"] == {"name": "test"} | |||
| assert len(kwargs["files"]) == 2 | |||
| assert kwargs["files"][0][0] == "files" | |||
| assert kwargs["files"][1][0] == "files" | |||
| file_tuples = [f[1] for f in kwargs["files"]] | |||
| file_contents = [f[1] for f in file_tuples] | |||
| file_types = [f[2] for f in file_tuples] | |||
| assert b"test_image_data" in file_contents | |||
| assert b"test_pdf_data" in file_contents | |||
| assert "image/jpeg" in file_types | |||
| assert "application/pdf" in file_types | |||
| return httpx.Response(200, content=b'{"status":"success"}') | |||
| monkeypatch.setattr( | |||
| "core.helper.ssrf_proxy.post", | |||
| attr_checker, | |||
| ) | |||
| result = node._run() | |||
| assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED | |||
| assert result.outputs is not None | |||
| assert result.outputs["body"] == '{"status":"success"}' | |||
| print(result.outputs["body"]) | |||