Ver código fonte

refactor(workflow): Rename NodeRunMetadataKey to WorkflowNodeExecutionMetadataKey (#20457)

Signed-off-by: -LAN- <laipz8200@outlook.com>
tags/1.4.2
-LAN- 5 meses atrás
pai
commit
32e779eef3
Nenhuma conta vinculada ao e-mail do autor do commit

+ 6
- 6
api/core/app/apps/common/workflow_response_converter.py Ver arquivo

) -> Optional[NodeStartStreamResponse]: ) -> Optional[NodeStartStreamResponse]:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None return None
if not workflow_node_execution.workflow_run_id:
if not workflow_node_execution.workflow_execution_id:
return None return None


response = NodeStartStreamResponse( response = NodeStartStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_node_execution.workflow_run_id,
workflow_run_id=workflow_node_execution.workflow_execution_id,
data=NodeStartStreamResponse.Data( data=NodeStartStreamResponse.Data(
id=workflow_node_execution.id, id=workflow_node_execution.id,
node_id=workflow_node_execution.node_id, node_id=workflow_node_execution.node_id,
) -> Optional[NodeFinishStreamResponse]: ) -> Optional[NodeFinishStreamResponse]:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None return None
if not workflow_node_execution.workflow_run_id:
if not workflow_node_execution.workflow_execution_id:
return None return None
if not workflow_node_execution.finished_at: if not workflow_node_execution.finished_at:
return None return None


return NodeFinishStreamResponse( return NodeFinishStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_node_execution.workflow_run_id,
workflow_run_id=workflow_node_execution.workflow_execution_id,
data=NodeFinishStreamResponse.Data( data=NodeFinishStreamResponse.Data(
id=workflow_node_execution.id, id=workflow_node_execution.id,
node_id=workflow_node_execution.node_id, node_id=workflow_node_execution.node_id,
) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]: ) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None return None
if not workflow_node_execution.workflow_run_id:
if not workflow_node_execution.workflow_execution_id:
return None return None
if not workflow_node_execution.finished_at: if not workflow_node_execution.finished_at:
return None return None


return NodeRetryStreamResponse( return NodeRetryStreamResponse(
task_id=task_id, task_id=task_id,
workflow_run_id=workflow_node_execution.workflow_run_id,
workflow_run_id=workflow_node_execution.workflow_execution_id,
data=NodeRetryStreamResponse.Data( data=NodeRetryStreamResponse.Data(
id=workflow_node_execution.id, id=workflow_node_execution.id,
node_id=workflow_node_execution.node_id, node_id=workflow_node_execution.node_id,

+ 2
- 2
api/core/app/apps/workflow_app_runner.py Ver arquivo

QueueWorkflowSucceededEvent, QueueWorkflowSucceededEvent,
) )
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
AgentLogEvent, AgentLogEvent,
GraphEngineEvent, GraphEngineEvent,
inputs: Mapping[str, Any] | None = {} inputs: Mapping[str, Any] | None = {}
process_data: Mapping[str, Any] | None = {} process_data: Mapping[str, Any] | None = {}
outputs: Mapping[str, Any] | None = {} outputs: Mapping[str, Any] | None = {}
execution_metadata: Mapping[NodeRunMetadataKey, Any] | None = {}
execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] | None = {}
if node_run_result: if node_run_result:
inputs = node_run_result.inputs inputs = node_run_result.inputs
process_data = node_run_result.process_data process_data = node_run_result.process_data

+ 7
- 7
api/core/app/entities/queue_entities.py Ver arquivo



from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from core.workflow.entities.node_entities import AgentNodeStrategyInit from core.workflow.entities.node_entities import AgentNodeStrategyInit
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.base import BaseNodeData from core.workflow.nodes.base import BaseNodeData
inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: Optional[str] = None error: Optional[str] = None
"""single iteration duration map""" """single iteration duration map"""
inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: str error: str
retry_index: int # retry index retry_index: int # retry index
inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: str error: str


inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: str error: str


inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: str error: str


inputs: Optional[Mapping[str, Any]] = None inputs: Optional[Mapping[str, Any]] = None
process_data: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None


error: str error: str



+ 3
- 3
api/core/app/entities/task_entities.py Ver arquivo

from core.model_runtime.entities.llm_entities import LLMResult from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import AgentNodeStrategyInit from core.workflow.entities.node_entities import AgentNodeStrategyInit
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus




class TaskState(BaseModel): class TaskState(BaseModel):
status: str status: str
error: Optional[str] = None error: Optional[str] = None
elapsed_time: float elapsed_time: float
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None
created_at: int created_at: int
finished_at: int finished_at: int
files: Optional[Sequence[Mapping[str, Any]]] = [] files: Optional[Sequence[Mapping[str, Any]]] = []
status: str status: str
error: Optional[str] = None error: Optional[str] = None
elapsed_time: float elapsed_time: float
execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None
execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None
created_at: int created_at: int
finished_at: int finished_at: int
files: Optional[Sequence[Mapping[str, Any]]] = [] files: Optional[Sequence[Mapping[str, Any]]] = []

+ 2
- 2
api/core/ops/langsmith_trace/langsmith_trace.py Ver arquivo

) )
from core.ops.utils import filter_none_values, generate_dotted_order from core.ops.utils import filter_none_values, generate_dotted_order
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from extensions.ext_database import db from extensions.ext_database import db
from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)


