Browse files

test(graph_engine): remove outdated tests

Signed-off-by: -LAN- <laipz8200@outlook.com>
tags/2.0.0-beta.1
-LAN- 2 months ago
parent
commit
635eff2e25

+ 1
- 1
api/tests/unit_tests/core/workflow/graph_engine/test_complex_branch_workflow.py View File

@@ -21,7 +21,6 @@ from .test_mock_config import MockConfigBuilder
from .test_table_runner import TableTestRunner, WorkflowTestCase


@pytest.mark.skip
class TestComplexBranchWorkflow:
    """Test suite for complex branch workflow with parallel execution."""

@@ -30,6 +29,7 @@ class TestComplexBranchWorkflow:
        self.runner = TableTestRunner()
        self.fixture_path = "test_complex_branch"

    @pytest.mark.skip(reason="output in this workflow can be random")
    def test_hello_branch_with_llm(self):
        """
        Test when query contains 'hello' - should trigger true branch.

+ 7
- 3
api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py View File

@@ -12,7 +12,7 @@ This module provides a robust table-driven testing framework with support for:

import logging
import time
from collections.abc import Callable
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
@@ -34,7 +34,11 @@ from core.workflow.entities.graph_init_params import GraphInitParams
from core.workflow.graph import Graph
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_events import GraphEngineEvent, GraphRunStartedEvent, GraphRunSucceededEvent
from core.workflow.graph_events import (
    GraphEngineEvent,
    GraphRunStartedEvent,
    GraphRunSucceededEvent,
)
from core.workflow.nodes.node_factory import DifyNodeFactory
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom
@@ -57,7 +61,7 @@ class WorkflowTestCase:
    timeout: float = 30.0
    mock_config: Optional[MockConfig] = None
    use_auto_mock: bool = False
    expected_event_sequence: Optional[list[type[GraphEngineEvent]]] = None
    expected_event_sequence: Optional[Sequence[type[GraphEngineEvent]]] = None
    tags: list[str] = field(default_factory=list)
    skip: bool = False
    skip_reason: str = ""
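Note on the widened annotation above: list is invariant under static type checking, while Sequence is covariant and read-only, so expected_event_sequence can now also accept tuples and lists of more specific event classes. A minimal sketch of the difference, using stand-in event classes rather than the real ones from core.workflow.graph_events:

from collections.abc import Sequence
from typing import Optional


# Stand-in classes for illustration only; the real ones live in
# core.workflow.graph_events.
class GraphEngineEvent: ...
class GraphRunStartedEvent(GraphEngineEvent): ...
class GraphRunSucceededEvent(GraphEngineEvent): ...


def expect(events: Optional[Sequence[type[GraphEngineEvent]]]) -> None:
    """Accepts any read-only sequence of event classes."""


# Both calls type-check with Sequence. With list[type[GraphEngineEvent]] a
# strict checker would reject the second call, because list is invariant.
expect((GraphRunStartedEvent, GraphRunSucceededEvent))
subclass_only: list[type[GraphRunStartedEvent]] = [GraphRunStartedEvent]
expect(subclass_only)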

+ 6
- 7
api/tests/unit_tests/core/workflow/graph_engine/test_variable_aggregator.py View File

@@ -9,13 +9,6 @@ from core.workflow.nodes.template_transform.template_transform_node import Templ
from .test_table_runner import TableTestRunner, WorkflowTestCase


def mock_template_transform_run(self):
    """Mock the TemplateTransformNode._run() method to return results based on node title."""
    title = self._node_data.title
    return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs={}, outputs={"output": title})


@pytest.mark.skip
class TestVariableAggregator:
    """Test cases for the variable aggregator workflow."""

@@ -37,6 +30,12 @@ class TestVariableAggregator:
        description: str,
    ) -> None:
        """Test all four combinations of switch1 and switch2."""

        def mock_template_transform_run(self):
            """Mock the TemplateTransformNode._run() method to return results based on node title."""
            title = self._node_data.title
            return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs={}, outputs={"output": title})

        with patch.object(
            TemplateTransformNode,
            "_run",

+ 0
- 353
api/tests/unit_tests/core/workflow/nodes/http_request/test_http_request_node.py View File

@@ -1,353 +0,0 @@
import httpx
import pytest

from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File, FileTransferMethod, FileType
from core.variables import ArrayFileVariable, FileVariable
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.nodes.answer.entities import AnswerStreamGenerateRoute
from core.workflow.nodes.end.entities import EndStreamParam
from core.workflow.nodes.http_request import (
BodyData,
HttpRequestNode,
HttpRequestNodeAuthorization,
HttpRequestNodeBody,
HttpRequestNodeData,
)
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom


@pytest.mark.skip(
reason="HTTP request tests use old Graph constructor incompatible with new queue-based engine - "
"needs rewrite for new architecture"
)
def test_http_request_node_binary_file(monkeypatch):
data = HttpRequestNodeData(
title="test",
method="post",
url="http://example.org/post",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="",
params="",
body=HttpRequestNodeBody(
type="binary",
data=[
BodyData(
key="file",
type="file",
value="",
file=["1111", "file"],
)
],
),
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
)
variable_pool.add(
["1111", "file"],
FileVariable(
name="file",
value=File(
tenant_id="1",
type=FileType.IMAGE,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="1111",
storage_key="",
),
),
)

node_config = {
"id": "1",
"data": data.model_dump(),
}

node = HttpRequestNode(
id="1",
config=node_config,
graph_init_params=GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config={},
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
),
graph=Graph(
root_node_id="1",
answer_stream_generate_routes=AnswerStreamGenerateRoute(
answer_dependencies={},
answer_generate_route={},
),
end_stream_param=EndStreamParam(
end_dependencies={},
end_stream_variable_selector_mapping={},
),
),
graph_runtime_state=GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
),
)

# Initialize node data
node.init_node_data(node_config["data"])
monkeypatch.setattr(
"core.workflow.nodes.http_request.executor.file_manager.download",
lambda *args, **kwargs: b"test",
)
monkeypatch.setattr(
"core.helper.ssrf_proxy.post",
lambda *args, **kwargs: httpx.Response(200, content=kwargs["content"]),
)
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["body"] == "test"


@pytest.mark.skip(
reason="HTTP request tests use old Graph constructor incompatible with new queue-based engine - "
"needs rewrite for new architecture"
)
def test_http_request_node_form_with_file(monkeypatch):
data = HttpRequestNodeData(
title="test",
method="post",
url="http://example.org/post",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="",
params="",
body=HttpRequestNodeBody(
type="form-data",
data=[
BodyData(
key="file",
type="file",
file=["1111", "file"],
),
BodyData(
key="name",
type="text",
value="test",
),
],
),
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
)
variable_pool.add(
["1111", "file"],
FileVariable(
name="file",
value=File(
tenant_id="1",
type=FileType.IMAGE,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="1111",
storage_key="",
),
),
)

node_config = {
"id": "1",
"data": data.model_dump(),
}

node = HttpRequestNode(
id="1",
config=node_config,
graph_init_params=GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config={},
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
),
graph=Graph(
root_node_id="1",
answer_stream_generate_routes=AnswerStreamGenerateRoute(
answer_dependencies={},
answer_generate_route={},
),
end_stream_param=EndStreamParam(
end_dependencies={},
end_stream_variable_selector_mapping={},
),
),
graph_runtime_state=GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
),
)

# Initialize node data
node.init_node_data(node_config["data"])

monkeypatch.setattr(
"core.workflow.nodes.http_request.executor.file_manager.download",
lambda *args, **kwargs: b"test",
)

def attr_checker(*args, **kwargs):
assert kwargs["data"] == {"name": "test"}
assert kwargs["files"] == [("file", (None, b"test", "application/octet-stream"))]
return httpx.Response(200, content=b"")

monkeypatch.setattr(
"core.helper.ssrf_proxy.post",
attr_checker,
)
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["body"] == ""


@pytest.mark.skip(
reason="HTTP request tests use old Graph constructor incompatible with new queue-based engine - "
"needs rewrite for new architecture"
)
def test_http_request_node_form_with_multiple_files(monkeypatch):
data = HttpRequestNodeData(
title="test",
method="post",
url="http://example.org/upload",
authorization=HttpRequestNodeAuthorization(type="no-auth"),
headers="",
params="",
body=HttpRequestNodeBody(
type="form-data",
data=[
BodyData(
key="files",
type="file",
file=["1111", "files"],
),
BodyData(
key="name",
type="text",
value="test",
),
],
),
)

variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
)

files = [
File(
tenant_id="1",
type=FileType.IMAGE,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="file1",
filename="image1.jpg",
mime_type="image/jpeg",
storage_key="",
),
File(
tenant_id="1",
type=FileType.DOCUMENT,
transfer_method=FileTransferMethod.LOCAL_FILE,
related_id="file2",
filename="document.pdf",
mime_type="application/pdf",
storage_key="",
),
]

variable_pool.add(
["1111", "files"],
ArrayFileVariable(
name="files",
value=files,
),
)

node_config = {
"id": "1",
"data": data.model_dump(),
}

node = HttpRequestNode(
id="1",
config=node_config,
graph_init_params=GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config={},
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
),
graph=Graph(
root_node_id="1",
answer_stream_generate_routes=AnswerStreamGenerateRoute(
answer_dependencies={},
answer_generate_route={},
),
end_stream_param=EndStreamParam(
end_dependencies={},
end_stream_variable_selector_mapping={},
),
),
graph_runtime_state=GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
),
)

# Initialize node data
node.init_node_data(node_config["data"])

monkeypatch.setattr(
"core.workflow.nodes.http_request.executor.file_manager.download",
lambda file: b"test_image_data" if file.mime_type == "image/jpeg" else b"test_pdf_data",
)

def attr_checker(*args, **kwargs):
assert kwargs["data"] == {"name": "test"}

assert len(kwargs["files"]) == 2
assert kwargs["files"][0][0] == "files"
assert kwargs["files"][1][0] == "files"

file_tuples = [f[1] for f in kwargs["files"]]
file_contents = [f[1] for f in file_tuples]
file_types = [f[2] for f in file_tuples]

assert b"test_image_data" in file_contents
assert b"test_pdf_data" in file_contents
assert "image/jpeg" in file_types
assert "application/pdf" in file_types

return httpx.Response(200, content=b'{"status":"success"}')

monkeypatch.setattr(
"core.helper.ssrf_proxy.post",
attr_checker,
)

result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["body"] == '{"status":"success"}'
print(result.outputs["body"])

+ 0
- 0
api/tests/unit_tests/core/workflow/nodes/iteration/__init__.py View File


+ 0
- 909
api/tests/unit_tests/core/workflow/nodes/iteration/test_iteration.py View File

@@ -1,909 +0,0 @@
import time
import uuid
from unittest.mock import patch

import pytest

from core.app.entities.app_invoke_entities import InvokeFrom
from core.variables.segments import ArrayAnySegment, ArrayStringSegment
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.node_events import NodeRunResult, StreamCompletedEvent
from core.workflow.nodes.iteration.entities import ErrorHandleMode
from core.workflow.nodes.iteration.iteration_node import IterationNode
from core.workflow.nodes.node_factory import DifyNodeFactory
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom


@pytest.mark.skip(
reason="Iteration nodes are part of Phase 3 container support - not yet implemented in new queue-based engine"
)
def test_run():
graph_config = {
"edges": [
{
"id": "start-source-pe-target",
"source": "start",
"target": "pe",
},
{
"id": "iteration-1-source-answer-3-target",
"source": "iteration-1",
"target": "answer-3",
},
{
"id": "tt-source-if-else-target",
"source": "tt",
"target": "if-else",
},
{
"id": "if-else-true-answer-2-target",
"source": "if-else",
"sourceHandle": "true",
"target": "answer-2",
},
{
"id": "if-else-false-answer-4-target",
"source": "if-else",
"sourceHandle": "false",
"target": "answer-4",
},
{
"id": "pe-source-iteration-1-target",
"source": "pe",
"target": "iteration-1",
},
],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "tt",
"title": "iteration",
"type": "iteration",
},
"id": "iteration-1",
},
{
"data": {
"answer": "{{#tt.output#}}",
"iteration_id": "iteration-1",
"title": "answer 2",
"type": "answer",
},
"id": "answer-2",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 123",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt",
},
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3",
},
{
"data": {
"conditions": [
{
"comparison_operator": "is",
"id": "1721916275284",
"value": "hi",
"variable_selector": ["sys", "query"],
}
],
"iteration_id": "iteration-1",
"logical_operator": "and",
"title": "if",
"type": "if-else",
},
"id": "if-else",
},
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4",
},
{
"data": {
"instruction": "test1",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "gpt-4o",
"provider": "openai",
},
"parameters": [
{"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
],
"query": ["sys", "query"],
"reasoning_mode": "prompt",
"title": "pe",
"type": "parameter-extractor",
},
"id": "pe",
},
],
}

init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)

# construct variable pool
pool = VariablePool(
system_variables=SystemVariable(
user_id="1",
files=[],
query="dify",
conversation_id="abababa",
),
user_inputs={},
environment_variables=[],
)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])

graph_runtime_state = GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter())
node_factory = DifyNodeFactory(
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)

node_config = {
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "tt",
"title": "迭代",
"type": "iteration",
},
"id": "iteration-1",
}

iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
config=node_config,
)

# Initialize node data
iteration_node.init_node_data(node_config["data"])

def tt_generator(self):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={"iterator_selector": "dify"},
outputs={"output": "dify 123"},
)

with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node
result = iteration_node._run()

count = 0
for item in result:
# print(type(item), item)
count += 1
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayStringSegment(value=["dify 123", "dify 123"])}

assert count == 20


@pytest.mark.skip(
reason="Iteration nodes are part of Phase 3 container support - not yet implemented in new queue-based engine"
)
def test_run_parallel():
graph_config = {
"edges": [
{
"id": "start-source-pe-target",
"source": "start",
"target": "pe",
},
{
"id": "iteration-1-source-answer-3-target",
"source": "iteration-1",
"target": "answer-3",
},
{
"id": "iteration-start-source-tt-target",
"source": "iteration-start",
"target": "tt",
},
{
"id": "iteration-start-source-tt-2-target",
"source": "iteration-start",
"target": "tt-2",
},
{
"id": "tt-source-if-else-target",
"source": "tt",
"target": "if-else",
},
{
"id": "tt-2-source-if-else-target",
"source": "tt-2",
"target": "if-else",
},
{
"id": "if-else-true-answer-2-target",
"source": "if-else",
"sourceHandle": "true",
"target": "answer-2",
},
{
"id": "if-else-false-answer-4-target",
"source": "if-else",
"sourceHandle": "false",
"target": "answer-4",
},
{
"id": "pe-source-iteration-1-target",
"source": "pe",
"target": "iteration-1",
},
],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "iteration",
"type": "iteration",
},
"id": "iteration-1",
},
{
"data": {
"answer": "{{#tt.output#}}",
"iteration_id": "iteration-1",
"title": "answer 2",
"type": "answer",
},
"id": "answer-2",
},
{
"data": {
"iteration_id": "iteration-1",
"title": "iteration-start",
"type": "iteration-start",
},
"id": "iteration-start",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 123",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 321",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt-2",
},
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3",
},
{
"data": {
"conditions": [
{
"comparison_operator": "is",
"id": "1721916275284",
"value": "hi",
"variable_selector": ["sys", "query"],
}
],
"iteration_id": "iteration-1",
"logical_operator": "and",
"title": "if",
"type": "if-else",
},
"id": "if-else",
},
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4",
},
{
"data": {
"instruction": "test1",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "gpt-4o",
"provider": "openai",
},
"parameters": [
{"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
],
"query": ["sys", "query"],
"reasoning_mode": "prompt",
"title": "pe",
"type": "parameter-extractor",
},
"id": "pe",
},
],
}

init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)

# construct variable pool
pool = VariablePool(
system_variables=SystemVariable(
user_id="1",
files=[],
query="dify",
conversation_id="abababa",
),
user_inputs={},
environment_variables=[],
)

graph_runtime_state = GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter())
node_factory = DifyNodeFactory(
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])

node_config = {
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "迭代",
"type": "iteration",
},
"id": "iteration-1",
}

iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
config=node_config,
)

# Initialize node data
iteration_node.init_node_data(node_config["data"])

def tt_generator(self):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={"iterator_selector": "dify"},
outputs={"output": "dify 123"},
)

with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node
result = iteration_node._run()

count = 0
for item in result:
count += 1
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayStringSegment(value=["dify 123", "dify 123"])}

assert count == 32


@pytest.mark.skip(
reason="Iteration nodes are part of Phase 3 container support - not yet implemented in new queue-based engine"
)
def test_iteration_run_in_parallel_mode():
graph_config = {
"edges": [
{
"id": "start-source-pe-target",
"source": "start",
"target": "pe",
},
{
"id": "iteration-1-source-answer-3-target",
"source": "iteration-1",
"target": "answer-3",
},
{
"id": "iteration-start-source-tt-target",
"source": "iteration-start",
"target": "tt",
},
{
"id": "iteration-start-source-tt-2-target",
"source": "iteration-start",
"target": "tt-2",
},
{
"id": "tt-source-if-else-target",
"source": "tt",
"target": "if-else",
},
{
"id": "tt-2-source-if-else-target",
"source": "tt-2",
"target": "if-else",
},
{
"id": "if-else-true-answer-2-target",
"source": "if-else",
"sourceHandle": "true",
"target": "answer-2",
},
{
"id": "if-else-false-answer-4-target",
"source": "if-else",
"sourceHandle": "false",
"target": "answer-4",
},
{
"id": "pe-source-iteration-1-target",
"source": "pe",
"target": "iteration-1",
},
],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "iteration",
"type": "iteration",
},
"id": "iteration-1",
},
{
"data": {
"answer": "{{#tt.output#}}",
"iteration_id": "iteration-1",
"title": "answer 2",
"type": "answer",
},
"id": "answer-2",
},
{
"data": {
"iteration_id": "iteration-1",
"title": "iteration-start",
"type": "iteration-start",
},
"id": "iteration-start",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 123",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }} 321",
"title": "template transform",
"type": "template-transform",
"variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
},
"id": "tt-2",
},
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3",
},
{
"data": {
"conditions": [
{
"comparison_operator": "is",
"id": "1721916275284",
"value": "hi",
"variable_selector": ["sys", "query"],
}
],
"iteration_id": "iteration-1",
"logical_operator": "and",
"title": "if",
"type": "if-else",
},
"id": "if-else",
},
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4",
},
{
"data": {
"instruction": "test1",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "gpt-4o",
"provider": "openai",
},
"parameters": [
{"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
],
"query": ["sys", "query"],
"reasoning_mode": "prompt",
"title": "pe",
"type": "parameter-extractor",
},
"id": "pe",
},
],
}

init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)

# construct variable pool
pool = VariablePool(
system_variables=SystemVariable(
user_id="1",
files=[],
query="dify",
conversation_id="abababa",
),
user_inputs={},
environment_variables=[],
)

graph_runtime_state = GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter())
node_factory = DifyNodeFactory(
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])

parallel_node_config = {
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "迭代",
"type": "iteration",
"is_parallel": True,
},
"id": "iteration-1",
}

parallel_iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
config=parallel_node_config,
)

# Initialize node data
parallel_iteration_node.init_node_data(parallel_node_config["data"])
sequential_node_config = {
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "迭代",
"type": "iteration",
"is_parallel": True,
},
"id": "iteration-1",
}

sequential_iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
config=sequential_node_config,
)

# Initialize node data
sequential_iteration_node.init_node_data(sequential_node_config["data"])

def tt_generator(self):
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={"iterator_selector": "dify"},
outputs={"output": "dify 123"},
)

with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node
parallel_result = parallel_iteration_node._run()
sequential_result = sequential_iteration_node._run()
assert parallel_iteration_node._node_data.parallel_nums == 10
assert parallel_iteration_node._node_data.error_handle_mode == ErrorHandleMode.TERMINATED
count = 0
parallel_arr = []
sequential_arr = []
for item in parallel_result:
count += 1
parallel_arr.append(item)
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayStringSegment(value=["dify 123", "dify 123"])}
assert count == 32

for item in sequential_result:
sequential_arr.append(item)
count += 1
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayStringSegment(value=["dify 123", "dify 123"])}
assert count == 64


@pytest.mark.skip(
reason="Iteration nodes are part of Phase 3 container support - not yet implemented in new queue-based engine"
)
def test_iteration_run_error_handle():
graph_config = {
"edges": [
{
"id": "start-source-pe-target",
"source": "start",
"target": "pe",
},
{
"id": "iteration-1-source-answer-3-target",
"source": "iteration-1",
"target": "answer-3",
},
{
"id": "tt-source-if-else-target",
"source": "iteration-start",
"target": "if-else",
},
{
"id": "if-else-true-answer-2-target",
"source": "if-else",
"sourceHandle": "true",
"target": "tt",
},
{
"id": "if-else-false-answer-4-target",
"source": "if-else",
"sourceHandle": "false",
"target": "tt2",
},
{
"id": "pe-source-iteration-1-target",
"source": "pe",
"target": "iteration-1",
},
],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt2", "output"],
"output_type": "array[string]",
"start_node_id": "if-else",
"title": "iteration",
"type": "iteration",
},
"id": "iteration-1",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1.split(arg2) }}",
"title": "template transform",
"type": "template-transform",
"variables": [
{"value_selector": ["iteration-1", "item"], "variable": "arg1"},
{"value_selector": ["iteration-1", "index"], "variable": "arg2"},
],
},
"id": "tt",
},
{
"data": {
"iteration_id": "iteration-1",
"template": "{{ arg1 }}",
"title": "template transform",
"type": "template-transform",
"variables": [
{"value_selector": ["iteration-1", "item"], "variable": "arg1"},
],
},
"id": "tt2",
},
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3",
},
{
"data": {
"iteration_id": "iteration-1",
"title": "iteration-start",
"type": "iteration-start",
},
"id": "iteration-start",
},
{
"data": {
"conditions": [
{
"comparison_operator": "is",
"id": "1721916275284",
"value": "1",
"variable_selector": ["iteration-1", "item"],
}
],
"iteration_id": "iteration-1",
"logical_operator": "and",
"title": "if",
"type": "if-else",
},
"id": "if-else",
},
{
"data": {
"instruction": "test1",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "gpt-4o",
"provider": "openai",
},
"parameters": [
{"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
],
"query": ["sys", "query"],
"reasoning_mode": "prompt",
"title": "pe",
"type": "parameter-extractor",
},
"id": "pe",
},
],
}

init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)

# construct variable pool
pool = VariablePool(
system_variables=SystemVariable(
user_id="1",
files=[],
query="dify",
conversation_id="abababa",
),
user_inputs={},
environment_variables=[],
)

graph_runtime_state = GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter())
node_factory = DifyNodeFactory(
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)
pool.add(["pe", "list_output"], ["1", "1"])
error_node_config = {
"data": {
"iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"],
"output_type": "array[string]",
"startNodeType": "template-transform",
"start_node_id": "iteration-start",
"title": "iteration",
"type": "iteration",
"is_parallel": True,
"error_handle_mode": ErrorHandleMode.CONTINUE_ON_ERROR,
},
"id": "iteration-1",
}

iteration_node = IterationNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph_runtime_state=graph_runtime_state,
config=error_node_config,
)

# Initialize node data
iteration_node.init_node_data(error_node_config["data"])
# execute continue on error node
result = iteration_node._run()
result_arr = []
count = 0
for item in result:
result_arr.append(item)
count += 1
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayAnySegment(value=[None, None])}

assert count == 14
# execute remove abnormal output
iteration_node._node_data.error_handle_mode = ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT
result = iteration_node._run()
count = 0
for item in result:
count += 1
if isinstance(item, StreamCompletedEvent):
assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.node_run_result.outputs == {"output": ArrayAnySegment(value=[])}
assert count == 14

+ 0
- 624
api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py View File

@@ -1,624 +0,0 @@
import time
from unittest.mock import patch

import pytest

from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
from core.workflow.enums import (
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
from core.workflow.graph import Graph
from core.workflow.graph_engine import GraphEngine
from core.workflow.graph_engine.command_channels import InMemoryChannel
from core.workflow.graph_events import (
GraphRunPartialSucceededEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunStreamChunkEvent,
)
from core.workflow.node_events import NodeRunResult, StreamCompletedEvent
from core.workflow.nodes.llm.node import LLMNode
from core.workflow.nodes.node_factory import DifyNodeFactory
from core.workflow.system_variable import SystemVariable
from models.enums import UserFrom


class ContinueOnErrorTestHelper:
@staticmethod
def get_code_node(
code: str, error_strategy: str = "fail-branch", default_value: dict | None = None, retry_config: dict = {}
):
"""Helper method to create a code node configuration"""
node = {
"id": "node",
"data": {
"outputs": {"result": {"type": "number"}},
"error_strategy": error_strategy,
"title": "code",
"variables": [],
"code_language": "python3",
"code": "\n".join([line[4:] for line in code.split("\n")]),
"type": "code",
**retry_config,
},
}
if default_value:
node["data"]["default_value"] = default_value
return node

@staticmethod
def get_http_node(
error_strategy: str = "fail-branch",
default_value: dict | None = None,
authorization_success: bool = False,
retry_config: dict = {},
):
"""Helper method to create a http node configuration"""
authorization = (
{
"type": "api-key",
"config": {
"type": "basic",
"api_key": "ak-xxx",
"header": "api-key",
},
}
if authorization_success
else {
"type": "api-key",
# missing config field
}
)
node = {
"id": "node",
"data": {
"title": "http",
"desc": "",
"method": "get",
"url": "http://example.com",
"authorization": authorization,
"headers": "X-Header:123",
"params": "A:b",
"body": None,
"type": "http-request",
"error_strategy": error_strategy,
**retry_config,
},
}
if default_value:
node["data"]["default_value"] = default_value
return node

@staticmethod
def get_error_status_code_http_node(error_strategy: str = "fail-branch", default_value: dict | None = None):
"""Helper method to create a http node configuration"""
node = {
"id": "node",
"data": {
"type": "http-request",
"title": "HTTP Request",
"desc": "",
"variables": [],
"method": "get",
"url": "https://api.github.com/issues",
"authorization": {"type": "no-auth", "config": None},
"headers": "",
"params": "",
"body": {"type": "none", "data": []},
"timeout": {"max_connect_timeout": 0, "max_read_timeout": 0, "max_write_timeout": 0},
"error_strategy": error_strategy,
},
}
if default_value:
node["data"]["default_value"] = default_value
return node

@staticmethod
def get_tool_node(error_strategy: str = "fail-branch", default_value: dict | None = None):
"""Helper method to create a tool node configuration"""
node = {
"id": "node",
"data": {
"title": "a",
"desc": "a",
"provider_id": "maths",
"provider_type": "builtin",
"provider_name": "maths",
"tool_name": "eval_expression",
"tool_label": "eval_expression",
"tool_configurations": {},
"tool_parameters": {
"expression": {
"type": "variable",
"value": ["1", "123", "args1"],
}
},
"type": "tool",
"error_strategy": error_strategy,
},
}
if default_value:
node.node_data.default_value = default_value
return node

@staticmethod
def get_llm_node(error_strategy: str = "fail-branch", default_value: dict | None = None):
"""Helper method to create a llm node configuration"""
node = {
"id": "node",
"data": {
"title": "123",
"type": "llm",
"model": {"provider": "openai", "name": "gpt-3.5-turbo", "mode": "chat", "completion_params": {}},
"prompt_template": [
{"role": "system", "text": "you are a helpful assistant.\ntoday's weather is {{#abc.output#}}."},
{"role": "user", "text": "{{#sys.query#}}"},
],
"memory": None,
"context": {"enabled": False},
"vision": {"enabled": False},
"error_strategy": error_strategy,
},
}
if default_value:
node["data"]["default_value"] = default_value
return node

@staticmethod
def create_test_graph_engine(graph_config: dict, user_inputs: dict | None = None):
"""Helper method to create a graph engine instance for testing"""
# Create graph initialization parameters
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)

variable_pool = VariablePool(
system_variables=SystemVariable(
user_id="aaa",
files=[],
query="clear",
conversation_id="abababa",
),
user_inputs=user_inputs or {"uid": "takato"},
)

graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
node_factory = DifyNodeFactory(init_params, graph_runtime_state)
graph = Graph.init(graph_config=graph_config, node_factory=node_factory)

return GraphEngine(
tenant_id="111",
app_id="222",
workflow_id="333",
graph_config=graph_config,
user_id="444",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
graph_runtime_state=graph_runtime_state,
max_execution_steps=500,
max_execution_time=1200,
command_channel=InMemoryChannel(),
)


DEFAULT_VALUE_EDGE = [
{
"id": "start-source-node-target",
"source": "start",
"target": "node",
"sourceHandle": "source",
},
{
"id": "node-source-answer-target",
"source": "node",
"target": "answer",
"sourceHandle": "source",
},
]

FAIL_BRANCH_EDGES = [
{
"id": "start-source-node-target",
"source": "start",
"target": "node",
"sourceHandle": "source",
},
{
"id": "node-true-success-target",
"source": "node",
"target": "success",
"sourceHandle": "source",
},
{
"id": "node-false-error-target",
"source": "node",
"target": "error",
"sourceHandle": "fail-branch",
},
]


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_code_default_value_continue_on_error():
error_code = """
def main() -> dict:
return {
"result": 1 / 0,
}
"""

graph_config = {
"edges": DEFAULT_VALUE_EDGE,
"nodes": [
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
ContinueOnErrorTestHelper.get_code_node(
error_code, "default-value", [{"key": "result", "type": "number", "value": 132123}]
),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())
assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "132123"} for e in events)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_code_fail_branch_continue_on_error():
error_code = """
def main() -> dict:
return {
"result": 1 / 0,
}
"""

graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "node node run successfully"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "node node run failed"},
"id": "error",
},
ContinueOnErrorTestHelper.get_code_node(error_code),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1
assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "node node run failed"} for e in events
)


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_http_node_default_value_continue_on_error():
"""Test HTTP node with default value error strategy"""
graph_config = {
"edges": DEFAULT_VALUE_EDGE,
"nodes": [
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.response#}}"}, "id": "answer"},
ContinueOnErrorTestHelper.get_http_node(
"default-value", [{"key": "response", "type": "string", "value": "http node got error response"}]
),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "http node got error response"}
for e in events
)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_http_node_fail_branch_continue_on_error():
"""Test HTTP node with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "HTTP request failed"},
"id": "error",
},
ContinueOnErrorTestHelper.get_http_node(),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "HTTP request failed"} for e in events
)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


# def test_tool_node_default_value_continue_on_error():
# """Test tool node with default value error strategy"""
# graph_config = {
# "edges": DEFAULT_VALUE_EDGE,
# "nodes": [
# {"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
# {"data": {"title": "answer", "type": "answer", "answer": "{{#node.result#}}"}, "id": "answer"},
# ContinueOnErrorTestHelper.get_tool_node(
# "default-value", [{"key": "result", "type": "string", "value": "default tool result"}]
# ),
# ],
# }

# graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
# events = list(graph_engine.run())

# assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
# assert any(
# isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "default tool result"} for e in events # noqa: E501
# )
# assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


# def test_tool_node_fail_branch_continue_on_error():
# """Test HTTP node with fail-branch error strategy"""
# graph_config = {
# "edges": FAIL_BRANCH_EDGES,
# "nodes": [
# {"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
# {
# "data": {"title": "success", "type": "answer", "answer": "tool execute successful"},
# "id": "success",
# },
# {
# "data": {"title": "error", "type": "answer", "answer": "tool execute failed"},
# "id": "error",
# },
# ContinueOnErrorTestHelper.get_tool_node(),
# ],
# }

# graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
# events = list(graph_engine.run())

# assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
# assert any(
# isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "tool execute failed"} for e in events # noqa: E501
# )
# assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_llm_node_default_value_continue_on_error():
"""Test LLM node with default value error strategy"""
graph_config = {
"edges": DEFAULT_VALUE_EDGE,
"nodes": [
{"data": {"title": "start", "type": "start", "variables": []}, "id": "start"},
{"data": {"title": "answer", "type": "answer", "answer": "{{#node.answer#}}"}, "id": "answer"},
ContinueOnErrorTestHelper.get_llm_node(
"default-value", [{"key": "answer", "type": "string", "value": "default LLM response"}]
),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "default LLM response"} for e in events
)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_llm_node_fail_branch_continue_on_error():
"""Test LLM node with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "LLM request successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "LLM request failed"},
"id": "error",
},
ContinueOnErrorTestHelper.get_llm_node(),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "LLM request failed"} for e in events
)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_status_code_error_http_node_fail_branch_continue_on_error():
"""Test HTTP node with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "http execute successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "http execute failed"},
"id": "error",
},
ContinueOnErrorTestHelper.get_error_status_code_http_node(),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(
isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {"answer": "http execute failed"} for e in events
)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 1


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_variable_pool_error_type_variable():
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "http execute successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "http execute failed"},
"id": "error",
},
ContinueOnErrorTestHelper.get_error_status_code_http_node(),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
list(graph_engine.run())
error_message = graph_engine.graph_runtime_state.variable_pool.get(["node", "error_message"])
error_type = graph_engine.graph_runtime_state.variable_pool.get(["node", "error_type"])
assert error_message != None
assert error_type.value == "HTTPResponseCodeError"


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_no_node_in_fail_branch_continue_on_error():
"""Test HTTP node with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES[:-1],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, "id": "success"},
ContinueOnErrorTestHelper.get_http_node(),
],
}

graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
events = list(graph_engine.run())

assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {} for e in events)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 0


@pytest.mark.skip(
reason="Continue-on-error functionality is part of Phase 2 enhanced error handling - "
"not fully implemented in MVP of queue-based engine"
)
def test_stream_output_with_fail_branch_continue_on_error():
"""Test stream output with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "LLM request successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}"},
"id": "error",
},
ContinueOnErrorTestHelper.get_llm_node(),
],
}
graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)

def llm_generator(self):
contents = ["hi", "bye", "good morning"]

yield NodeRunStreamChunkEvent(
node_id=self.node_id,
node_type=self._node_type,
selector=[self.node_id, "text"],
chunk=contents[0],
is_final=False,
)

yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
process_data={},
outputs={},
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1,
WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
},
)
)

with patch.object(LLMNode, "_run", new=llm_generator):
events = list(graph_engine.run())
assert sum(isinstance(e, NodeRunStreamChunkEvent) for e in events) == 1
assert all(not isinstance(e, NodeRunFailedEvent | NodeRunExceptionEvent) for e in events)

+ 0
- 116
api/tests/unit_tests/core/workflow/nodes/tool/test_tool_node.py View File

@@ -1,116 +0,0 @@
from collections.abc import Generator

import pytest

from core.app.entities.app_invoke_entities import InvokeFrom
from core.tools.entities.tool_entities import ToolInvokeMessage, ToolProviderType
from core.tools.errors import ToolInvokeError
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.node_events import NodeRunResult, StreamCompletedEvent
from core.workflow.nodes.answer.entities import AnswerStreamGenerateRoute
from core.workflow.nodes.end.entities import EndStreamParam
from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.system_variable import SystemVariable
from models import UserFrom


def _create_tool_node():
data = ToolNodeData(
title="Test Tool",
tool_parameters={},
provider_id="test_tool",
provider_type=ToolProviderType.WORKFLOW,
provider_name="test tool",
tool_name="test tool",
tool_label="test tool",
tool_configurations={},
plugin_unique_identifier=None,
desc="Exception handling test tool",
error_strategy=ErrorStrategy.FAIL_BRANCH,
version="1",
)
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
)
node_config = {
"id": "1",
"data": data.model_dump(),
}
node = ToolNode(
id="1",
config=node_config,
graph_init_params=GraphInitParams(
tenant_id="1",
app_id="1",
workflow_id="1",
graph_config={},
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.SERVICE_API,
call_depth=0,
),
graph=Graph(
root_node_id="1",
answer_stream_generate_routes=AnswerStreamGenerateRoute(
answer_dependencies={},
answer_generate_route={},
),
end_stream_param=EndStreamParam(
end_dependencies={},
end_stream_variable_selector_mapping={},
),
),
graph_runtime_state=GraphRuntimeState(
variable_pool=variable_pool,
start_at=0,
),
)
# Initialize node data
node.init_node_data(node_config["data"])
return node


class MockToolRuntime:
def get_merged_runtime_parameters(self):
pass


def mock_message_stream() -> Generator[ToolInvokeMessage, None, None]:
yield from []
raise ToolInvokeError("oops")


@pytest.mark.skip(
reason="Tool node test uses old Graph constructor incompatible with new queue-based engine - "
"needs rewrite for new architecture"
)
def test_tool_node_on_tool_invoke_error(monkeypatch: pytest.MonkeyPatch):
"""Ensure that ToolNode can handle ToolInvokeError when transforming
messages generated by ToolEngine.generic_invoke.
"""
tool_node = _create_tool_node()

# Need to patch ToolManager and ToolEngine so that we don't
# have to set up a database.
monkeypatch.setattr(
"core.tools.tool_manager.ToolManager.get_workflow_tool_runtime", lambda *args, **kwargs: MockToolRuntime()
)
monkeypatch.setattr(
"core.tools.tool_engine.ToolEngine.generic_invoke",
lambda *args, **kwargs: mock_message_stream(),
)

streams = list(tool_node._run())
assert len(streams) == 1
stream = streams[0]
assert isinstance(stream, StreamCompletedEvent)
result = stream.node_run_result
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert "oops" in result.error
assert "Failed to invoke tool" in result.error
assert result.error_type == "ToolInvokeError"
