Currently, `WorkflowNodeExecution.execution_metadata_dict` returns `None` when metadata is absent in the database. This requires all callers to perform `None` checks when processing metadata, leading to more complex caller-side logic. This pull request updates the `execution_metadata_dict` method to return an empty dictionary instead of `None` when metadata is absent. This change would simplify the caller logic, as it removes the need for explicit `None` checks and provides a more consistent data structure to work with.tags/1.4.1
| inputs = db_model.inputs_dict | inputs = db_model.inputs_dict | ||||
| process_data = db_model.process_data_dict | process_data = db_model.process_data_dict | ||||
| outputs = db_model.outputs_dict | outputs = db_model.outputs_dict | ||||
| if db_model.execution_metadata_dict: | |||||
| metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()} | |||||
| else: | |||||
| metadata = {} | |||||
| metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()} | |||||
| # Convert status to domain enum | # Convert status to domain enum | ||||
| status = NodeExecutionStatus(db_model.status) | status = NodeExecutionStatus(db_model.status) |
| return json.loads(self.process_data) if self.process_data else None | return json.loads(self.process_data) if self.process_data else None | ||||
| @property | @property | ||||
| def execution_metadata_dict(self) -> dict[str, Any] | None: | |||||
| return json.loads(self.execution_metadata) if self.execution_metadata else None | |||||
| def execution_metadata_dict(self) -> dict[str, Any]: | |||||
| # When the metadata is unset, we return an empty dictionary instead of `None`. | |||||
| # This approach streamlines the logic for the caller, making it easier to handle | |||||
| # cases where metadata is absent. | |||||
| return json.loads(self.execution_metadata) if self.execution_metadata else {} | |||||
| @property | @property | ||||
| def extras(self): | def extras(self): |
| import json | |||||
| from unittest import mock | from unittest import mock | ||||
| from uuid import uuid4 | from uuid import uuid4 | ||||
| import contexts | import contexts | ||||
| from constants import HIDDEN_VALUE | from constants import HIDDEN_VALUE | ||||
| from core.variables import FloatVariable, IntegerVariable, SecretVariable, StringVariable | from core.variables import FloatVariable, IntegerVariable, SecretVariable, StringVariable | ||||
| from models.workflow import Workflow | |||||
| from models.workflow import Workflow, WorkflowNodeExecution | |||||
| def test_environment_variables(): | def test_environment_variables(): | ||||
| workflow_dict = workflow.to_dict(include_secret=True) | workflow_dict = workflow.to_dict(include_secret=True) | ||||
| assert workflow_dict["environment_variables"][0]["value"] == "secret" | assert workflow_dict["environment_variables"][0]["value"] == "secret" | ||||
| assert workflow_dict["environment_variables"][1]["value"] == "text" | assert workflow_dict["environment_variables"][1]["value"] == "text" | ||||
| class TestWorkflowNodeExecution: | |||||
| def test_execution_metadata_dict(self): | |||||
| node_exec = WorkflowNodeExecution() | |||||
| node_exec.execution_metadata = None | |||||
| assert node_exec.execution_metadata_dict == {} | |||||
| original = {"a": 1, "b": ["2"]} | |||||
| node_exec.execution_metadata = json.dumps(original) | |||||
| assert node_exec.execution_metadata_dict == original |