execution_metadata = node_execution.metadata if node_execution.metadata else {} execution_metadata = node_execution.metadata if node_execution.metadata else {}
node_total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0
node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0
metadata = {str(key): value for key, value in execution_metadata.items()} metadata = {str(key): value for key, value in execution_metadata.items()}
metadata.update( metadata.update(
{ {

+ 2
- 2
api/core/ops/opik_trace/opik_trace.py Ver arquivo

WorkflowTraceInfo, WorkflowTraceInfo,
) )
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from extensions.ext_database import db from extensions.ext_database import db
from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
parent_span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id parent_span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id


if not total_tokens: if not total_tokens:
total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0
total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0


span_data = { span_data = {
"trace_id": opik_trace_id, "trace_id": opik_trace_id,

+ 2
- 2
api/core/ops/weave_trace/weave_trace.py Ver arquivo

) )
from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from extensions.ext_database import db from extensions.ext_database import db
from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom
finished_at = created_at + timedelta(seconds=elapsed_time) finished_at = created_at + timedelta(seconds=elapsed_time)


execution_metadata = node_execution.metadata if node_execution.metadata else {} execution_metadata = node_execution.metadata if node_execution.metadata else {}
node_total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0
node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0
attributes = {str(k): v for k, v in execution_metadata.items()} attributes = {str(k): v for k, v in execution_metadata.items()}
attributes.update( attributes.update(
{ {

+ 4
- 4
api/core/repositories/sqlalchemy_workflow_node_execution_repository.py Ver arquivo

from core.model_runtime.utils.encoders import jsonable_encoder from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.workflow_node_execution import ( from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeRunMetadataKey,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
inputs = db_model.inputs_dict inputs = db_model.inputs_dict
process_data = db_model.process_data_dict process_data = db_model.process_data_dict
outputs = db_model.outputs_dict outputs = db_model.outputs_dict
metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()}
metadata = {WorkflowNodeExecutionMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()}


# Convert status to domain enum # Convert status to domain enum
status = WorkflowNodeExecutionStatus(db_model.status) status = WorkflowNodeExecutionStatus(db_model.status)
id=db_model.id, id=db_model.id,
node_execution_id=db_model.node_execution_id, node_execution_id=db_model.node_execution_id,
workflow_id=db_model.workflow_id, workflow_id=db_model.workflow_id,
workflow_run_id=db_model.workflow_run_id,
workflow_execution_id=db_model.workflow_run_id,
index=db_model.index, index=db_model.index,
predecessor_node_id=db_model.predecessor_node_id, predecessor_node_id=db_model.predecessor_node_id,
node_id=db_model.node_id, node_id=db_model.node_id,
db_model.app_id = self._app_id db_model.app_id = self._app_id
db_model.workflow_id = domain_model.workflow_id db_model.workflow_id = domain_model.workflow_id
db_model.triggered_from = self._triggered_from db_model.triggered_from = self._triggered_from
db_model.workflow_run_id = domain_model.workflow_run_id
db_model.workflow_run_id = domain_model.workflow_execution_id
db_model.index = domain_model.index db_model.index = domain_model.index
db_model.predecessor_node_id = domain_model.predecessor_node_id db_model.predecessor_node_id = domain_model.predecessor_node_id
db_model.node_execution_id = domain_model.node_execution_id db_model.node_execution_id = domain_model.node_execution_id

+ 2
- 2
api/core/workflow/entities/node_entities.py Ver arquivo

from pydantic import BaseModel from pydantic import BaseModel


from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.entities.llm_entities import LLMUsage
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus




class NodeRunResult(BaseModel): class NodeRunResult(BaseModel):
inputs: Optional[Mapping[str, Any]] = None # node inputs inputs: Optional[Mapping[str, Any]] = None # node inputs
process_data: Optional[Mapping[str, Any]] = None # process data process_data: Optional[Mapping[str, Any]] = None # process data
outputs: Optional[Mapping[str, Any]] = None # node outputs outputs: Optional[Mapping[str, Any]] = None # node outputs
metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None # node metadata
metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None # node metadata
llm_usage: Optional[LLMUsage] = None # llm usage llm_usage: Optional[LLMUsage] = None # llm usage


edge_source_handle: Optional[str] = None # source handle id of node with multiple branches edge_source_handle: Optional[str] = None # source handle id of node with multiple branches

+ 4
- 4
api/core/workflow/entities/workflow_node_execution.py Ver arquivo

from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType




class NodeRunMetadataKey(StrEnum):
class WorkflowNodeExecutionMetadataKey(StrEnum):
""" """
Node Run Metadata Key. Node Run Metadata Key.
""" """
id: str # Unique identifier for this execution record id: str # Unique identifier for this execution record
node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing
workflow_id: str # ID of the workflow this node belongs to workflow_id: str # ID of the workflow this node belongs to
workflow_run_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)
workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)


# Execution positioning and flow # Execution positioning and flow
index: int # Sequence number for ordering in trace visualization index: int # Sequence number for ordering in trace visualization
elapsed_time: float = Field(default=0.0) # Time taken for execution in seconds elapsed_time: float = Field(default=0.0) # Time taken for execution in seconds


# Additional metadata # Additional metadata
metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None # Execution metadata (tokens, cost, etc.)
metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None # Execution metadata (tokens, cost, etc.)


# Timing information # Timing information
created_at: datetime # When execution started created_at: datetime # When execution started
inputs: Optional[Mapping[str, Any]] = None, inputs: Optional[Mapping[str, Any]] = None,
process_data: Optional[Mapping[str, Any]] = None, process_data: Optional[Mapping[str, Any]] = None,
outputs: Optional[Mapping[str, Any]] = None, outputs: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None,
metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None,
) -> None: ) -> None:
""" """
Update the model from mappings. Update the model from mappings.

+ 15
- 9
api/core/workflow/graph_engine/graph_engine.py Ver arquivo

from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunResult from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunResult
from core.workflow.entities.variable_pool import VariablePool, VariableValue from core.workflow.entities.variable_pool import VariablePool, VariableValue
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph_engine.condition_handlers.condition_manager import ConditionManager from core.workflow.graph_engine.condition_handlers.condition_manager import ConditionManager
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
BaseAgentEvent, BaseAgentEvent,
and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH
): ):
run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS):
if run_result.metadata and run_result.metadata.get(
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS
):
# plus state total_tokens # plus state total_tokens
self.graph_runtime_state.total_tokens += int( self.graph_runtime_state.total_tokens += int(
run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) # type: ignore[arg-type]
run_result.metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) # type: ignore[arg-type]
) )


