Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.

workflow.py 37KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039
  1. import json
  2. import logging
  3. from collections.abc import Mapping, Sequence
  4. from datetime import UTC, datetime
  5. from enum import Enum, StrEnum
  6. from typing import TYPE_CHECKING, Any, Optional, Union
  7. from uuid import uuid4
  8. from flask_login import current_user
  9. from core.variables import utils as variable_utils
  10. from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
  11. from factories.variable_factory import build_segment
  12. if TYPE_CHECKING:
  13. from models.model import AppMode
  14. import sqlalchemy as sa
  15. from sqlalchemy import Index, PrimaryKeyConstraint, UniqueConstraint, func
  16. from sqlalchemy.orm import Mapped, declared_attr, mapped_column
  17. from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE
  18. from core.helper import encrypter
  19. from core.variables import SecretVariable, Segment, SegmentType, Variable
  20. from factories import variable_factory
  21. from libs import helper
  22. from .account import Account
  23. from .base import Base
  24. from .engine import db
  25. from .enums import CreatorUserRole, DraftVariableType
  26. from .types import EnumText, StringUUID
  27. _logger = logging.getLogger(__name__)
  28. if TYPE_CHECKING:
  29. from models.model import AppMode
  30. class WorkflowType(Enum):
  31. """
  32. Workflow Type Enum
  33. """
  34. WORKFLOW = "workflow"
  35. CHAT = "chat"
  36. @classmethod
  37. def value_of(cls, value: str) -> "WorkflowType":
  38. """
  39. Get value of given mode.
  40. :param value: mode value
  41. :return: mode
  42. """
  43. for mode in cls:
  44. if mode.value == value:
  45. return mode
  46. raise ValueError(f"invalid workflow type value {value}")
  47. @classmethod
  48. def from_app_mode(cls, app_mode: Union[str, "AppMode"]) -> "WorkflowType":
  49. """
  50. Get workflow type from app mode.
  51. :param app_mode: app mode
  52. :return: workflow type
  53. """
  54. from models.model import AppMode
  55. app_mode = app_mode if isinstance(app_mode, AppMode) else AppMode.value_of(app_mode)
  56. return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
  57. class Workflow(Base):
  58. """
  59. Workflow, for `Workflow App` and `Chat App workflow mode`.
  60. Attributes:
  61. - id (uuid) Workflow ID, pk
  62. - tenant_id (uuid) Workspace ID
  63. - app_id (uuid) App ID
  64. - type (string) Workflow type
  65. `workflow` for `Workflow App`
  66. `chat` for `Chat App workflow mode`
  67. - version (string) Version
  68. `draft` for draft version (only one for each app), other for version number (redundant)
  69. - graph (text) Workflow canvas configuration (JSON)
  70. The entire canvas configuration JSON, including Node, Edge, and other configurations
  71. - nodes (array[object]) Node list, see Node Schema
  72. - edges (array[object]) Edge list, see Edge Schema
  73. - created_by (uuid) Creator ID
  74. - created_at (timestamp) Creation time
  75. - updated_by (uuid) `optional` Last updater ID
  76. - updated_at (timestamp) `optional` Last update time
  77. """
  78. __tablename__ = "workflows"
  79. __table_args__ = (
  80. db.PrimaryKeyConstraint("id", name="workflow_pkey"),
  81. db.Index("workflow_version_idx", "tenant_id", "app_id", "version"),
  82. )
  83. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  84. tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  85. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  86. type: Mapped[str] = mapped_column(db.String(255), nullable=False)
  87. version: Mapped[str] = mapped_column(db.String(255), nullable=False)
  88. marked_name: Mapped[str] = mapped_column(default="", server_default="")
  89. marked_comment: Mapped[str] = mapped_column(default="", server_default="")
  90. graph: Mapped[str] = mapped_column(sa.Text)
  91. _features: Mapped[str] = mapped_column("features", sa.TEXT)
  92. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  93. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  94. updated_by: Mapped[Optional[str]] = mapped_column(StringUUID)
  95. updated_at: Mapped[datetime] = mapped_column(
  96. db.DateTime,
  97. nullable=False,
  98. default=datetime.now(UTC).replace(tzinfo=None),
  99. server_onupdate=func.current_timestamp(),
  100. )
  101. _environment_variables: Mapped[str] = mapped_column(
  102. "environment_variables", db.Text, nullable=False, server_default="{}"
  103. )
  104. _conversation_variables: Mapped[str] = mapped_column(
  105. "conversation_variables", db.Text, nullable=False, server_default="{}"
  106. )
  107. @classmethod
  108. def new(
  109. cls,
  110. *,
  111. tenant_id: str,
  112. app_id: str,
  113. type: str,
  114. version: str,
  115. graph: str,
  116. features: str,
  117. created_by: str,
  118. environment_variables: Sequence[Variable],
  119. conversation_variables: Sequence[Variable],
  120. marked_name: str = "",
  121. marked_comment: str = "",
  122. ) -> "Workflow":
  123. workflow = Workflow()
  124. workflow.id = str(uuid4())
  125. workflow.tenant_id = tenant_id
  126. workflow.app_id = app_id
  127. workflow.type = type
  128. workflow.version = version
  129. workflow.graph = graph
  130. workflow.features = features
  131. workflow.created_by = created_by
  132. workflow.environment_variables = environment_variables or []
  133. workflow.conversation_variables = conversation_variables or []
  134. workflow.marked_name = marked_name
  135. workflow.marked_comment = marked_comment
  136. workflow.created_at = datetime.now(UTC).replace(tzinfo=None)
  137. workflow.updated_at = workflow.created_at
  138. return workflow
  139. @property
  140. def created_by_account(self):
  141. return db.session.get(Account, self.created_by)
  142. @property
  143. def updated_by_account(self):
  144. return db.session.get(Account, self.updated_by) if self.updated_by else None
  145. @property
  146. def graph_dict(self) -> Mapping[str, Any]:
  147. return json.loads(self.graph) if self.graph else {}
  148. @property
  149. def features(self) -> str:
  150. """
  151. Convert old features structure to new features structure.
  152. """
  153. if not self._features:
  154. return self._features
  155. features = json.loads(self._features)
  156. if features.get("file_upload", {}).get("image", {}).get("enabled", False):
  157. image_enabled = True
  158. image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
  159. image_transfer_methods = features["file_upload"]["image"].get(
  160. "transfer_methods", ["remote_url", "local_file"]
  161. )
  162. features["file_upload"]["enabled"] = image_enabled
  163. features["file_upload"]["number_limits"] = image_number_limits
  164. features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
  165. features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
  166. features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
  167. "allowed_file_extensions", []
  168. )
  169. del features["file_upload"]["image"]
  170. self._features = json.dumps(features)
  171. return self._features
  172. @features.setter
  173. def features(self, value: str) -> None:
  174. self._features = value
  175. @property
  176. def features_dict(self) -> dict[str, Any]:
  177. return json.loads(self.features) if self.features else {}
  178. def user_input_form(self, to_old_structure: bool = False) -> list:
  179. # get start node from graph
  180. if not self.graph:
  181. return []
  182. graph_dict = self.graph_dict
  183. if "nodes" not in graph_dict:
  184. return []
  185. start_node = next((node for node in graph_dict["nodes"] if node["data"]["type"] == "start"), None)
  186. if not start_node:
  187. return []
  188. # get user_input_form from start node
  189. variables: list[Any] = start_node.get("data", {}).get("variables", [])
  190. if to_old_structure:
  191. old_structure_variables = []
  192. for variable in variables:
  193. old_structure_variables.append({variable["type"]: variable})
  194. return old_structure_variables
  195. return variables
  196. @property
  197. def unique_hash(self) -> str:
  198. """
  199. Get hash of workflow.
  200. :return: hash
  201. """
  202. entity = {"graph": self.graph_dict, "features": self.features_dict}
  203. return helper.generate_text_hash(json.dumps(entity, sort_keys=True))
  204. @property
  205. def tool_published(self) -> bool:
  206. """
  207. DEPRECATED: This property is not accurate for determining if a workflow is published as a tool.
  208. It only checks if there's a WorkflowToolProvider for the app, not if this specific workflow version
  209. is the one being used by the tool.
  210. For accurate checking, use a direct query with tenant_id, app_id, and version.
  211. """
  212. from models.tools import WorkflowToolProvider
  213. return (
  214. db.session.query(WorkflowToolProvider)
  215. .filter(WorkflowToolProvider.tenant_id == self.tenant_id, WorkflowToolProvider.app_id == self.app_id)
  216. .count()
  217. > 0
  218. )
  219. @property
  220. def environment_variables(self) -> Sequence[Variable]:
  221. # TODO: find some way to init `self._environment_variables` when instance created.
  222. if self._environment_variables is None:
  223. self._environment_variables = "{}"
  224. # Get tenant_id from current_user (Account or EndUser)
  225. if isinstance(current_user, Account):
  226. # Account user
  227. tenant_id = current_user.current_tenant_id
  228. else:
  229. # EndUser
  230. tenant_id = current_user.tenant_id
  231. if not tenant_id:
  232. return []
  233. environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables)
  234. results = [
  235. variable_factory.build_environment_variable_from_mapping(v) for v in environment_variables_dict.values()
  236. ]
  237. # decrypt secret variables value
  238. def decrypt_func(var):
  239. if isinstance(var, SecretVariable):
  240. return var.model_copy(update={"value": encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)})
  241. else:
  242. return var
  243. results = list(map(decrypt_func, results))
  244. return results
  245. @environment_variables.setter
  246. def environment_variables(self, value: Sequence[Variable]):
  247. if not value:
  248. self._environment_variables = "{}"
  249. return
  250. # Get tenant_id from current_user (Account or EndUser)
  251. if isinstance(current_user, Account):
  252. # Account user
  253. tenant_id = current_user.current_tenant_id
  254. else:
  255. # EndUser
  256. tenant_id = current_user.tenant_id
  257. if not tenant_id:
  258. self._environment_variables = "{}"
  259. return
  260. value = list(value)
  261. if any(var for var in value if not var.id):
  262. raise ValueError("environment variable require a unique id")
  263. # Compare inputs and origin variables,
  264. # if the value is HIDDEN_VALUE, use the origin variable value (only update `name`).
  265. origin_variables_dictionary = {var.id: var for var in self.environment_variables}
  266. for i, variable in enumerate(value):
  267. if variable.id in origin_variables_dictionary and variable.value == HIDDEN_VALUE:
  268. value[i] = origin_variables_dictionary[variable.id].model_copy(update={"name": variable.name})
  269. # encrypt secret variables value
  270. def encrypt_func(var):
  271. if isinstance(var, SecretVariable):
  272. return var.model_copy(update={"value": encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)})
  273. else:
  274. return var
  275. encrypted_vars = list(map(encrypt_func, value))
  276. environment_variables_json = json.dumps(
  277. {var.name: var.model_dump() for var in encrypted_vars},
  278. ensure_ascii=False,
  279. )
  280. self._environment_variables = environment_variables_json
  281. def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
  282. environment_variables = list(self.environment_variables)
  283. environment_variables = [
  284. v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={"value": ""})
  285. for v in environment_variables
  286. ]
  287. result = {
  288. "graph": self.graph_dict,
  289. "features": self.features_dict,
  290. "environment_variables": [var.model_dump(mode="json") for var in environment_variables],
  291. "conversation_variables": [var.model_dump(mode="json") for var in self.conversation_variables],
  292. }
  293. return result
  294. @property
  295. def conversation_variables(self) -> Sequence[Variable]:
  296. # TODO: find some way to init `self._conversation_variables` when instance created.
  297. if self._conversation_variables is None:
  298. self._conversation_variables = "{}"
  299. variables_dict: dict[str, Any] = json.loads(self._conversation_variables)
  300. results = [variable_factory.build_conversation_variable_from_mapping(v) for v in variables_dict.values()]
  301. return results
  302. @conversation_variables.setter
  303. def conversation_variables(self, value: Sequence[Variable]) -> None:
  304. self._conversation_variables = json.dumps(
  305. {var.name: var.model_dump() for var in value},
  306. ensure_ascii=False,
  307. )
  308. class WorkflowRun(Base):
  309. """
  310. Workflow Run
  311. Attributes:
  312. - id (uuid) Run ID
  313. - tenant_id (uuid) Workspace ID
  314. - app_id (uuid) App ID
  315. - workflow_id (uuid) Workflow ID
  316. - type (string) Workflow type
  317. - triggered_from (string) Trigger source
  318. `debugging` for canvas debugging
  319. `app-run` for (published) app execution
  320. - version (string) Version
  321. - graph (text) Workflow canvas configuration (JSON)
  322. - inputs (text) Input parameters
  323. - status (string) Execution status, `running` / `succeeded` / `failed` / `stopped`
  324. - outputs (text) `optional` Output content
  325. - error (string) `optional` Error reason
  326. - elapsed_time (float) `optional` Time consumption (s)
  327. - total_tokens (int) `optional` Total tokens used
  328. - total_steps (int) Total steps (redundant), default 0
  329. - created_by_role (string) Creator role
  330. - `account` Console account
  331. - `end_user` End user
  332. - created_by (uuid) Runner ID
  333. - created_at (timestamp) Run time
  334. - finished_at (timestamp) End time
  335. """
  336. __tablename__ = "workflow_runs"
  337. __table_args__ = (
  338. db.PrimaryKeyConstraint("id", name="workflow_run_pkey"),
  339. db.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"),
  340. )
  341. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  342. tenant_id: Mapped[str] = mapped_column(StringUUID)
  343. app_id: Mapped[str] = mapped_column(StringUUID)
  344. workflow_id: Mapped[str] = mapped_column(StringUUID)
  345. type: Mapped[str] = mapped_column(db.String(255))
  346. triggered_from: Mapped[str] = mapped_column(db.String(255))
  347. version: Mapped[str] = mapped_column(db.String(255))
  348. graph: Mapped[Optional[str]] = mapped_column(db.Text)
  349. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  350. status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded
  351. outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}")
  352. error: Mapped[Optional[str]] = mapped_column(db.Text)
  353. elapsed_time: Mapped[float] = mapped_column(db.Float, nullable=False, server_default=sa.text("0"))
  354. total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0"))
  355. total_steps: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True)
  356. created_by_role: Mapped[str] = mapped_column(db.String(255)) # account, end_user
  357. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  358. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  359. finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
  360. exceptions_count: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True)
  361. @property
  362. def created_by_account(self):
  363. created_by_role = CreatorUserRole(self.created_by_role)
  364. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  365. @property
  366. def created_by_end_user(self):
  367. from models.model import EndUser
  368. created_by_role = CreatorUserRole(self.created_by_role)
  369. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  370. @property
  371. def graph_dict(self) -> Mapping[str, Any]:
  372. return json.loads(self.graph) if self.graph else {}
  373. @property
  374. def inputs_dict(self) -> Mapping[str, Any]:
  375. return json.loads(self.inputs) if self.inputs else {}
  376. @property
  377. def outputs_dict(self) -> Mapping[str, Any]:
  378. return json.loads(self.outputs) if self.outputs else {}
  379. @property
  380. def message(self):
  381. from models.model import Message
  382. return (
  383. db.session.query(Message).filter(Message.app_id == self.app_id, Message.workflow_run_id == self.id).first()
  384. )
  385. @property
  386. def workflow(self):
  387. return db.session.query(Workflow).filter(Workflow.id == self.workflow_id).first()
  388. def to_dict(self):
  389. return {
  390. "id": self.id,
  391. "tenant_id": self.tenant_id,
  392. "app_id": self.app_id,
  393. "workflow_id": self.workflow_id,
  394. "type": self.type,
  395. "triggered_from": self.triggered_from,
  396. "version": self.version,
  397. "graph": self.graph_dict,
  398. "inputs": self.inputs_dict,
  399. "status": self.status,
  400. "outputs": self.outputs_dict,
  401. "error": self.error,
  402. "elapsed_time": self.elapsed_time,
  403. "total_tokens": self.total_tokens,
  404. "total_steps": self.total_steps,
  405. "created_by_role": self.created_by_role,
  406. "created_by": self.created_by,
  407. "created_at": self.created_at,
  408. "finished_at": self.finished_at,
  409. "exceptions_count": self.exceptions_count,
  410. }
  411. @classmethod
  412. def from_dict(cls, data: dict) -> "WorkflowRun":
  413. return cls(
  414. id=data.get("id"),
  415. tenant_id=data.get("tenant_id"),
  416. app_id=data.get("app_id"),
  417. workflow_id=data.get("workflow_id"),
  418. type=data.get("type"),
  419. triggered_from=data.get("triggered_from"),
  420. version=data.get("version"),
  421. graph=json.dumps(data.get("graph")),
  422. inputs=json.dumps(data.get("inputs")),
  423. status=data.get("status"),
  424. outputs=json.dumps(data.get("outputs")),
  425. error=data.get("error"),
  426. elapsed_time=data.get("elapsed_time"),
  427. total_tokens=data.get("total_tokens"),
  428. total_steps=data.get("total_steps"),
  429. created_by_role=data.get("created_by_role"),
  430. created_by=data.get("created_by"),
  431. created_at=data.get("created_at"),
  432. finished_at=data.get("finished_at"),
  433. exceptions_count=data.get("exceptions_count"),
  434. )
  435. class WorkflowNodeExecutionTriggeredFrom(StrEnum):
  436. """
  437. Workflow Node Execution Triggered From Enum
  438. """
  439. SINGLE_STEP = "single-step"
  440. WORKFLOW_RUN = "workflow-run"
  441. class WorkflowNodeExecutionModel(Base):
  442. """
  443. Workflow Node Execution
  444. - id (uuid) Execution ID
  445. - tenant_id (uuid) Workspace ID
  446. - app_id (uuid) App ID
  447. - workflow_id (uuid) Workflow ID
  448. - triggered_from (string) Trigger source
  449. `single-step` for single-step debugging
  450. `workflow-run` for workflow execution (debugging / user execution)
  451. - workflow_run_id (uuid) `optional` Workflow run ID
  452. Null for single-step debugging.
  453. - index (int) Execution sequence number, used for displaying Tracing Node order
  454. - predecessor_node_id (string) `optional` Predecessor node ID, used for displaying execution path
  455. - node_id (string) Node ID
  456. - node_type (string) Node type, such as `start`
  457. - title (string) Node title
  458. - inputs (json) All predecessor node variable content used in the node
  459. - process_data (json) Node process data
  460. - outputs (json) `optional` Node output variables
  461. - status (string) Execution status, `running` / `succeeded` / `failed`
  462. - error (string) `optional` Error reason
  463. - elapsed_time (float) `optional` Time consumption (s)
  464. - execution_metadata (text) Metadata
  465. - total_tokens (int) `optional` Total tokens used
  466. - total_price (decimal) `optional` Total cost
  467. - currency (string) `optional` Currency, such as USD / RMB
  468. - created_at (timestamp) Run time
  469. - created_by_role (string) Creator role
  470. - `account` Console account
  471. - `end_user` End user
  472. - created_by (uuid) Runner ID
  473. - finished_at (timestamp) End time
  474. """
  475. __tablename__ = "workflow_node_executions"
  476. @declared_attr
  477. def __table_args__(cls): # noqa
  478. return (
  479. PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
  480. Index(
  481. "workflow_node_execution_workflow_run_idx",
  482. "tenant_id",
  483. "app_id",
  484. "workflow_id",
  485. "triggered_from",
  486. "workflow_run_id",
  487. ),
  488. Index(
  489. "workflow_node_execution_node_run_idx",
  490. "tenant_id",
  491. "app_id",
  492. "workflow_id",
  493. "triggered_from",
  494. "node_id",
  495. ),
  496. Index(
  497. "workflow_node_execution_id_idx",
  498. "tenant_id",
  499. "app_id",
  500. "workflow_id",
  501. "triggered_from",
  502. "node_execution_id",
  503. ),
  504. Index(
  505. # The first argument is the index name,
  506. # which we leave as `None`` to allow auto-generation by the ORM.
  507. None,
  508. cls.tenant_id,
  509. cls.workflow_id,
  510. cls.node_id,
  511. # MyPy may flag the following line because it doesn't recognize that
  512. # the `declared_attr` decorator passes the receiving class as the first
  513. # argument to this method, allowing us to reference class attributes.
  514. cls.created_at.desc(), # type: ignore
  515. ),
  516. )
  517. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  518. tenant_id: Mapped[str] = mapped_column(StringUUID)
  519. app_id: Mapped[str] = mapped_column(StringUUID)
  520. workflow_id: Mapped[str] = mapped_column(StringUUID)
  521. triggered_from: Mapped[str] = mapped_column(db.String(255))
  522. workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID)
  523. index: Mapped[int] = mapped_column(db.Integer)
  524. predecessor_node_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  525. node_execution_id: Mapped[Optional[str]] = mapped_column(db.String(255))
  526. node_id: Mapped[str] = mapped_column(db.String(255))
  527. node_type: Mapped[str] = mapped_column(db.String(255))
  528. title: Mapped[str] = mapped_column(db.String(255))
  529. inputs: Mapped[Optional[str]] = mapped_column(db.Text)
  530. process_data: Mapped[Optional[str]] = mapped_column(db.Text)
  531. outputs: Mapped[Optional[str]] = mapped_column(db.Text)
  532. status: Mapped[str] = mapped_column(db.String(255))
  533. error: Mapped[Optional[str]] = mapped_column(db.Text)
  534. elapsed_time: Mapped[float] = mapped_column(db.Float, server_default=db.text("0"))
  535. execution_metadata: Mapped[Optional[str]] = mapped_column(db.Text)
  536. created_at: Mapped[datetime] = mapped_column(db.DateTime, server_default=func.current_timestamp())
  537. created_by_role: Mapped[str] = mapped_column(db.String(255))
  538. created_by: Mapped[str] = mapped_column(StringUUID)
  539. finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
  540. @property
  541. def created_by_account(self):
  542. created_by_role = CreatorUserRole(self.created_by_role)
  543. # TODO(-LAN-): Avoid using db.session.get() here.
  544. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  545. @property
  546. def created_by_end_user(self):
  547. from models.model import EndUser
  548. created_by_role = CreatorUserRole(self.created_by_role)
  549. # TODO(-LAN-): Avoid using db.session.get() here.
  550. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  551. @property
  552. def inputs_dict(self):
  553. return json.loads(self.inputs) if self.inputs else None
  554. @property
  555. def outputs_dict(self) -> dict[str, Any] | None:
  556. return json.loads(self.outputs) if self.outputs else None
  557. @property
  558. def process_data_dict(self):
  559. return json.loads(self.process_data) if self.process_data else None
  560. @property
  561. def execution_metadata_dict(self) -> dict[str, Any]:
  562. # When the metadata is unset, we return an empty dictionary instead of `None`.
  563. # This approach streamlines the logic for the caller, making it easier to handle
  564. # cases where metadata is absent.
  565. return json.loads(self.execution_metadata) if self.execution_metadata else {}
  566. @property
  567. def extras(self):
  568. from core.tools.tool_manager import ToolManager
  569. extras = {}
  570. if self.execution_metadata_dict:
  571. from core.workflow.nodes import NodeType
  572. if self.node_type == NodeType.TOOL.value and "tool_info" in self.execution_metadata_dict:
  573. tool_info = self.execution_metadata_dict["tool_info"]
  574. extras["icon"] = ToolManager.get_tool_icon(
  575. tenant_id=self.tenant_id,
  576. provider_type=tool_info["provider_type"],
  577. provider_id=tool_info["provider_id"],
  578. )
  579. return extras
  580. class WorkflowAppLogCreatedFrom(Enum):
  581. """
  582. Workflow App Log Created From Enum
  583. """
  584. SERVICE_API = "service-api"
  585. WEB_APP = "web-app"
  586. INSTALLED_APP = "installed-app"
  587. @classmethod
  588. def value_of(cls, value: str) -> "WorkflowAppLogCreatedFrom":
  589. """
  590. Get value of given mode.
  591. :param value: mode value
  592. :return: mode
  593. """
  594. for mode in cls:
  595. if mode.value == value:
  596. return mode
  597. raise ValueError(f"invalid workflow app log created from value {value}")
  598. class WorkflowAppLog(Base):
  599. """
  600. Workflow App execution log, excluding workflow debugging records.
  601. Attributes:
  602. - id (uuid) run ID
  603. - tenant_id (uuid) Workspace ID
  604. - app_id (uuid) App ID
  605. - workflow_id (uuid) Associated Workflow ID
  606. - workflow_run_id (uuid) Associated Workflow Run ID
  607. - created_from (string) Creation source
  608. `service-api` App Execution OpenAPI
  609. `web-app` WebApp
  610. `installed-app` Installed App
  611. - created_by_role (string) Creator role
  612. - `account` Console account
  613. - `end_user` End user
  614. - created_by (uuid) Creator ID, depends on the user table according to created_by_role
  615. - created_at (timestamp) Creation time
  616. """
  617. __tablename__ = "workflow_app_logs"
  618. __table_args__ = (
  619. db.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
  620. db.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
  621. )
  622. id: Mapped[str] = mapped_column(StringUUID, server_default=db.text("uuid_generate_v4()"))
  623. tenant_id: Mapped[str] = mapped_column(StringUUID)
  624. app_id: Mapped[str] = mapped_column(StringUUID)
  625. workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  626. workflow_run_id: Mapped[str] = mapped_column(StringUUID)
  627. created_from: Mapped[str] = mapped_column(db.String(255), nullable=False)
  628. created_by_role: Mapped[str] = mapped_column(db.String(255), nullable=False)
  629. created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
  630. created_at: Mapped[datetime] = mapped_column(db.DateTime, nullable=False, server_default=func.current_timestamp())
  631. @property
  632. def workflow_run(self):
  633. return db.session.get(WorkflowRun, self.workflow_run_id)
  634. @property
  635. def created_by_account(self):
  636. created_by_role = CreatorUserRole(self.created_by_role)
  637. return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None
  638. @property
  639. def created_by_end_user(self):
  640. from models.model import EndUser
  641. created_by_role = CreatorUserRole(self.created_by_role)
  642. return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None
  643. class ConversationVariable(Base):
  644. __tablename__ = "workflow_conversation_variables"
  645. id: Mapped[str] = mapped_column(StringUUID, primary_key=True)
  646. conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False, primary_key=True, index=True)
  647. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False, index=True)
  648. data: Mapped[str] = mapped_column(db.Text, nullable=False)
  649. created_at: Mapped[datetime] = mapped_column(
  650. db.DateTime, nullable=False, server_default=func.current_timestamp(), index=True
  651. )
  652. updated_at: Mapped[datetime] = mapped_column(
  653. db.DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
  654. )
  655. def __init__(self, *, id: str, app_id: str, conversation_id: str, data: str) -> None:
  656. self.id = id
  657. self.app_id = app_id
  658. self.conversation_id = conversation_id
  659. self.data = data
  660. @classmethod
  661. def from_variable(cls, *, app_id: str, conversation_id: str, variable: Variable) -> "ConversationVariable":
  662. obj = cls(
  663. id=variable.id,
  664. app_id=app_id,
  665. conversation_id=conversation_id,
  666. data=variable.model_dump_json(),
  667. )
  668. return obj
  669. def to_variable(self) -> Variable:
  670. mapping = json.loads(self.data)
  671. return variable_factory.build_conversation_variable_from_mapping(mapping)
  672. # Only `sys.query` and `sys.files` could be modified.
  673. _EDITABLE_SYSTEM_VARIABLE = frozenset(["query", "files"])
  674. def _naive_utc_datetime():
  675. return datetime.now(UTC).replace(tzinfo=None)
  676. class WorkflowDraftVariable(Base):
  677. @staticmethod
  678. def unique_columns() -> list[str]:
  679. return [
  680. "app_id",
  681. "node_id",
  682. "name",
  683. ]
  684. __tablename__ = "workflow_draft_variables"
  685. __table_args__ = (UniqueConstraint(*unique_columns()),)
  686. # id is the unique identifier of a draft variable.
  687. id: Mapped[str] = mapped_column(StringUUID, primary_key=True, server_default=db.text("uuid_generate_v4()"))
  688. created_at: Mapped[datetime] = mapped_column(
  689. db.DateTime,
  690. nullable=False,
  691. default=_naive_utc_datetime,
  692. server_default=func.current_timestamp(),
  693. )
  694. updated_at: Mapped[datetime] = mapped_column(
  695. db.DateTime,
  696. nullable=False,
  697. default=_naive_utc_datetime,
  698. server_default=func.current_timestamp(),
  699. onupdate=func.current_timestamp(),
  700. )
  701. # "`app_id` maps to the `id` field in the `model.App` model."
  702. app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
  703. # `last_edited_at` records when the value of a given draft variable
  704. # is edited.
  705. #
  706. # If it's not edited after creation, its value is `None`.
  707. last_edited_at: Mapped[datetime | None] = mapped_column(
  708. db.DateTime,
  709. nullable=True,
  710. default=None,
  711. )
  712. # The `node_id` field is special.
  713. #
  714. # If the variable is a conversation variable or a system variable, then the value of `node_id`
  715. # is `conversation` or `sys`, respective.
  716. #
  717. # Otherwise, if the variable is a variable belonging to a specific node, the value of `_node_id` is
  718. # the identity of correspond node in graph definition. An example of node id is `"1745769620734"`.
  719. #
  720. # However, there's one caveat. The id of the first "Answer" node in chatflow is "answer". (Other
  721. # "Answer" node conform the rules above.)
  722. node_id: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="node_id")
  723. # From `VARIABLE_PATTERN`, we may conclude that the length of a top level variable is less than
  724. # 80 chars.
  725. #
  726. # ref: api/core/workflow/entities/variable_pool.py:18
  727. name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
  728. description: Mapped[str] = mapped_column(
  729. sa.String(255),
  730. default="",
  731. nullable=False,
  732. )
  733. selector: Mapped[str] = mapped_column(sa.String(255), nullable=False, name="selector")
  734. # The data type of this variable's value
  735. value_type: Mapped[SegmentType] = mapped_column(EnumText(SegmentType, length=20))
  736. # The variable's value serialized as a JSON string
  737. value: Mapped[str] = mapped_column(sa.Text, nullable=False, name="value")
  738. # Controls whether the variable should be displayed in the variable inspection panel
  739. visible: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=True)
  740. # Determines whether this variable can be modified by users
  741. editable: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, default=False)
  742. # The `node_execution_id` field identifies the workflow node execution that created this variable.
  743. # It corresponds to the `id` field in the `WorkflowNodeExecutionModel` model.
  744. #
  745. # This field is not `None` for system variables and node variables, and is `None`
  746. # for conversation variables.
  747. node_execution_id: Mapped[str | None] = mapped_column(
  748. StringUUID,
  749. nullable=True,
  750. default=None,
  751. )
  752. def get_selector(self) -> list[str]:
  753. selector = json.loads(self.selector)
  754. if not isinstance(selector, list):
  755. _logger.error(
  756. "invalid selector loaded from database, type=%s, value=%s",
  757. type(selector),
  758. self.selector,
  759. )
  760. raise ValueError("invalid selector.")
  761. return selector
  762. def _set_selector(self, value: list[str]):
  763. self.selector = json.dumps(value)
  764. def get_value(self) -> Segment | None:
  765. return build_segment(json.loads(self.value))
  766. def set_name(self, name: str):
  767. self.name = name
  768. self._set_selector([self.node_id, name])
  769. def set_value(self, value: Segment):
  770. self.value = json.dumps(value.value)
  771. self.value_type = value.value_type
  772. def get_node_id(self) -> str | None:
  773. if self.get_variable_type() == DraftVariableType.NODE:
  774. return self.node_id
  775. else:
  776. return None
  777. def get_variable_type(self) -> DraftVariableType:
  778. match self.node_id:
  779. case DraftVariableType.CONVERSATION:
  780. return DraftVariableType.CONVERSATION
  781. case DraftVariableType.SYS:
  782. return DraftVariableType.SYS
  783. case _:
  784. return DraftVariableType.NODE
  785. @classmethod
  786. def _new(
  787. cls,
  788. *,
  789. app_id: str,
  790. node_id: str,
  791. name: str,
  792. value: Segment,
  793. description: str = "",
  794. ) -> "WorkflowDraftVariable":
  795. variable = WorkflowDraftVariable()
  796. variable.created_at = _naive_utc_datetime()
  797. variable.updated_at = _naive_utc_datetime()
  798. variable.description = description
  799. variable.app_id = app_id
  800. variable.node_id = node_id
  801. variable.name = name
  802. variable.set_value(value)
  803. variable._set_selector(list(variable_utils.to_selector(node_id, name)))
  804. return variable
  805. @classmethod
  806. def new_conversation_variable(
  807. cls,
  808. *,
  809. app_id: str,
  810. name: str,
  811. value: Segment,
  812. ) -> "WorkflowDraftVariable":
  813. variable = cls._new(
  814. app_id=app_id,
  815. node_id=CONVERSATION_VARIABLE_NODE_ID,
  816. name=name,
  817. value=value,
  818. )
  819. return variable
  820. @classmethod
  821. def new_sys_variable(
  822. cls,
  823. *,
  824. app_id: str,
  825. name: str,
  826. value: Segment,
  827. editable: bool = False,
  828. ) -> "WorkflowDraftVariable":
  829. variable = cls._new(app_id=app_id, node_id=SYSTEM_VARIABLE_NODE_ID, name=name, value=value)
  830. variable.editable = editable
  831. return variable
  832. @classmethod
  833. def new_node_variable(
  834. cls,
  835. *,
  836. app_id: str,
  837. node_id: str,
  838. name: str,
  839. value: Segment,
  840. visible: bool = True,
  841. ) -> "WorkflowDraftVariable":
  842. variable = cls._new(app_id=app_id, node_id=node_id, name=name, value=value)
  843. variable.visible = visible
  844. variable.editable = True
  845. return variable
  846. @property
  847. def edited(self):
  848. return self.last_edited_at is not None
  849. def is_system_variable_editable(name: str) -> bool:
  850. return name in _EDITABLE_SYSTEM_VARIABLE