You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

variable_loader.py 3.5KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import abc
  2. from collections.abc import Mapping, Sequence
  3. from typing import Any, Protocol
  4. from core.variables import Variable
  5. from core.variables.consts import SELECTORS_LENGTH
  6. from core.workflow.entities.variable_pool import VariablePool
  7. class VariableLoader(Protocol):
  8. """Interface for loading variables based on selectors.
  9. A `VariableLoader` is responsible for retrieving additional variables required during the execution
  10. of a single node, which are not provided as user inputs.
  11. NOTE(QuantumGhost): Typically, all variables loaded by a `VariableLoader` should belong to the same
  12. application and share the same `app_id`. However, this interface does not enforce that constraint,
  13. and the `app_id` parameter is intentionally omitted from `load_variables` to achieve separation of
  14. concern and allow for flexible implementations.
  15. Implementations of `VariableLoader` should almost always have an `app_id` parameter in
  16. their constructor.
  17. TODO(QuantumGhost): this is a temporally workaround. If we can move the creation of node instance into
  18. `WorkflowService.single_step_run`, we may get rid of this interface.
  19. """
  20. @abc.abstractmethod
  21. def load_variables(self, selectors: list[list[str]]) -> list[Variable]:
  22. """Load variables based on the provided selectors. If the selectors are empty,
  23. this method should return an empty list.
  24. The order of the returned variables is not guaranteed. If the caller wants to ensure
  25. a specific order, they should sort the returned list themselves.
  26. :param: selectors: a list of string list, each inner list should have at least two elements:
  27. - the first element is the node ID,
  28. - the second element is the variable name.
  29. :return: a list of Variable objects that match the provided selectors.
  30. """
  31. pass
  32. class _DummyVariableLoader(VariableLoader):
  33. """A dummy implementation of VariableLoader that does not load any variables.
  34. Serves as a placeholder when no variable loading is needed.
  35. """
  36. def load_variables(self, selectors: list[list[str]]) -> list[Variable]:
  37. return []
  38. DUMMY_VARIABLE_LOADER = _DummyVariableLoader()
  39. def load_into_variable_pool(
  40. variable_loader: VariableLoader,
  41. variable_pool: VariablePool,
  42. variable_mapping: Mapping[str, Sequence[str]],
  43. user_inputs: Mapping[str, Any],
  44. ):
  45. # Loading missing variable from draft var here, and set it into
  46. # variable_pool.
  47. variables_to_load: list[list[str]] = []
  48. for key, selector in variable_mapping.items():
  49. # NOTE(QuantumGhost): this logic needs to be in sync with
  50. # `WorkflowEntry.mapping_user_inputs_to_variable_pool`.
  51. node_variable_list = key.split(".")
  52. if len(node_variable_list) < 2:
  53. raise ValueError(f"Invalid variable key: {key}. It should have at least two elements.")
  54. if key in user_inputs:
  55. continue
  56. node_variable_key = ".".join(node_variable_list[1:])
  57. if node_variable_key in user_inputs:
  58. continue
  59. if variable_pool.get(selector) is None:
  60. variables_to_load.append(list(selector))
  61. loaded = variable_loader.load_variables(variables_to_load)
  62. for var in loaded:
  63. assert len(var.selector) >= SELECTORS_LENGTH, f"Invalid variable {var}"
  64. # Add variable directly to the pool
  65. # The variable pool expects 2-element selectors [node_id, variable_name]
  66. variable_pool.add([var.selector[0], var.selector[1]], var)