if run_result.llm_usage: if run_result.llm_usage:


if parallel_id and parallel_start_node_id: if parallel_id and parallel_start_node_id:
metadata_dict = dict(run_result.metadata) metadata_dict = dict(run_result.metadata)
metadata_dict[NodeRunMetadataKey.PARALLEL_ID] = parallel_id
metadata_dict[NodeRunMetadataKey.PARALLEL_START_NODE_ID] = parallel_start_node_id
metadata_dict[WorkflowNodeExecutionMetadataKey.PARALLEL_ID] = parallel_id
metadata_dict[WorkflowNodeExecutionMetadataKey.PARALLEL_START_NODE_ID] = (
parallel_start_node_id
)
if parent_parallel_id and parent_parallel_start_node_id: if parent_parallel_id and parent_parallel_start_node_id:
metadata_dict[NodeRunMetadataKey.PARENT_PARALLEL_ID] = parent_parallel_id
metadata_dict[NodeRunMetadataKey.PARENT_PARALLEL_START_NODE_ID] = (
parent_parallel_start_node_id
metadata_dict[WorkflowNodeExecutionMetadataKey.PARENT_PARALLEL_ID] = (
parent_parallel_id
) )
metadata_dict[
WorkflowNodeExecutionMetadataKey.PARENT_PARALLEL_START_NODE_ID
] = parent_parallel_start_node_id
run_result.metadata = metadata_dict run_result.metadata = metadata_dict


