Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

workflow.py 37KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032
  1. import json
  2. import logging
  3. from collections.abc import Mapping, Sequence
  4. from datetime import UTC, datetime
  5. from enum import Enum, StrEnum
  6. from typing import TYPE_CHECKING, Any, Optional, Union
  7. from uuid import uuid4
  8. from flask_login import current_user
  9. from core.variables import utils as variable_utils
  10. from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
  11. from factories.variable_factory import build_segment
  12. if TYPE_CHECKING:
  13. from models.model import AppMode
  14. import sqlalchemy as sa
  15. from sqlalchemy import UniqueConstraint, func
  16. from sqlalchemy.orm import Mapped, mapped_column
  17. from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE
  18. from core.helper import encrypter
  19. from core.variables import SecretVariable, Segment, SegmentType, Variable
  20. from factories import variable_factory
  21. from libs import helper
  22. from .account import Account
  23. from .base import Base
  24. from .engine import db
  25. from .enums import CreatorUserRole, DraftVariableType
  26. from .types import EnumText, StringUUID
  27. _logger = logging.getLogger(__name__)
  28. if TYPE_CHECKING:
  29. from models.model import AppMode
  30. class WorkflowType(Enum):
  31. """
  32. Workflow Type Enum
  33. """
  34. WORKFLOW = "workflow"
  35. CHAT = "chat"
  36. RAG_PIPELINE = "rag-pipeline"
  37. @classmethod
  38. def value_of(cls, value: str) -> "WorkflowType":
  39. """
  40. Get value of given mode.
  41. :param value: mode value
  42. :return: mode
  43. """
  44. for mode in cls:
  45. if mode.value == value:
  46. return mode
  47. raise ValueError(f"invalid workflow type value {value}")
  48. @classmethod
  49. def from_app_mode(cls, app_mode: Union[str, "AppMode"]) -> "WorkflowType":
  50. """
  51. Get workflow type from app mode.
  52. :param app_mode: app mode
  53. :return: workflow type
  54. """
  55. from models.model import AppMode
  56. app_mode = app_mode if isinstance(app_mode, AppMode) else AppMode.value_of(app_mode)
  57. return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
  58. class Workflow(Base):
  59. """
  60. Workflow, for `Workflow App` and `Chat App workflow mode`.
  61. Attributes:
  62. - id (uuid) Workflow ID, pk
  63. - tenant_id (uuid) Workspace ID
  64. - app_id (uuid) App ID
  65. - type (string) Workflow type
  66. `workflow` for `Workflow App`
  67. `chat` for `Chat App workflow mode`
  68. - version (string) Version
  69. `draft` for draft version (only one for each app), other for version number (redundant)
  70. - graph (text) Workflow canvas configuration (JSON)
  71. The entire canvas configuration JSON, including Node, Edge, and other configurations
  72. - nodes (array[object]) Node list, see Node Schema
  73. - edges (array[object]) Edge list, see Edge Schema
  74. - created_by (uuid) Creator ID
  75. - created_at (timestamp) Creation time
  76. - updated_by (uuid) `optional` Last updater ID
  77. - updated_at (timestamp) `optional` Last update time
  78. """
  79. __tablename__ = "workflows"
  80. __table_args__ = (
  81. db.PrimaryKeyConstraint("id", name="workflow_pkey"),
  82. db.Index("workflow_version_idx", "tenant_id", "app_id", "version"),
  83. )
  84. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  85. tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  86. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  87. type: Mapped[str] = mapped_column(db.String(255), nullable=False)
  88. version: Mapped[str] = mapped_column(db.String(255), nullable=False)
  89. marked_name: Mapped[str] = mapped_column(default="", server_default="")
  90. marked_comment: Mapped[str] = mapped_column(default="", server_default="")
  91. graph: Mapped[str] = mapped_column(sa.Text)
  92. _features: Mapped[str] = mapped_column("features", sa.TEXT)
  93. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  94. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  95. updated_by: Mapped[Optional[str]] = mapped_column(StringUUID)
  96. updated_at: Mapped[datetime] = mapped_column(
  97. db.DateTime,
  98. nullable=False,
  99. default=datetime.now(UTC).replace(tzinfo=None),
  100. server_onupdate=func.current_timestamp(),
  101. )
  102. _environment_variables: Mapped[str] = mapped_column(
  103. "environment_variables", db.Text, nullable=False, server_default="{}"
  104. )
  105. _conversation_variables: Mapped[str] = mapped_column(
  106. "conversation_variables", db.Text, nullable=False, server_default="{}"
  107. )
  108. _rag_pipeline_variables: Mapped[str] = mapped_column(
  109. "rag_pipeline_variables", db.Text, nullable=False, server_default="{}"
  110. )
  111. @classmethod
  112. def new(
  113. cls,
  114. *,
  115. tenant_id: str,
  116. app_id: str,
  117. type: str,
  118. version: str,
  119. graph: str,
  120. features: str,
  121. created_by: str,
  122. environment_variables: Sequence[Variable],
  123. conversation_variables: Sequence[Variable],
  124. rag_pipeline_variables: list[dict],
  125. marked_name: str = "",
  126. marked_comment: str = "",
  127. ) -> "Workflow":
  128. workflow = Workflow()
  129. workflow.id = str(uuid4())
  130. workflow.tenant_id = tenant_id
  131. workflow.app_id = app_id
  132. workflow.type = type
  133. workflow.version = version
  134. workflow.graph = graph
  135. workflow.features = features
  136. workflow.created_by = created_by
  137. workflow.environment_variables = environment_variables or []
  138. workflow.conversation_variables = conversation_variables or []
  139. workflow.rag_pipeline_variables = rag_pipeline_variables or []
  140. workflow.marked_name = marked_name
  141. workflow.marked_comment = marked_comment
  142. workflow.created_at = datetime.now(UTC).replace(tzinfo=None)
  143. workflow.updated_at = workflow.created_at
  144. return workflow
  145. @property
  146. def created_by_account(self):
  147. return db.session.get(Account, self.created_by)
  148. @property
  149. def updated_by_account(self):
  150. return db.session.get(Account, self.updated_by) if self.updated_by else None
  151. @property
  152. def graph_dict(self) -> Mapping[str, Any]:
  153. return json.loads(self.graph) if self.graph else {}
  154. @property
  155. def features(self) -> str:
  156. """
  157. Convert old features structure to new features structure.
  158. """
  159. if not self._features:
  160. return self._features
  161. features = json.loads(self._features)
  162. if features.get("file_upload", {}).get("image", {}).get("enabled", False):
  163. image_enabled = True
  164. image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
  165. image_transfer_methods = features["file_upload"]["image"].get(
  166. "transfer_methods", ["remote_url", "local_file"]
  167. )
  168. features["file_upload"]["enabled"] = image_enabled
  169. features["file_upload"]["number_limits"] = image_number_limits
  170. features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
  171. features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
  172. features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
  173. "allowed_file_extensions", []
  174. )
  175. del features["file_upload"]["image"]
  176. self._features = json.dumps(features)
  177. return self._features
  178. @features.setter
  179. def features(self, value: str) -> None:
  180. self._features = value
  181. @property
  182. def features_dict(self) -> dict[str, Any]:
  183. return json.loads(self.features) if self.features else {}
  184. def user_input_form(self, to_old_structure: bool = False) -> list:
  185. # get start node from graph
  186. if not self.graph:
  187. return []
  188. graph_dict = self.graph_dict
  189. if "nodes" not in graph_dict:
  190. return []
  191. start_node = next((node for node in graph_dict["nodes"] if node["data"]["type"] == "start"), None)
  192. if not start_node:
  193. return []
  194. # get user_input_form from start node
  195. variables: list[Any] = start_node.get("data", {}).get("variables", [])
  196. if to_old_structure:
  197. old_structure_variables = []
  198. for variable in variables:
  199. old_structure_variables.append({variable["type"]: variable})
  200. return old_structure_variables
  201. return variables
  202. @property
  203. def unique_hash(self) -> str:
  204. """
  205. Get hash of workflow.
  206. :return: hash
  207. """
  208. entity = {"graph": self.graph_dict, "features": self.features_dict}
  209. return helper.generate_text_hash(json.dumps(entity, sort_keys=True))
  210. @property
  211. def tool_published(self) -> bool:
  212. """
  213. DEPRECATED: This property is not accurate for determining if a workflow is published as a tool.
  214. It only checks if there's a WorkflowToolProvider for the app, not if this specific workflow version
  215. is the one being used by the tool.
  216. For accurate checking, use a direct query with tenant_id, app_id, and version.
  217. """
  218. from models.tools import WorkflowToolProvider
  219. return (
  220. db.session.query(WorkflowToolProvider)
  221. .filter(WorkflowToolProvider.tenant_id == self.tenant_id, WorkflowToolProvider.app_id == self.app_id)
  222. .count()
  223. > 0
  224. )
  225. @property
  226. def environment_variables(self) -> Sequence[Variable]:
  227. # TODO: find some way to init `self._environment_variables` when instance created.
  228. if self._environment_variables is None:
  229. self._environment_variables = "{}"
  230. # Get tenant_id from current_user (Account or EndUser)
  231. if isinstance(current_user, Account):
  232. # Account user
  233. tenant_id = current_user.current_tenant_id
  234. else:
  235. # EndUser
  236. tenant_id = current_user.tenant_id
  237. if not tenant_id:
  238. return []
  239. environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables)
  240. results = [
  241. variable_factory.build_environment_variable_from_mapping(v) for v in environment_variables_dict.values()
  242. ]
  243. # decrypt secret variables value
  244. def decrypt_func(var):
  245. if isinstance(var, SecretVariable):
  246. return var.model_copy(update={"value": encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)})
  247. else:
  248. return var
  249. results = list(map(decrypt_func, results))
  250. return results
  251. @environment_variables.setter
  252. def environment_variables(self, value: Sequence[Variable]):
  253. if not value:
  254. self._environment_variables = "{}"
  255. return
  256. # Get tenant_id from current_user (Account or EndUser)
  257. if isinstance(current_user, Account):
  258. # Account user
  259. tenant_id = current_user.current_tenant_id
  260. else:
  261. # EndUser
  262. tenant_id = current_user.tenant_id
  263. if not tenant_id:
  264. self._environment_variables = "{}"
  265. return
  266. value = list(value)
  267. if any(var for var in value if not var.id):
  268. raise ValueError("environment variable require a unique id")
  269. # Compare inputs and origin variables,
  270. # if the value is HIDDEN_VALUE, use the origin variable value (only update `name`).
  271. origin_variables_dictionary = {var.id: var for var in self.environment_variables}
  272. for i, variable in enumerate(value):
  273. if variable.id in origin_variables_dictionary and variable.value == HIDDEN_VALUE:
  274. value[i] = origin_variables_dictionary[variable.id].model_copy(update={"name": variable.name})
  275. # encrypt secret variables value
  276. def encrypt_func(var):
  277. if isinstance(var, SecretVariable):
  278. return var.model_copy(update={"value": encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)})
  279. else:
  280. return var
  281. encrypted_vars = list(map(encrypt_func, value))
  282. environment_variables_json = json.dumps(
  283. {var.name: var.model_dump() for var in encrypted_vars},
  284. ensure_ascii=False,
  285. )
  286. self._environment_variables = environment_variables_json
  287. def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
  288. environment_variables = list(self.environment_variables)
  289. environment_variables = [
  290. v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={"value": ""})
  291. for v in environment_variables
  292. ]
  293. result = {
  294. "graph": self.graph_dict,
  295. "features": self.features_dict,
  296. "environment_variables": [var.model_dump(mode="json") for var in environment_variables],
  297. "conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
  298. "rag_pipeline_variables": self.rag_pipeline_variables,
  299. }
  300. return result
  301. @property
  302. def conversation_variables(self) -> Sequence[Variable]:
  303. # TODO: find some way to init `self._conversation_variables` when instance created.
  304. if self._conversation_variables is None:
  305. self._conversation_variables = "{}"
  306. variables_dict: dict[str, Any] = json.loads(self._conversation_variables)
  307. results = [variable_factory.build_conversation_variable_from_mapping(v) for v in variables_dict.values()]
  308. return results
  309. @conversation_variables.setter
  310. def conversation_variables(self, value: Sequence[Variable]) -> None:
  311. self._conversation_variables = json.dumps(
  312. {var.name: var.model_dump() for var in value},
  313. ensure_ascii=False,
  314. )
  315. @property
  316. def rag_pipeline_variables(self) -> list[dict]:
  317. # TODO: find some way to init `self._conversation_variables` when instance created.
  318. if self._rag_pipeline_variables is None:
  319. self._rag_pipeline_variables = "{}"
  320. variables_dict: dict[str, Any] = json.loads(self._rag_pipeline_variables)
  321. results = list(variables_dict.values())
  322. return results
  323. @rag_pipeline_variables.setter
  324. def rag_pipeline_variables(self, values: list[dict]) -> None:
  325. self._rag_pipeline_variables = json.dumps(
  326. {item["variable"]: item for item in values},
  327. ensure_ascii=False,
  328. )
  329. class WorkflowRun(Base):
  330. """
  331. Workflow Run
  332. Attributes:
  333. - id (uuid) Run ID
  334. - tenant_id (uuid) Workspace ID
  335. - app_id (uuid) App ID
  336. - sequence_number (int) Auto-increment sequence number, incremented within the App, starting from 1
  337. - workflow_id (uuid) Workflow ID
  338. - type (string) Workflow type
  339. - triggered_from (string) Trigger source
  340. `debugging` for canvas debugging
  341. `app-run` for (published) app execution
  342. - version (string) Version
  343. - graph (text) Workflow canvas configuration (JSON)
  344. - inputs (text) Input parameters
  345. - status (string) Execution status, `running` / `succeeded` / `failed` / `stopped`
  346. - outputs (text) `optional` Output content
  347. - error (string) `optional` Error reason
  348. - elapsed_time (float) `optional` Time consumption (s)
  349. - total_tokens (int) `optional` Total tokens used
  350. - total_steps (int) Total steps (redundant), default 0
  351. - created_by_role (string) Creator role
  352. - `account` Console account
  353. - `end_user` End user
  354. - created_by (uuid) Runner ID
  355. - created_at (timestamp) Run time
  356. - finished_at (timestamp) End time
  357. """
  358. __tablename__ = "workflow_runs"
  359. __table_args__ = (
  360. db.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
  361. db.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
  362. db.Index("workflow_run_tenant_app_sequence_idx", "tenant_id", "app_id", "sequence_number"),
  363. )
  364. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  365. tenant_id: Mapped[str] = mapped_column(StringUUID)
  366. app_id: Mapped[str] = mapped_column(StringUUID)
  367. sequence_number: Mapped[int] = mapped_column()
  368. workflow_id: Mapped[str] = mapped_column(StringUUID)
  369. type: Mapped[str] = mapped_column(db.String(255))
  370. triggered_from: Mapped[str] = mapped_column(db.String(255))
  371. version: Mapped[str] = mapped_column(db.String(255))
  372. graph: Mapped[Optional[str]] = mapped_column(db.Text)
  373. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  374. status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded
  375. outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}")
  376. error: Mapped[Optional[str]] = mapped_column(db.Text)
  377. elapsed_time: Mapped[float] = mapped_column(db.Float, nullable=False, server_default=sa.text("0"))
  378. total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0"))
  379. total_steps: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True)
  380. created_by_role: Mapped[str] = mapped_column(db.String(255)) # account, end_user
  381. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  382. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  383. finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
  384. exceptions_count: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True)
  385. @property
  386. def created_by_account(self):
  387. created_by_role = CreatorUserRole(self.created_by_role)
  388. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  389. @property
  390. def created_by_end_user(self):
  391. from models.model import EndUser
  392. created_by_role = CreatorUserRole(self.created_by_role)
  393. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  394. @property
  395. def graph_dict(self) -> Mapping[str, Any]:
  396. return json.loads(self.graph) if self.graph else {}
  397. @property
  398. def inputs_dict(self) -> Mapping[str, Any]:
  399. return json.loads(self.inputs) if self.inputs else {}
  400. @property
  401. def outputs_dict(self) -> Mapping[str, Any]:
  402. return json.loads(self.outputs) if self.outputs else {}
  403. @property
  404. def message(self):
  405. from models.model import Message
  406. return (
  407. db.session.query(Message).filter(Message.app_id == self.app_id, Message.workflow_run_id == self.id).first()
  408. )
  409. @property
  410. def workflow(self):
  411. return db.session.query(Workflow).filter(Workflow.id == self.workflow_id).first()
  412. def to_dict(self):
  413. return {
  414. "id": self.id,
  415. "tenant_id": self.tenant_id,
  416. "app_id": self.app_id,
  417. "sequence_number": self.sequence_number,
  418. "workflow_id": self.workflow_id,
  419. "type": self.type,
  420. "triggered_from": self.triggered_from,
  421. "version": self.version,
  422. "graph": self.graph_dict,
  423. "inputs": self.inputs_dict,
  424. "status": self.status,
  425. "outputs": self.outputs_dict,
  426. "error": self.error,
  427. "elapsed_time": self.elapsed_time,
  428. "total_tokens": self.total_tokens,
  429. "total_steps": self.total_steps,
  430. "created_by_role": self.created_by_role,
  431. "created_by": self.created_by,
  432. "created_at": self.created_at,
  433. "finished_at": self.finished_at,
  434. "exceptions_count": self.exceptions_count,
  435. }
  436. @classmethod
  437. def from_dict(cls, data: dict) -> "WorkflowRun":
  438. return cls(
  439. id=data.get("id"),
  440. tenant_id=data.get("tenant_id"),
  441. app_id=data.get("app_id"),
  442. sequence_number=data.get("sequence_number"),
  443. workflow_id=data.get("workflow_id"),
  444. type=data.get("type"),
  445. triggered_from=data.get("triggered_from"),
  446. version=data.get("version"),
  447. graph=json.dumps(data.get("graph")),
  448. inputs=json.dumps(data.get("inputs")),
  449. status=data.get("status"),
  450. outputs=json.dumps(data.get("outputs")),
  451. error=data.get("error"),
  452. elapsed_time=data.get("elapsed_time"),
  453. total_tokens=data.get("total_tokens"),
  454. total_steps=data.get("total_steps"),
  455. created_by_role=data.get("created_by_role"),
  456. created_by=data.get("created_by"),
  457. created_at=data.get("created_at"),
  458. finished_at=data.get("finished_at"),
  459. exceptions_count=data.get("exceptions_count"),
  460. )
  461. class WorkflowNodeExecutionTriggeredFrom(StrEnum):
  462. """
  463. Workflow Node Execution Triggered From Enum
  464. """
  465. SINGLE_STEP = "single-step"
  466. WORKFLOW_RUN = "workflow-run"
  467. RAG_PIPELINE_RUN = "rag-pipeline-run"
  468. class WorkflowNodeExecutionModel(Base):
  469. """
  470. Workflow Node Execution
  471. - id (uuid) Execution ID
  472. - tenant_id (uuid) Workspace ID
  473. - app_id (uuid) App ID
  474. - workflow_id (uuid) Workflow ID
  475. - triggered_from (string) Trigger source
  476. `single-step` for single-step debugging
  477. `workflow-run` for workflow execution (debugging / user execution)
  478. - workflow_run_id (uuid) `optional` Workflow run ID
  479. Null for single-step debugging.
  480. - index (int) Execution sequence number, used for displaying Tracing Node order
  481. - predecessor_node_id (string) `optional` Predecessor node ID, used for displaying execution path
  482. - node_id (string) Node ID
  483. - node_type (string) Node type, such as `start`
  484. - title (string) Node title
  485. - inputs (json) All predecessor node variable content used in the node
  486. - process_data (json) Node process data
  487. - outputs (json) `optional` Node output variables
  488. - status (string) Execution status, `running` / `succeeded` / `failed`
  489. - error (string) `optional` Error reason
  490. - elapsed_time (float) `optional` Time consumption (s)
  491. - execution_metadata (text) Metadata
  492. - total_tokens (int) `optional` Total tokens used
  493. - total_price (decimal) `optional` Total cost
  494. - currency (string) `optional` Currency, such as USD / RMB
  495. - created_at (timestamp) Run time
  496. - created_by_role (string) Creator role
  497. - `account` Console account
  498. - `end_user` End user
  499. - created_by (uuid) Runner ID
  500. - finished_at (timestamp) End time
  501. """
  502. __tablename__ = "workflow_node_executions"
  503. __table_args__ = (
  504. db.PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
  505. db.Index(
  506. "workflow_node_execution_workflow_run_idx",
  507. "tenant_id",
  508. "app_id",
  509. "workflow_id",
  510. "triggered_from",
  511. "workflow_run_id",
  512. ),
  513. db.Index(
  514. "workflow_node_execution_node_run_idx", "tenant_id", "app_id", "workflow_id", "triggered_from", "node_id"
  515. ),
  516. db.Index(
  517. "workflow_node_execution_id_idx",
  518. "tenant_id",
  519. "app_id",
  520. "workflow_id",
  521. "triggered_from",
  522. "node_execution_id",
  523. ),
  524. )
  525. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  526. tenant_id: Mapped[str] = mapped_column(StringUUID)
  527. app_id: Mapped[str] = mapped_column(StringUUID)
  528. workflow_id: Mapped[str] = mapped_column(StringUUID)
  529. triggered_from: Mapped[str] = mapped_column(db.String(255))
  530. workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID)
  531. index: Mapped[int] = mapped_column(db.Integer)
  532. predecessor_node_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  533. node_execution_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  534. node_id: Mapped[str] = mapped_column(db.String(255))
  535. node_type: Mapped[str] = mapped_column(db.String(255))
  536. title: Mapped[str] = mapped_column(db.String(255))
  537. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  538. process_data: Mapped[Optional[str]] = mapped_column(db.Text)
  539. outputs: Mapped[Optional[str]] = mapped_column(db.Text)
  540. status: Mapped[str] = mapped_column(db.String(255))
  541. error: Mapped[Optional[str]] = mapped_column(db.Text)
  542. elapsed_time: Mapped[float] = mapped_column(db.Float, server_default=db.text("0"))
  543. execution_metadata: Mapped[Optional[str]] = mapped_column(db.Text)
  544. created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp())
  545. created_by_role: Mapped[str] = mapped_column(db.String(255))
  546. created_by: Mapped[str] = mapped_column(StringUUID)
  547. finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
  548. @property
  549. def created_by_account(self):
  550. created_by_role = CreatorUserRole(self.created_by_role)
  551. # TODO(-LAN-): Avoid using db.session.get() here.
  552. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  553. @property
  554. def created_by_end_user(self):
  555. from models.model import EndUser
  556. created_by_role = CreatorUserRole(self.created_by_role)
  557. # TODO(-LAN-): Avoid using db.session.get() here.
  558. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  559. @property
  560. def inputs_dict(self):
  561. return json.loads(self.inputs) if self.inputs else None
  562. @property
  563. def outputs_dict(self) -> dict[str, Any] | None:
  564. return json.loads(self.outputs) if self.outputs else None
  565. @property
  566. def process_data_dict(self):
  567. return json.loads(self.process_data) if self.process_data else None
  568. @property
  569. def execution_metadata_dict(self) -> dict[str, Any]:
  570. # When the metadata is unset, we return an empty dictionary instead of `None`.
  571. # This approach streamlines the logic for the caller, making it easier to handle
  572. # cases where metadata is absent.
  573. return json.loads(self.execution_metadata) if self.execution_metadata else {}
  574. @property
  575. def extras(self):
  576. from core.tools.tool_manager import ToolManager
  577. extras = {}
  578. if self.execution_metadata_dict:
  579. from core.workflow.nodes import NodeType
  580. if self.node_type == NodeType.TOOL.value and "tool_info" in self.execution_metadata_dict:
  581. tool_info = self.execution_metadata_dict["tool_info"]
  582. extras["icon"] = ToolManager.get_tool_icon(
  583. tenant_id=self.tenant_id,
  584. provider_type=tool_info["provider_type"],
  585. provider_id=tool_info["provider_id"],
  586. )
  587. return extras
  588. class WorkflowAppLogCreatedFrom(Enum):
  589. """
  590. Workflow App Log Created From Enum
  591. """
  592. SERVICE_API = "service-api"
  593. WEB_APP = "web-app"
  594. INSTALLED_APP = "installed-app"
  595. @classmethod
  596. def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":
  597. """
  598. Get value of given mode.
  599. :param value: mode value
  600. :return: mode
  601. """
  602. for mode in cls:
  603. if mode.value == value:
  604. return mode
  605. raise ValueError(f"invalid workflow app log created from value {value}")
  606. class WorkflowAppLog(Base):
  607. """
  608. Workflow App execution log, excluding workflow debugging records.
  609. Attributes:
  610. - id (uuid) run ID
  611. - tenant_id (uuid) Workspace ID
  612. - app_id (uuid) App ID
  613. - workflow_id (uuid) Associated Workflow ID
  614. - workflow_run_id (uuid) Associated Workflow Run ID
  615. - created_from (string) Creation source
  616. `service-api` App Execution OpenAPI
  617. `web-app` WebApp
  618. `installed-app` Installed App
  619. - created_by_role (string) Creator role
  620. - `account` Console account
  621. - `end_user` End user
  622. - created_by (uuid) Creator ID, depends on the user table according to created_by_role
  623. - created_at (timestamp) Creation time
  624. """
  625. __tablename__ = "workflow_app_logs"
  626. __table_args__ = (
  627. db.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
  628. db.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
  629. )
  630. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  631. tenant_id: Mapped[str] = mapped_column(StringUUID)
  632. app_id: Mapped[str] = mapped_column(StringUUID)
  633. workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  634. workflow_run_id: Mapped[str] = mapped_column(StringUUID)
  635. created_from: Mapped[str] = mapped_column(db.String(255), nullable=False)
  636. created_by_role: Mapped[str] = mapped_column(db.String(255), nullable=False)
  637. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  638. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  639. @property
  640. def workflow_run(self):
  641. return db.session.get(WorkflowRun, self.workflow_run_id)
  642. @property
  643. def created_by_account(self):
  644. created_by_role = CreatorUserRole(self.created_by_role)
  645. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  646. @property
  647. def created_by_end_user(self):
  648. from models.model import EndUser
  649. created_by_role = CreatorUserRole(self.created_by_role)
  650. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  651. class ConversationVariable(Base):
  652. __tablename__ = "workflow_conversation_variables"
  653. id: Mapped[str] = mapped_column(StringUUID, primary_key=True)
  654. conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False, primary_key=True, index=True)
  655. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False, index=True)
  656. data: Mapped[str] = mapped_column(db.Text, nullable=False)
  657. created_at: Mapped[datetime] = mapped_column(
  658. db.DateTime, nullable=False, server_default=func.current_timestamp(), index=True
  659. )
  660. updated_at: Mapped[datetime] = mapped_column(
  661. db.DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
  662. )
  663. def __init__(self, *, id: str, app_id: str, conversation_id: str, data: str) -> None:
  664. self.id = id
  665. self.app_id = app_id
  666. self.conversation_id = conversation_id
  667. self.data = data
  668. @classmethod
  669. def from_variable(cls, *, app_id: str, conversation_id: str, variable: Variable) -> "ConversationVariable":
  670. obj = cls(
  671. id=variable.id,
  672. app_id=app_id,
  673. conversation_id=conversation_id,
  674. data=variable.model_dump_json(),
  675. )
  676. return obj
  677. def to_variable(self) -> Variable:
  678. mapping = json.loads(self.data)
  679. return variable_factory.build_conversation_variable_from_mapping(mapping)
  680. # Only `sys.query` and `sys.files` could be modified.
  681. _EDITABLE_SYSTEM_VARIABLE = frozenset(["query", "files"])
  682. def _naive_utc_datetime():
  683. return datetime.now(UTC).replace(tzinfo=None)
  684. class WorkflowDraftVariable(Base):
  685. @staticmethod
  686. def unique_columns() -> list[str]:
  687. return [
  688. "app_id",
  689. "node_id",
  690. "name",
  691. ]
  692. __tablename__ = "workflow_draft_variables"
  693. __table_args__ = (UniqueConstraint(*unique_columns()),)
  694. # id is the unique identifier of a draft variable.
  695. id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=db.text("uuid_generate_v4()"))
  696. created_at: Mapped[datetime] = mapped_column(
  697. db.DateTime,
  698. nullable=False,
  699. default=_naive_utc_datetime,
  700. server_default=func.current_timestamp(),
  701. )
  702. updated_at: Mapped[datetime] = mapped_column(
  703. db.DateTime,
  704. nullable=False,
  705. default=_naive_utc_datetime,
  706. server_default=func.current_timestamp(),
  707. onupdate=func.current_timestamp(),
  708. )
  709. # "`app_id` maps to the `id` field in the `model.App` model."
  710. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  711. # `last_edited_at` records when the value of a given draft variable
  712. # is edited.
  713. #
  714. # If it's not edited after creation, its value is `None`.
  715. last_edited_at: Mapped[datetime | None] = mapped_column(
  716. db.DateTime,
  717. nullable=True,
  718. default=None,
  719. )
  720. # The `node_id` field is special.
  721. #
  722. # If the variable is a conversation variable or a system variable, then the value of `node_id`
  723. # is `conversation` or `sys`, respective.
  724. #
  725. # Otherwise, if the variable is a variable belonging to a specific node, the value of `_node_id` is
  726. # the identity of correspond node in graph definition. An example of node id is `"1745769620734"`.
  727. #
  728. # However, there's one caveat. The id of the first "Answer" node in chatflow is "answer". (Other
  729. # "Answer" node conform the rules above.)
  730. node_id: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="node_id")
  731. # From `VARIABLE_PATTERN`, we may conclude that the length of a top level variable is less than
  732. # 80 chars.
  733. #
  734. # ref: api/core/workflow/entities/variable_pool.py:18
  735. name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
  736. description: Mapped[str] = mapped_column(
  737. sa.String(255),
  738. default="",
  739. nullable=False,
  740. )
  741. selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector")
  742. value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20))
  743. # JSON string
  744. value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value")
  745. # visible
  746. visible: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=True)
  747. editable: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=False)
  748. def get_selector(self) -> list[str]:
  749. selector = json.loads(self.selector)
  750. if not isinstance(selector, list):
  751. _logger.error(
  752. "invalid selector loaded from database, type=%s, value=%s",
  753. type(selector),
  754. self.selector,
  755. )
  756. raise ValueError("invalid selector.")
  757. return selector
  758. def _set_selector(self, value: list[str]):
  759. self.selector = json.dumps(value)
  760. def get_value(self) -> Segment | None:
  761. return build_segment(json.loads(self.value))
  762. def set_name(self, name: str):
  763. self.name = name
  764. self._set_selector([self.node_id, name])
  765. def set_value(self, value: Segment):
  766. self.value = json.dumps(value.value)
  767. self.value_type = value.value_type
  768. def get_node_id(self) -> str | None:
  769. if self.get_variable_type() == DraftVariableType.NODE:
  770. return self.node_id
  771. else:
  772. return None
  773. def get_variable_type(self) -> DraftVariableType:
  774. match self.node_id:
  775. case DraftVariableType.CONVERSATION:
  776. return DraftVariableType.CONVERSATION
  777. case DraftVariableType.SYS:
  778. return DraftVariableType.SYS
  779. case _:
  780. return DraftVariableType.NODE
  781. @classmethod
  782. def _new(
  783. cls,
  784. *,
  785. app_id: str,
  786. node_id: str,
  787. name: str,
  788. value: Segment,
  789. description: str = "",
  790. ) -> "WorkflowDraftVariable":
  791. variable = WorkflowDraftVariable()
  792. variable.created_at = _naive_utc_datetime()
  793. variable.updated_at = _naive_utc_datetime()
  794. variable.description = description
  795. variable.app_id = app_id
  796. variable.node_id = node_id
  797. variable.name = name
  798. variable.set_value(value)
  799. variable._set_selector(list(variable_utils.to_selector(node_id, name)))
  800. return variable
  801. @classmethod
  802. def new_conversation_variable(
  803. cls,
  804. *,
  805. app_id: str,
  806. name: str,
  807. value: Segment,
  808. ) -> "WorkflowDraftVariable":
  809. variable = cls._new(
  810. app_id=app_id,
  811. node_id=CONVERSATION_VARIABLE_NODE_ID,
  812. name=name,
  813. value=value,
  814. )
  815. return variable
  816. @classmethod
  817. def new_sys_variable(
  818. cls,
  819. *,
  820. app_id: str,
  821. name: str,
  822. value: Segment,
  823. editable: bool = False,
  824. ) -> "WorkflowDraftVariable":
  825. variable = cls._new(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, value=value)
  826. variable.editable = editable
  827. return variable
  828. @classmethod
  829. def new_node_variable(
  830. cls,
  831. *,
  832. app_id: str,
  833. node_id: str,
  834. name: str,
  835. value: Segment,
  836. visible: bool = True,
  837. ) -> "WorkflowDraftVariable":
  838. variable = cls._new(app_id=app_id, node_id=node_id, name=name, value=value)
  839. variable.visible = visible
  840. variable.editable = True
  841. return variable
  842. @property
  843. def edited(self):
  844. return self.last_edited_at is not None
  845. def is_system_variable_editable(name: str) -> bool:
  846. return name in _EDITABLE_SYSTEM_VARIABLE