| ) | ) | ||||
| from core.app.task_pipeline.workflow_cycle_state_manager import WorkflowCycleStateManager | from core.app.task_pipeline.workflow_cycle_state_manager import WorkflowCycleStateManager | ||||
| from core.workflow.entities.node_entities import NodeType | from core.workflow.entities.node_entities import NodeType | ||||
| from core.workflow.workflow_engine_manager import WorkflowEngineManager | |||||
| from extensions.ext_database import db | from extensions.ext_database import db | ||||
| from models.workflow import ( | from models.workflow import ( | ||||
| WorkflowNodeExecution, | WorkflowNodeExecution, | ||||
| db.session.close() | db.session.close() | ||||
| def _handle_iteration_completed(self, event: QueueIterationCompletedEvent) -> WorkflowNodeExecution: | |||||
| def _handle_iteration_completed(self, event: QueueIterationCompletedEvent): | |||||
| if event.node_id not in self._iteration_state.current_iterations: | if event.node_id not in self._iteration_state.current_iterations: | ||||
| return | return | ||||
| ).first() | ).first() | ||||
| workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value | workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value | ||||
| workflow_node_execution.outputs = json.dumps(event.outputs) if event.outputs else None | |||||
| workflow_node_execution.outputs = json.dumps(WorkflowEngineManager.handle_special_values(event.outputs)) if event.outputs else None | |||||
| workflow_node_execution.elapsed_time = time.perf_counter() - current_iteration.started_at | workflow_node_execution.elapsed_time = time.perf_counter() - current_iteration.started_at | ||||
| original_node_execution_metadata = workflow_node_execution.execution_metadata_dict | original_node_execution_metadata = workflow_node_execution.execution_metadata_dict | ||||
| if original_node_execution_metadata: | if original_node_execution_metadata: | ||||
| original_node_execution_metadata['steps_boundary'] = current_iteration.iteration_steps_boundary | original_node_execution_metadata['steps_boundary'] = current_iteration.iteration_steps_boundary |