yield NodeRunSucceededEvent( yield NodeRunSucceededEvent(
"error": error_result.error, "error": error_result.error,
"inputs": error_result.inputs, "inputs": error_result.inputs,
"metadata": { "metadata": {
NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy,
WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy,
}, },
} }



+ 7
- 7
api/core/workflow/nodes/iteration/iteration_node.py Ver arquivo

NodeRunResult, NodeRunResult,
) )
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
BaseGraphEvent, BaseGraphEvent,
BaseNodeEvent, BaseNodeEvent,
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={"output": outputs}, outputs={"output": outputs},
metadata={ metadata={
NodeRunMetadataKey.ITERATION_DURATION_MAP: iter_run_map,
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: iter_run_map,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
}, },
) )
) )
event.parallel_mode_run_id = parallel_mode_run_id event.parallel_mode_run_id = parallel_mode_run_id


iter_metadata = { iter_metadata = {
NodeRunMetadataKey.ITERATION_ID: self.node_id,
NodeRunMetadataKey.ITERATION_INDEX: iter_run_index,
WorkflowNodeExecutionMetadataKey.ITERATION_ID: self.node_id,
WorkflowNodeExecutionMetadataKey.ITERATION_INDEX: iter_run_index,
} }
if parallel_mode_run_id: if parallel_mode_run_id:
# for parallel, the specific branch ID is more important than the sequential index # for parallel, the specific branch ID is more important than the sequential index
iter_metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id
iter_metadata[WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id


if event.route_node_state.node_run_result: if event.route_node_state.node_run_result:
current_metadata = event.route_node_state.node_run_result.metadata or {} current_metadata = event.route_node_state.node_run_result.metadata or {}
if NodeRunMetadataKey.ITERATION_ID not in current_metadata:
if WorkflowNodeExecutionMetadataKey.ITERATION_ID not in current_metadata:
event.route_node_state.node_run_result.metadata = {**current_metadata, **iter_metadata} event.route_node_state.node_run_result.metadata = {**current_metadata, **iter_metadata}


return event return event

+ 4
- 4
api/core/workflow/nodes/llm/node.py Ver arquivo

from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_entities import VariableSelector from core.workflow.entities.variable_entities import VariableSelector
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.graph_engine.entities.event import InNodeEvent
from core.workflow.nodes.base import BaseNode from core.workflow.nodes.base import BaseNode
process_data=process_data, process_data=process_data,
outputs=outputs, outputs=outputs,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens,
NodeRunMetadataKey.TOTAL_PRICE: usage.total_price,
NodeRunMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
}, },
llm_usage=usage, llm_usage=usage,
) )

