You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.


  1. import json
  2. import logging
  3. from collections.abc import Mapping, Sequence
  4. from datetime import UTC, datetime
  5. from enum import Enum, StrEnum
  6. from typing import TYPE_CHECKING, Any, Optional, Union
  7. from uuid import uuid4
  8. from core.variables import utils as variable_utils
  9. from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
  10. from factories.variable_factory import build_segment
  11. if TYPE_CHECKING:
  12. from models.model import AppMode
  13. import sqlalchemy as sa
  14. from sqlalchemy import UniqueConstraint, func
  15. from sqlalchemy.orm import Mapped, mapped_column
  16. import contexts
  17. from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE
  18. from core.helper import encrypter
  19. from core.variables import SecretVariable, Segment, SegmentType, Variable
  20. from factories import variable_factory
  21. from libs import helper
  22. from .account import Account
  23. from .base import Base
  24. from .engine import db
  25. from .enums import CreatorUserRole, DraftVariableType
  26. from .types import EnumText, StringUUID
  27. _logger = logging.getLogger(__name__)
  28. if TYPE_CHECKING:
  29. from models.model import AppMode
  30. class WorkflowType(Enum):
  31. """
  32. Workflow Type Enum
  33. """
  34. WORKFLOW = "workflow"
  35. CHAT = "chat"
  36. @classmethod
  37. def value_of(cls, value: str) -> "WorkflowType":
  38. """
  39. Get value of given mode.
  40. :param value: mode value
  41. :return: mode
  42. """
  43. for mode in cls:
  44. if mode.value == value:
  45. return mode
  46. raise ValueError(f"invalid workflow type value {value}")
  47. @classmethod
  48. def from_app_mode(cls, app_mode: Union[str, "AppMode"]) -> "WorkflowType":
  49. """
  50. Get workflow type from app mode.
  51. :param app_mode: app mode
  52. :return: workflow type
  53. """
  54. from models.model import AppMode
  55. app_mode = app_mode if isinstance(app_mode, AppMode) else AppMode.value_of(app_mode)
  56. return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
  57. class Workflow(Base):
  58. """
  59. Workflow, for `Workflow App` and `Chat App workflow mode`.
  60. Attributes:
  61. - id (uuid) Workflow ID, pk
  62. - tenant_id (uuid) Workspace ID
  63. - app_id (uuid) App ID
  64. - type (string) Workflow type
  65. `workflow` for `Workflow App`
  66. `chat` for `Chat App workflow mode`
  67. - version (string) Version
  68. `draft` for draft version (only one for each app), other for version number (redundant)
  69. - graph (text) Workflow canvas configuration (JSON)
  70. The entire canvas configuration JSON, including Node, Edge, and other configurations
  71. - nodes (array[object]) Node list, see Node Schema
  72. - edges (array[object]) Edge list, see Edge Schema
  73. - created_by (uuid) Creator ID
  74. - created_at (timestamp) Creation time
  75. - updated_by (uuid) `optional` Last updater ID
  76. - updated_at (timestamp) `optional` Last update time
  77. """
  78. __tablename__ = "workflows"
  79. __table_args__ = (
  80. db.PrimaryKeyConstraint("id", name="workflow_pkey"),
  81. db.Index("workflow_version_idx", "tenant_id", "app_id", "version"),
  82. )
  83. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  84. tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  85. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  86. type: Mapped[str] = mapped_column(db.String(255), nullable=False)
  87. version: Mapped[str] = mapped_column(db.String(255), nullable=False)
  88. marked_name: Mapped[str] = mapped_column(default="", server_default="")
  89. marked_comment: Mapped[str] = mapped_column(default="", server_default="")
  90. graph: Mapped[str] = mapped_column(sa.Text)
  91. _features: Mapped[str] = mapped_column("features", sa.TEXT)
  92. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  93. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  94. updated_by: Mapped[Optional[str]] = mapped_column(StringUUID)
  95. updated_at: Mapped[datetime] = mapped_column(
  96. db.DateTime,
  97. nullable=False,
  98. default=datetime.now(UTC).replace(tzinfo=None),
  99. server_onupdate=func.current_timestamp(),
  100. )
  101. _environment_variables: Mapped[str] = mapped_column(
  102. "environment_variables", db.Text, nullable=False, server_default="{}"
  103. )
  104. _conversation_variables: Mapped[str] = mapped_column(
  105. "conversation_variables", db.Text, nullable=False, server_default="{}"
  106. )
  107. @classmethod
  108. def new(
  109. cls,
  110. *,
  111. tenant_id: str,
  112. app_id: str,
  113. type: str,
  114. version: str,
  115. graph: str,
  116. features: str,
  117. created_by: str,
  118. environment_variables: Sequence[Variable],
  119. conversation_variables: Sequence[Variable],
  120. marked_name: str = "",
  121. marked_comment: str = "",
  122. ) -> "Workflow":
  123. workflow = Workflow()
  124. workflow.id = str(uuid4())
  125. workflow.tenant_id = tenant_id
  126. workflow.app_id = app_id
  127. workflow.type = type
  128. workflow.version = version
  129. workflow.graph = graph
  130. workflow.features = features
  131. workflow.created_by = created_by
  132. workflow.environment_variables = environment_variables or []
  133. workflow.conversation_variables = conversation_variables or []
  134. workflow.marked_name = marked_name
  135. workflow.marked_comment = marked_comment
  136. workflow.created_at = datetime.now(UTC).replace(tzinfo=None)
  137. workflow.updated_at = workflow.created_at
  138. return workflow
  139. @property
  140. def created_by_account(self):
  141. return db.session.get(Account, self.created_by)
  142. @property
  143. def updated_by_account(self):
  144. return db.session.get(Account, self.updated_by) if self.updated_by else None
  145. @property
  146. def graph_dict(self) -> Mapping[str, Any]:
  147. return json.loads(self.graph) if self.graph else {}
  148. @property
  149. def features(self) -> str:
  150. """
  151. Convert old features structure to new features structure.
  152. """
  153. if not self._features:
  154. return self._features
  155. features = json.loads(self._features)
  156. if features.get("file_upload", {}).get("image", {}).get("enabled", False):
  157. image_enabled = True
  158. image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
  159. image_transfer_methods = features["file_upload"]["image"].get(
  160. "transfer_methods", ["remote_url", "local_file"]
  161. )
  162. features["file_upload"]["enabled"] = image_enabled
  163. features["file_upload"]["number_limits"] = image_number_limits
  164. features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
  165. features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
  166. features["file_upload"]["allowed_file_extensions"] = []
  167. del features["file_upload"]["image"]
  168. self._features = json.dumps(features)
  169. return self._features
  170. @features.setter
  171. def features(self, value: str) -> None:
  172. self._features = value
  173. @property
  174. def features_dict(self) -> dict[str, Any]:
  175. return json.loads(self.features) if self.features else {}
  176. def user_input_form(self, to_old_structure: bool = False) -> list:
  177. # get start node from graph
  178. if not self.graph:
  179. return []
  180. graph_dict = self.graph_dict
  181. if "nodes" not in graph_dict:
  182. return []
  183. start_node = next((node for node in graph_dict["nodes"] if node["data"]["type"] == "start"), None)
  184. if not start_node:
  185. return []
  186. # get user_input_form from start node
  187. variables: list[Any] = start_node.get("data", {}).get("variables", [])
  188. if to_old_structure:
  189. old_structure_variables = []
  190. for variable in variables:
  191. old_structure_variables.append({variable["type"]: variable})
  192. return old_structure_variables
  193. return variables
  194. @property
  195. def unique_hash(self) -> str:
  196. """
  197. Get hash of workflow.
  198. :return: hash
  199. """
  200. entity = {"graph": self.graph_dict, "features": self.features_dict}
  201. return helper.generate_text_hash(json.dumps(entity, sort_keys=True))
  202. @property
  203. def tool_published(self) -> bool:
  204. """
  205. DEPRECATED: This property is not accurate for determining if a workflow is published as a tool.
  206. It only checks if there's a WorkflowToolProvider for the app, not if this specific workflow version
  207. is the one being used by the tool.
  208. For accurate checking, use a direct query with tenant_id, app_id, and version.
  209. """
  210. from models.tools import WorkflowToolProvider
  211. return (
  212. db.session.query(WorkflowToolProvider)
  213. .filter(WorkflowToolProvider.tenant_id == self.tenant_id, WorkflowToolProvider.app_id == self.app_id)
  214. .count()
  215. > 0
  216. )
  217. @property
  218. def environment_variables(self) -> Sequence[Variable]:
  219. # TODO: find some way to init `self._environment_variables` when instance created.
  220. if self._environment_variables is None:
  221. self._environment_variables = "{}"
  222. tenant_id = contexts.tenant_id.get()
  223. environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables)
  224. results = [
  225. variable_factory.build_environment_variable_from_mapping(v) for v in environment_variables_dict.values()
  226. ]
  227. # decrypt secret variables value
  228. def decrypt_func(var):
  229. if isinstance(var, SecretVariable):
  230. return var.model_copy(update={"value": encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)})
  231. else:
  232. return var
  233. results = list(map(decrypt_func, results))
  234. return results
  235. @environment_variables.setter
  236. def environment_variables(self, value: Sequence[Variable]):
  237. if not value:
  238. self._environment_variables = "{}"
  239. return
  240. tenant_id = contexts.tenant_id.get()
  241. value = list(value)
  242. if any(var for var in value if not var.id):
  243. raise ValueError("environment variable require a unique id")
  244. # Compare inputs and origin variables,
  245. # if the value is HIDDEN_VALUE, use the origin variable value (only update `name`).
  246. origin_variables_dictionary = {var.id: var for var in self.environment_variables}
  247. for i, variable in enumerate(value):
  248. if variable.id in origin_variables_dictionary and variable.value == HIDDEN_VALUE:
  249. value[i] = origin_variables_dictionary[variable.id].model_copy(update={"name": variable.name})
  250. # encrypt secret variables value
  251. def encrypt_func(var):
  252. if isinstance(var, SecretVariable):
  253. return var.model_copy(update={"value": encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)})
  254. else:
  255. return var
  256. encrypted_vars = list(map(encrypt_func, value))
  257. environment_variables_json = json.dumps(
  258. {var.name: var.model_dump() for var in encrypted_vars},
  259. ensure_ascii=False,
  260. )
  261. self._environment_variables = environment_variables_json
  262. def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
  263. environment_variables = list(self.environment_variables)
  264. environment_variables = [
  265. v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={"value": ""})
  266. for v in environment_variables
  267. ]
  268. result = {
  269. "graph": self.graph_dict,
  270. "features": self.features_dict,
  271. "environment_variables": [var.model_dump(mode="json") for var in environment_variables],
  272. "conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
  273. }
  274. return result
  275. @property
  276. def conversation_variables(self) -> Sequence[Variable]:
  277. # TODO: find some way to init `self._conversation_variables` when instance created.
  278. if self._conversation_variables is None:
  279. self._conversation_variables = "{}"
  280. variables_dict: dict[str, Any] = json.loads(self._conversation_variables)
  281. results = [variable_factory.build_conversation_variable_from_mapping(v) for v in variables_dict.values()]
  282. return results
  283. @conversation_variables.setter
  284. def conversation_variables(self, value: Sequence[Variable]) -> None:
  285. self._conversation_variables = json.dumps(
  286. {var.name: var.model_dump() for var in value},
  287. ensure_ascii=False,
  288. )
  289. class WorkflowRunStatus(StrEnum):
  290. """
  291. Workflow Run Status Enum
  292. """
  293. RUNNING = "running"
  294. SUCCEEDED = "succeeded"
  295. FAILED = "failed"
  296. STOPPED = "stopped"
  297. PARTIAL_SUCCEEDED = "partial-succeeded"
  298. class WorkflowRun(Base):
  299. """
  300. Workflow Run
  301. Attributes:
  302. - id (uuid) Run ID
  303. - tenant_id (uuid) Workspace ID
  304. - app_id (uuid) App ID
  305. - sequence_number (int) Auto-increment sequence number, incremented within the App, starting from 1
  306. - workflow_id (uuid) Workflow ID
  307. - type (string) Workflow type
  308. - triggered_from (string) Trigger source
  309. `debugging` for canvas debugging
  310. `app-run` for (published) app execution
  311. - version (string) Version
  312. - graph (text) Workflow canvas configuration (JSON)
  313. - inputs (text) Input parameters
  314. - status (string) Execution status, `running` / `succeeded` / `failed` / `stopped`
  315. - outputs (text) `optional` Output content
  316. - error (string) `optional` Error reason
  317. - elapsed_time (float) `optional` Time consumption (s)
  318. - total_tokens (int) `optional` Total tokens used
  319. - total_steps (int) Total steps (redundant), default 0
  320. - created_by_role (string) Creator role
  321. - `account` Console account
  322. - `end_user` End user
  323. - created_by (uuid) Runner ID
  324. - created_at (timestamp) Run time
  325. - finished_at (timestamp) End time
  326. """
  327. __tablename__ = "workflow_runs"
  328. __table_args__ = (
  329. db.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
  330. db.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
  331. db.Index("workflow_run_tenant_app_sequence_idx", "tenant_id", "app_id", "sequence_number"),
  332. )
  333. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  334. tenant_id: Mapped[str] = mapped_column(StringUUID)
  335. app_id: Mapped[str] = mapped_column(StringUUID)
  336. sequence_number: Mapped[int] = mapped_column()
  337. workflow_id: Mapped[str] = mapped_column(StringUUID)
  338. type: Mapped[str] = mapped_column(db.String(255))
  339. triggered_from: Mapped[str] = mapped_column(db.String(255))
  340. version: Mapped[str] = mapped_column(db.String(255))
  341. graph: Mapped[Optional[str]] = mapped_column(db.Text)
  342. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  343. status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded
  344. outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}")
  345. error: Mapped[Optional[str]] = mapped_column(db.Text)
  346. elapsed_time = db.Column(db.Float, nullable=False, server_default=sa.text("0"))
  347. total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0"))
  348. total_steps = db.Column(db.Integer, server_default=db.text("0"))
  349. created_by_role: Mapped[str] = mapped_column(db.String(255)) # account, end_user
  350. created_by = db.Column(StringUUID, nullable=False)
  351. created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  352. finished_at = db.Column(db.DateTime)
  353. exceptions_count = db.Column(db.Integer, server_default=db.text("0"))
  354. @property
  355. def created_by_account(self):
  356. created_by_role = CreatorUserRole(self.created_by_role)
  357. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  358. @property
  359. def created_by_end_user(self):
  360. from models.model import EndUser
  361. created_by_role = CreatorUserRole(self.created_by_role)
  362. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  363. @property
  364. def graph_dict(self):
  365. return json.loads(self.graph) if self.graph else {}
  366. @property
  367. def inputs_dict(self) -> Mapping[str, Any]:
  368. return json.loads(self.inputs) if self.inputs else {}
  369. @property
  370. def outputs_dict(self) -> Mapping[str, Any]:
  371. return json.loads(self.outputs) if self.outputs else {}
  372. @property
  373. def message(self):
  374. from models.model import Message
  375. return (
  376. db.session.query(Message).filter(Message.app_id == self.app_id, Message.workflow_run_id == self.id).first()
  377. )
  378. @property
  379. def workflow(self):
  380. return db.session.query(Workflow).filter(Workflow.id == self.workflow_id).first()
  381. def to_dict(self):
  382. return {
  383. "id": self.id,
  384. "tenant_id": self.tenant_id,
  385. "app_id": self.app_id,
  386. "sequence_number": self.sequence_number,
  387. "workflow_id": self.workflow_id,
  388. "type": self.type,
  389. "triggered_from": self.triggered_from,
  390. "version": self.version,
  391. "graph": self.graph_dict,
  392. "inputs": self.inputs_dict,
  393. "status": self.status,
  394. "outputs": self.outputs_dict,
  395. "error": self.error,
  396. "elapsed_time": self.elapsed_time,
  397. "total_tokens": self.total_tokens,
  398. "total_steps": self.total_steps,
  399. "created_by_role": self.created_by_role,
  400. "created_by": self.created_by,
  401. "created_at": self.created_at,
  402. "finished_at": self.finished_at,
  403. "exceptions_count": self.exceptions_count,
  404. }
  405. @classmethod
  406. def from_dict(cls, data: dict) -> "WorkflowRun":
  407. return cls(
  408. id=data.get("id"),
  409. tenant_id=data.get("tenant_id"),
  410. app_id=data.get("app_id"),
  411. sequence_number=data.get("sequence_number"),
  412. workflow_id=data.get("workflow_id"),
  413. type=data.get("type"),
  414. triggered_from=data.get("triggered_from"),
  415. version=data.get("version"),
  416. graph=json.dumps(data.get("graph")),
  417. inputs=json.dumps(data.get("inputs")),
  418. status=data.get("status"),
  419. outputs=json.dumps(data.get("outputs")),
  420. error=data.get("error"),
  421. elapsed_time=data.get("elapsed_time"),
  422. total_tokens=data.get("total_tokens"),
  423. total_steps=data.get("total_steps"),
  424. created_by_role=data.get("created_by_role"),
  425. created_by=data.get("created_by"),
  426. created_at=data.get("created_at"),
  427. finished_at=data.get("finished_at"),
  428. exceptions_count=data.get("exceptions_count"),
  429. )
  430. class WorkflowNodeExecutionTriggeredFrom(StrEnum):
  431. """
  432. Workflow Node Execution Triggered From Enum
  433. """
  434. SINGLE_STEP = "single-step"
  435. WORKFLOW_RUN = "workflow-run"
  436. class WorkflowNodeExecutionStatus(StrEnum):
  437. """
  438. Workflow Node Execution Status Enum
  439. """
  440. RUNNING = "running"
  441. SUCCEEDED = "succeeded"
  442. FAILED = "failed"
  443. EXCEPTION = "exception"
  444. RETRY = "retry"
  445. class WorkflowNodeExecution(Base):
  446. """
  447. Workflow Node Execution
  448. - id (uuid) Execution ID
  449. - tenant_id (uuid) Workspace ID
  450. - app_id (uuid) App ID
  451. - workflow_id (uuid) Workflow ID
  452. - triggered_from (string) Trigger source
  453. `single-step` for single-step debugging
  454. `workflow-run` for workflow execution (debugging / user execution)
  455. - workflow_run_id (uuid) `optional` Workflow run ID
  456. Null for single-step debugging.
  457. - index (int) Execution sequence number, used for displaying Tracing Node order
  458. - predecessor_node_id (string) `optional` Predecessor node ID, used for displaying execution path
  459. - node_id (string) Node ID
  460. - node_type (string) Node type, such as `start`
  461. - title (string) Node title
  462. - inputs (json) All predecessor node variable content used in the node
  463. - process_data (json) Node process data
  464. - outputs (json) `optional` Node output variables
  465. - status (string) Execution status, `running` / `succeeded` / `failed`
  466. - error (string) `optional` Error reason
  467. - elapsed_time (float) `optional` Time consumption (s)
  468. - execution_metadata (text) Metadata
  469. - total_tokens (int) `optional` Total tokens used
  470. - total_price (decimal) `optional` Total cost
  471. - currency (string) `optional` Currency, such as USD / RMB
  472. - created_at (timestamp) Run time
  473. - created_by_role (string) Creator role
  474. - `account` Console account
  475. - `end_user` End user
  476. - created_by (uuid) Runner ID
  477. - finished_at (timestamp) End time
  478. """
  479. __tablename__ = "workflow_node_executions"
  480. __table_args__ = (
  481. db.PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
  482. db.Index(
  483. "workflow_node_execution_workflow_run_idx",
  484. "tenant_id",
  485. "app_id",
  486. "workflow_id",
  487. "triggered_from",
  488. "workflow_run_id",
  489. ),
  490. db.Index(
  491. "workflow_node_execution_node_run_idx", "tenant_id", "app_id", "workflow_id", "triggered_from", "node_id"
  492. ),
  493. db.Index(
  494. "workflow_node_execution_id_idx",
  495. "tenant_id",
  496. "app_id",
  497. "workflow_id",
  498. "triggered_from",
  499. "node_execution_id",
  500. ),
  501. )
  502. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  503. tenant_id: Mapped[str] = mapped_column(StringUUID)
  504. app_id: Mapped[str] = mapped_column(StringUUID)
  505. workflow_id: Mapped[str] = mapped_column(StringUUID)
  506. triggered_from: Mapped[str] = mapped_column(db.String(255))
  507. workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID)
  508. index: Mapped[int] = mapped_column(db.Integer)
  509. predecessor_node_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  510. node_execution_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  511. node_id: Mapped[str] = mapped_column(db.String(255))
  512. node_type: Mapped[str] = mapped_column(db.String(255))
  513. title: Mapped[str] = mapped_column(db.String(255))
  514. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  515. process_data: Mapped[Optional[str]] = mapped_column(db.Text)
  516. outputs: Mapped[Optional[str]] = mapped_column(db.Text)
  517. status: Mapped[str] = mapped_column(db.String(255))
  518. error: Mapped[Optional[str]] = mapped_column(db.Text)
  519. elapsed_time: Mapped[float] = mapped_column(db.Float, server_default=db.text("0"))
  520. execution_metadata: Mapped[Optional[str]] = mapped_column(db.Text)
  521. created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp())
  522. created_by_role: Mapped[str] = mapped_column(db.String(255))
  523. created_by: Mapped[str] = mapped_column(StringUUID)
  524. finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
  525. @property
  526. def created_by_account(self):
  527. created_by_role = CreatorUserRole(self.created_by_role)
  528. # TODO(-LAN-): Avoid using db.session.get() here.
  529. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  530. @property
  531. def created_by_end_user(self):
  532. from models.model import EndUser
  533. created_by_role = CreatorUserRole(self.created_by_role)
  534. # TODO(-LAN-): Avoid using db.session.get() here.
  535. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  536. @property
  537. def inputs_dict(self):
  538. return json.loads(self.inputs) if self.inputs else None
  539. @property
  540. def outputs_dict(self) -> dict[str, Any] | None:
  541. return json.loads(self.outputs) if self.outputs else None
  542. @property
  543. def process_data_dict(self):
  544. return json.loads(self.process_data) if self.process_data else None
  545. @property
  546. def execution_metadata_dict(self) -> dict[str, Any] | None:
  547. return json.loads(self.execution_metadata) if self.execution_metadata else None
  548. @property
  549. def extras(self):
  550. from core.tools.tool_manager import ToolManager
  551. extras = {}
  552. if self.execution_metadata_dict:
  553. from core.workflow.nodes import NodeType
  554. if self.node_type == NodeType.TOOL.value and "tool_info" in self.execution_metadata_dict:
  555. tool_info = self.execution_metadata_dict["tool_info"]
  556. extras["icon"] = ToolManager.get_tool_icon(
  557. tenant_id=self.tenant_id,
  558. provider_type=tool_info["provider_type"],
  559. provider_id=tool_info["provider_id"],
  560. )
  561. return extras
  562. class WorkflowAppLogCreatedFrom(Enum):
  563. """
  564. Workflow App Log Created From Enum
  565. """
  566. SERVICE_API = "service-api"
  567. WEB_APP = "web-app"
  568. INSTALLED_APP = "installed-app"
  569. @classmethod
  570. def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":
  571. """
  572. Get value of given mode.
  573. :param value: mode value
  574. :return: mode
  575. """
  576. for mode in cls:
  577. if mode.value == value:
  578. return mode
  579. raise ValueError(f"invalid workflow app log created from value {value}")
  580. class WorkflowAppLog(Base):
  581. """
  582. Workflow App execution log, excluding workflow debugging records.
  583. Attributes:
  584. - id (uuid) run ID
  585. - tenant_id (uuid) Workspace ID
  586. - app_id (uuid) App ID
  587. - workflow_id (uuid) Associated Workflow ID
  588. - workflow_run_id (uuid) Associated Workflow Run ID
  589. - created_from (string) Creation source
  590. `service-api` App Execution OpenAPI
  591. `web-app` WebApp
  592. `installed-app` Installed App
  593. - created_by_role (string) Creator role
  594. - `account` Console account
  595. - `end_user` End user
  596. - created_by (uuid) Creator ID, depends on the user table according to created_by_role
  597. - created_at (timestamp) Creation time
  598. """
  599. __tablename__ = "workflow_app_logs"
  600. __table_args__ = (
  601. db.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
  602. db.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
  603. )
  604. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  605. tenant_id: Mapped[str] = mapped_column(StringUUID)
  606. app_id: Mapped[str] = mapped_column(StringUUID)
  607. workflow_id = db.Column(StringUUID, nullable=False)
  608. workflow_run_id: Mapped[str] = mapped_column(StringUUID)
  609. created_from = db.Column(db.String(255), nullable=False)
  610. created_by_role = db.Column(db.String(255), nullable=False)
  611. created_by = db.Column(StringUUID, nullable=False)
  612. created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  613. @property
  614. def workflow_run(self):
  615. return db.session.get(WorkflowRun, self.workflow_run_id)
  616. @property
  617. def created_by_account(self):
  618. created_by_role = CreatorUserRole(self.created_by_role)
  619. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  620. @property
  621. def created_by_end_user(self):
  622. from models.model import EndUser
  623. created_by_role = CreatorUserRole(self.created_by_role)
  624. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  625. class ConversationVariable(Base):
  626. __tablename__ = "workflow_conversation_variables"
  627. id: Mapped[str] = mapped_column(StringUUID, primary_key=True)
  628. conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False, primary_key=True, index=True)
  629. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False, index=True)
  630. data = mapped_column(db.Text, nullable=False)
  631. created_at = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp(), index=True)
  632. updated_at = mapped_column(
  633. db.DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
  634. )
  635. def __init__(self, *, id: str, app_id: str, conversation_id: str, data: str) -> None:
  636. self.id = id
  637. self.app_id = app_id
  638. self.conversation_id = conversation_id
  639. self.data = data
  640. @classmethod
  641. def from_variable(cls, *, app_id: str, conversation_id: str, variable: Variable) -> "ConversationVariable":
  642. obj = cls(
  643. id=variable.id,
  644. app_id=app_id,
  645. conversation_id=conversation_id,
  646. data=variable.model_dump_json(),
  647. )
  648. return obj
  649. def to_variable(self) -> Variable:
  650. mapping = json.loads(self.data)
  651. return variable_factory.build_conversation_variable_from_mapping(mapping)
  652. # Only `sys.query` and `sys.files` could be modified.
  653. _EDITABLE_SYSTEM_VARIABLE = frozenset(["query", "files"])
  654. def _naive_utc_datetime():
  655. return datetime.now(UTC).replace(tzinfo=None)
  656. class WorkflowDraftVariable(Base):
  657. @staticmethod
  658. def unique_columns() -> list[str]:
  659. return [
  660. "app_id",
  661. "node_id",
  662. "name",
  663. ]
  664. __tablename__ = "workflow_draft_variables"
  665. __table_args__ = (UniqueConstraint(*unique_columns()),)
  666. # id is the unique identifier of a draft variable.
  667. id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=db.text("uuid_generate_v4()"))
  668. created_at = mapped_column(
  669. db.DateTime,
  670. nullable=False,
  671. default=_naive_utc_datetime,
  672. server_default=func.current_timestamp(),
  673. )
  674. updated_at = mapped_column(
  675. db.DateTime,
  676. nullable=False,
  677. default=_naive_utc_datetime,
  678. server_default=func.current_timestamp(),
  679. onupdate=func.current_timestamp(),
  680. )
  681. # "`app_id` maps to the `id` field in the `model.App` model."
  682. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  683. # `last_edited_at` records when the value of a given draft variable
  684. # is edited.
  685. #
  686. # If it's not edited after creation, its value is `None`.
  687. last_edited_at: Mapped[datetime | None] = mapped_column(
  688. db.DateTime,
  689. nullable=True,
  690. default=None,
  691. )
  692. # The `node_id` field is special.
  693. #
  694. # If the variable is a conversation variable or a system variable, then the value of `node_id`
  695. # is `conversation` or `sys`, respective.
  696. #
  697. # Otherwise, if the variable is a variable belonging to a specific node, the value of `_node_id` is
  698. # the identity of correspond node in graph definition. An example of node id is `"1745769620734"`.
  699. #
  700. # However, there's one caveat. The id of the first "Answer" node in chatflow is "answer". (Other
  701. # "Answer" node conform the rules above.)
  702. node_id: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="node_id")
  703. # From `VARIABLE_PATTERN`, we may conclude that the length of a top level variable is less than
  704. # 80 chars.
  705. #
  706. # ref: api/core/workflow/entities/variable_pool.py:18
  707. name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
  708. description: Mapped[str] = mapped_column(
  709. sa.String(255),
  710. default="",
  711. nullable=False,
  712. )
  713. selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector")
  714. value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20))
  715. # JSON string
  716. value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value")
  717. # visible
  718. visible: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=True)
  719. editable: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=False)
  720. def get_selector(self) -> list[str]:
  721. selector = json.loads(self.selector)
  722. if not isinstance(selector, list):
  723. _logger.error(
  724. "invalid selector loaded from database, type=%s, value=%s",
  725. type(selector),
  726. self.selector,
  727. )
  728. raise ValueError("invalid selector.")
  729. return selector
  730. def _set_selector(self, value: list[str]):
  731. self.selector = json.dumps(value)
  732. def get_value(self) -> Segment | None:
  733. return build_segment(json.loads(self.value))
  734. def set_name(self, name: str):
  735. self.name = name
  736. self._set_selector([self.node_id, name])
  737. def set_value(self, value: Segment):
  738. self.value = json.dumps(value.value)
  739. self.value_type = value.value_type
  740. def get_node_id(self) -> str | None:
  741. if self.get_variable_type() == DraftVariableType.NODE:
  742. return self.node_id
  743. else:
  744. return None
  745. def get_variable_type(self) -> DraftVariableType:
  746. match self.node_id:
  747. case DraftVariableType.CONVERSATION:
  748. return DraftVariableType.CONVERSATION
  749. case DraftVariableType.SYS:
  750. return DraftVariableType.SYS
  751. case _:
  752. return DraftVariableType.NODE
  753. @classmethod
  754. def _new(
  755. cls,
  756. *,
  757. app_id: str,
  758. node_id: str,
  759. name: str,
  760. value: Segment,
  761. description: str = "",
  762. ) -> "WorkflowDraftVariable":
  763. variable = WorkflowDraftVariable()
  764. variable.created_at = _naive_utc_datetime()
  765. variable.updated_at = _naive_utc_datetime()
  766. variable.description = description
  767. variable.app_id = app_id
  768. variable.node_id = node_id
  769. variable.name = name
  770. variable.set_value(value)
  771. variable._set_selector(list(variable_utils.to_selector(node_id, name)))
  772. return variable
  773. @classmethod
  774. def new_conversation_variable(
  775. cls,
  776. *,
  777. app_id: str,
  778. name: str,
  779. value: Segment,
  780. ) -> "WorkflowDraftVariable":
  781. variable = cls._new(
  782. app_id=app_id,
  783. node_id=CONVERSATION_VARIABLE_NODE_ID,
  784. name=name,
  785. value=value,
  786. )
  787. return variable
  788. @classmethod
  789. def new_sys_variable(
  790. cls,
  791. *,
  792. app_id: str,
  793. name: str,
  794. value: Segment,
  795. editable: bool = False,
  796. ) -> "WorkflowDraftVariable":
  797. variable = cls._new(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, value=value)
  798. variable.editable = editable
  799. return variable
  800. @classmethod
  801. def new_node_variable(
  802. cls,
  803. *,
  804. app_id: str,
  805. node_id: str,
  806. name: str,
  807. value: Segment,
  808. visible: bool = True,
  809. ) -> "WorkflowDraftVariable":
  810. variable = cls._new(app_id=app_id, node_id=node_id, name=name, value=value)
  811. variable.visible = visible
  812. variable.editable = True
  813. return variable
  814. @property
  815. def edited(self):
  816. return self.last_edited_at is not None
  817. def is_system_variable_editable(name: str) -> bool:
  818. return name in _EDITABLE_SYSTEM_VARIABLE