+ 27
- 19
api/core/workflow/nodes/loop/loop_node.py Ver arquivo

StringSegment, StringSegment,
) )
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
BaseGraphEvent, BaseGraphEvent,
BaseNodeEvent, BaseNodeEvent,
outputs=self.node_data.outputs, outputs=self.node_data.outputs,
steps=loop_count, steps=loop_count,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
"completed_reason": "loop_break" if check_break_result else "loop_completed", "completed_reason": "loop_break" if check_break_result else "loop_completed",
NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
}, },
) )


run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
}, },
outputs=self.node_data.outputs, outputs=self.node_data.outputs,
inputs=inputs, inputs=inputs,
metadata={ metadata={
"total_tokens": graph_engine.graph_runtime_state.total_tokens, "total_tokens": graph_engine.graph_runtime_state.total_tokens,
"completed_reason": "error", "completed_reason": "error",
NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
}, },
error=str(e), error=str(e),
) )
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
error=str(e), error=str(e),
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map,
WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map,
}, },
) )
) )
inputs=inputs, inputs=inputs,
steps=current_index, steps=current_index,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: (
graph_engine.graph_runtime_state.total_tokens
),
"completed_reason": "error", "completed_reason": "error",
}, },
error=event.error, error=event.error,
run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
error=event.error, error=event.error,
metadata={NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens},
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: (
graph_engine.graph_runtime_state.total_tokens
)
},
) )
) )
return {"check_break_result": True} return {"check_break_result": True}
inputs=inputs, inputs=inputs,
steps=current_index, steps=current_index,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens,
"completed_reason": "error", "completed_reason": "error",
}, },
error=event.error, error=event.error,
run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
error=event.error, error=event.error,
metadata={NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens},
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens
},
) )
) )
return {"check_break_result": True} return {"check_break_result": True}
metadata = event.route_node_state.node_run_result.metadata metadata = event.route_node_state.node_run_result.metadata
if not metadata: if not metadata:
metadata = {} metadata = {}
if NodeRunMetadataKey.LOOP_ID not in metadata:
if WorkflowNodeExecutionMetadataKey.LOOP_ID not in metadata:
metadata = { metadata = {
**metadata, **metadata,
NodeRunMetadataKey.LOOP_ID: self.node_id,
NodeRunMetadataKey.LOOP_INDEX: iter_run_index,
WorkflowNodeExecutionMetadataKey.LOOP_ID: self.node_id,
WorkflowNodeExecutionMetadataKey.LOOP_INDEX: iter_run_index,
} }
event.route_node_state.node_run_result.metadata = metadata event.route_node_state.node_run_result.metadata = metadata
return event return event

+ 4
- 4
api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py Ver arquivo

from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.llm import LLMNode, ModelConfig from core.workflow.nodes.llm import LLMNode, ModelConfig
from core.workflow.utils import variable_template_parser from core.workflow.utils import variable_template_parser
process_data=process_data, process_data=process_data,
outputs={"__is_success": 1 if not error else 0, "__reason": error, **result}, outputs={"__is_success": 1 if not error else 0, "__reason": error, **result},
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens,
NodeRunMetadataKey.TOTAL_PRICE: usage.total_price,
NodeRunMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
}, },
llm_usage=usage, llm_usage=usage,
) )

+ 7
- 7
api/core/workflow/nodes/question_classifier/question_classifier_node.py Ver arquivo

from core.prompt.simple_prompt_transform import ModelMode from core.prompt.simple_prompt_transform import ModelMode
from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.prompt.utils.prompt_message_util import PromptMessageUtil
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.event import ModelInvokeCompletedEvent from core.workflow.nodes.event import ModelInvokeCompletedEvent
from core.workflow.nodes.llm import ( from core.workflow.nodes.llm import (
outputs=outputs, outputs=outputs,
edge_source_handle=category_id, edge_source_handle=category_id,
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens,
NodeRunMetadataKey.TOTAL_PRICE: usage.total_price,
NodeRunMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
}, },
llm_usage=usage, llm_usage=usage,
) )
inputs=variables, inputs=variables,
error=str(e), error=str(e),
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens,
NodeRunMetadataKey.TOTAL_PRICE: usage.total_price,
NodeRunMetadataKey.CURRENCY: usage.currency,
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price,
WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency,
}, },
llm_usage=usage, llm_usage=usage,
) )

+ 8
- 8
api/core/workflow/nodes/tool/tool_node.py Ver arquivo

from core.variables.variables import ArrayAnyVariable from core.variables.variables import ArrayAnyVariable
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.event import AgentLogEvent from core.workflow.graph_engine.entities.event import AgentLogEvent
from core.workflow.nodes.base import BaseNode from core.workflow.nodes.base import BaseNode
run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
inputs={}, inputs={},
metadata={NodeRunMetadataKey.TOOL_INFO: tool_info},
metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info},
error=f"Failed to get tool runtime: {str(e)}", error=f"Failed to get tool runtime: {str(e)}",
error_type=type(e).__name__, error_type=type(e).__name__,
) )
run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
inputs=parameters_for_log, inputs=parameters_for_log,
metadata={NodeRunMetadataKey.TOOL_INFO: tool_info},
metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info},
error=f"Failed to invoke tool: {str(e)}", error=f"Failed to invoke tool: {str(e)}",
error_type=type(e).__name__, error_type=type(e).__name__,
) )
run_result=NodeRunResult( run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED, status=WorkflowNodeExecutionStatus.FAILED,
inputs=parameters_for_log, inputs=parameters_for_log,
metadata={NodeRunMetadataKey.TOOL_INFO: tool_info},
metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info},
error=f"Failed to transform tool message: {str(e)}", error=f"Failed to transform tool message: {str(e)}",
error_type=type(e).__name__, error_type=type(e).__name__,
) )
json: list[dict] = [] json: list[dict] = []


agent_logs: list[AgentLogEvent] = [] agent_logs: list[AgentLogEvent] = []
agent_execution_metadata: Mapping[NodeRunMetadataKey, Any] = {}
agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {}


variables: dict[str, Any] = {} variables: dict[str, Any] = {}


agent_execution_metadata = { agent_execution_metadata = {
key: value key: value
for key, value in msg_metadata.items() for key, value in msg_metadata.items()
if key in NodeRunMetadataKey.__members__.values()
if key in WorkflowNodeExecutionMetadataKey.__members__.values()
} }
json.append(message.message.json_object) json.append(message.message.json_object)
elif message.type == ToolInvokeMessage.MessageType.LINK: elif message.type == ToolInvokeMessage.MessageType.LINK:
outputs={"text": text, "files": files, "json": json, **variables}, outputs={"text": text, "files": files, "json": json, **variables},
metadata={ metadata={
**agent_execution_metadata, **agent_execution_metadata,
NodeRunMetadataKey.TOOL_INFO: tool_info,
NodeRunMetadataKey.AGENT_LOG: agent_logs,
WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info,
WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs,
}, },
inputs=parameters_for_log, inputs=parameters_for_log,
) )

+ 10
- 10
api/core/workflow/workflow_cycle_manager.py Ver arquivo

from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.entities.workflow_node_execution import ( from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeRunMetadataKey,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
# Create a domain model # Create a domain model
created_at = datetime.now(UTC).replace(tzinfo=None) created_at = datetime.now(UTC).replace(tzinfo=None)
metadata = { metadata = {
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.LOOP_ID: event.in_loop_id,
WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
WorkflowNodeExecutionMetadataKey.ITERATION_ID: event.in_iteration_id,
WorkflowNodeExecutionMetadataKey.LOOP_ID: event.in_loop_id,
} }


domain_execution = NodeExecution( domain_execution = NodeExecution(
id=str(uuid4()), id=str(uuid4()),
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
workflow_run_id=workflow_execution.id_,
workflow_execution_id=workflow_execution.id_,
predecessor_node_id=event.predecessor_node_id, predecessor_node_id=event.predecessor_node_id,
index=event.node_run_index, index=event.node_run_index,
node_execution_id=event.node_execution_id, node_execution_id=event.node_execution_id,


# Convert metadata keys to strings # Convert metadata keys to strings
origin_metadata = { origin_metadata = {
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.LOOP_ID: event.in_loop_id,
WorkflowNodeExecutionMetadataKey.ITERATION_ID: event.in_iteration_id,
WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
WorkflowNodeExecutionMetadataKey.LOOP_ID: event.in_loop_id,
} }


# Convert execution metadata keys to strings # Convert execution metadata keys to strings
execution_metadata_dict: dict[NodeRunMetadataKey, str | None] = {}
execution_metadata_dict: dict[WorkflowNodeExecutionMetadataKey, str | None] = {}
if event.execution_metadata: if event.execution_metadata:
for key, value in event.execution_metadata.items(): for key, value in event.execution_metadata.items():
execution_metadata_dict[key] = value execution_metadata_dict[key] = value
domain_execution = NodeExecution( domain_execution = NodeExecution(
id=str(uuid4()), id=str(uuid4()),
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
workflow_run_id=workflow_execution.id_,
workflow_execution_id=workflow_execution.id_,
predecessor_node_id=event.predecessor_node_id, predecessor_node_id=event.predecessor_node_id,
node_execution_id=event.node_execution_id, node_execution_id=event.node_execution_id,
node_id=event.node_id, node_id=event.node_id,

+ 10
- 10
api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py Ver arquivo

from flask import Flask from flask import Flask


from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
process_data={}, process_data={},
outputs={}, outputs={},
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1,
WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
}, },
) )
) )
process_data={}, process_data={},
outputs={"class_name": "financial", "class_id": "1"}, outputs={"class_name": "financial", "class_id": "1"},
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1,
WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
}, },
edge_source_handle="1", edge_source_handle="1",
) )
process_data={}, process_data={},
outputs={"result": "dify 123"}, outputs={"result": "dify 123"},
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1,
WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
}, },
) )
) )

+ 4
- 4
api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py Ver arquivo

from unittest.mock import patch from unittest.mock import patch


from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.event import (
process_data={}, process_data={},
outputs={}, outputs={},
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1,
WorkflowNodeExecutionMetadataKey.CURRENCY: "USD",
}, },
) )
) )

+ 6
- 6
api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py Ver arquivo

from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType
from core.workflow.entities.workflow_node_execution import ( from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeRunMetadataKey,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey


# Verify the result # Verify the result
assert result.workflow_id == workflow_execution.workflow_id assert result.workflow_id == workflow_execution.workflow_id
assert result.workflow_run_id == workflow_execution.id_
assert result.workflow_execution_id == workflow_execution.id_
assert result.node_execution_id == event.node_execution_id assert result.node_execution_id == event.node_execution_id
assert result.node_id == event.node_id assert result.node_id == event.node_id
assert result.node_type == event.node_type assert result.node_type == event.node_type
event.inputs = {"input": "test input"} event.inputs = {"input": "test input"}
event.process_data = {"process": "test process"} event.process_data = {"process": "test process"}
event.outputs = {"output": "test output"} event.outputs = {"output": "test output"}
event.execution_metadata = {NodeRunMetadataKey.TOTAL_TOKENS: 100}
event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100}
event.start_at = datetime.now(UTC).replace(tzinfo=None) event.start_at = datetime.now(UTC).replace(tzinfo=None)


# Create a real node execution # Create a real node execution
id="test-node-execution-record-id", id="test-node-execution-record-id",
node_execution_id="test-node-execution-id", node_execution_id="test-node-execution-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_run_id="test-workflow-run-id",
workflow_execution_id="test-workflow-run-id",
index=1, index=1,
node_id="test-node-id", node_id="test-node-id",
node_type=NodeType.LLM, node_type=NodeType.LLM,
event.inputs = {"input": "test input"} event.inputs = {"input": "test input"}
event.process_data = {"process": "test process"} event.process_data = {"process": "test process"}
event.outputs = {"output": "test output"} event.outputs = {"output": "test output"}
event.execution_metadata = {NodeRunMetadataKey.TOTAL_TOKENS: 100}
event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100}
event.start_at = datetime.now(UTC).replace(tzinfo=None) event.start_at = datetime.now(UTC).replace(tzinfo=None)
event.error = "Test error message" event.error = "Test error message"


id="test-node-execution-record-id", id="test-node-execution-record-id",
node_execution_id="test-node-execution-id", node_execution_id="test-node-execution-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_run_id="test-workflow-run-id",
workflow_execution_id="test-workflow-run-id",
index=1, index=1,
node_id="test-node-id", node_id="test-node-id",
node_type=NodeType.LLM, node_type=NodeType.LLM,

+ 9
- 6
api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py Ver arquivo

from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.entities.workflow_node_execution import ( from core.workflow.entities.workflow_node_execution import (
NodeExecution, NodeExecution,
NodeRunMetadataKey,
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus, WorkflowNodeExecutionStatus,
) )
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
id="test-id", id="test-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
node_execution_id="test-node-execution-id", node_execution_id="test-node-execution-id",
workflow_run_id="test-workflow-run-id",
workflow_execution_id="test-workflow-run-id",
index=1, index=1,
predecessor_node_id="test-predecessor-id", predecessor_node_id="test-predecessor-id",
node_id="test-node-id", node_id="test-node-id",
status=WorkflowNodeExecutionStatus.RUNNING, status=WorkflowNodeExecutionStatus.RUNNING,
error=None, error=None,
elapsed_time=1.5, elapsed_time=1.5,
metadata={NodeRunMetadataKey.TOTAL_TOKENS: 100, NodeRunMetadataKey.TOTAL_PRICE: Decimal("0.0")},
metadata={
WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100,
WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: Decimal("0.0"),
},
created_at=datetime.now(), created_at=datetime.now(),
finished_at=None, finished_at=None,
) )
assert db_model.app_id == repository._app_id assert db_model.app_id == repository._app_id
assert db_model.workflow_id == domain_model.workflow_id assert db_model.workflow_id == domain_model.workflow_id
assert db_model.triggered_from == repository._triggered_from assert db_model.triggered_from == repository._triggered_from
assert db_model.workflow_run_id == domain_model.workflow_run_id
assert db_model.workflow_run_id == domain_model.workflow_execution_id
assert db_model.index == domain_model.index assert db_model.index == domain_model.index
assert db_model.predecessor_node_id == domain_model.predecessor_node_id assert db_model.predecessor_node_id == domain_model.predecessor_node_id
assert db_model.node_execution_id == domain_model.node_execution_id assert db_model.node_execution_id == domain_model.node_execution_id
inputs_dict = {"input_key": "input_value"} inputs_dict = {"input_key": "input_value"}
process_data_dict = {"process_key": "process_value"} process_data_dict = {"process_key": "process_value"}
outputs_dict = {"output_key": "output_value"} outputs_dict = {"output_key": "output_value"}
metadata_dict = {str(NodeRunMetadataKey.TOTAL_TOKENS): 100}
metadata_dict = {str(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS): 100}


# Create a DB model using our custom subclass # Create a DB model using our custom subclass
db_model = WorkflowNodeExecution() db_model = WorkflowNodeExecution()
assert isinstance(domain_model, NodeExecution) assert isinstance(domain_model, NodeExecution)
assert domain_model.id == db_model.id assert domain_model.id == db_model.id
assert domain_model.workflow_id == db_model.workflow_id assert domain_model.workflow_id == db_model.workflow_id
assert domain_model.workflow_run_id == db_model.workflow_run_id
assert domain_model.workflow_execution_id == db_model.workflow_run_id
assert domain_model.index == db_model.index assert domain_model.index == db_model.index
assert domain_model.predecessor_node_id == db_model.predecessor_node_id assert domain_model.predecessor_node_id == db_model.predecessor_node_id
assert domain_model.node_execution_id == db_model.node_execution_id assert domain_model.node_execution_id == db_model.node_execution_id

Carregando…
Cancelar
Salvar