You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

workflow_service.py 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from uuid import uuid4
  7. from sqlalchemy import select
  8. from sqlalchemy.orm import Session
  9. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  10. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  11. from core.model_runtime.utils.encoders import jsonable_encoder
  12. from core.variables import Variable
  13. from core.workflow.entities.node_entities import NodeRunResult
  14. from core.workflow.errors import WorkflowNodeRunFailedError
  15. from core.workflow.graph_engine.entities.event import InNodeEvent
  16. from core.workflow.nodes import NodeType
  17. from core.workflow.nodes.base.node import BaseNode
  18. from core.workflow.nodes.enums import ErrorStrategy
  19. from core.workflow.nodes.event import RunCompletedEvent
  20. from core.workflow.nodes.event.types import NodeEvent
  21. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  22. from core.workflow.repository import RepositoryFactory
  23. from core.workflow.workflow_entry import WorkflowEntry
  24. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  25. from extensions.ext_database import db
  26. from models.account import Account
  27. from models.enums import CreatedByRole
  28. from models.model import App, AppMode
  29. from models.tools import WorkflowToolProvider
  30. from models.workflow import (
  31. Workflow,
  32. WorkflowNodeExecution,
  33. WorkflowNodeExecutionStatus,
  34. WorkflowNodeExecutionTriggeredFrom,
  35. WorkflowType,
  36. )
  37. from services.errors.app import WorkflowHashNotEqualError
  38. from services.workflow.workflow_converter import WorkflowConverter
  39. from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
  40. class WorkflowService:
  41. """
  42. Workflow Service
  43. """
  44. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  45. """
  46. Get draft workflow
  47. """
  48. # fetch draft workflow by app_model
  49. workflow = (
  50. db.session.query(Workflow)
  51. .filter(
  52. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  53. )
  54. .first()
  55. )
  56. # return draft workflow
  57. return workflow
  58. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  59. """
  60. Get published workflow
  61. """
  62. if not app_model.workflow_id:
  63. return None
  64. # fetch published workflow by workflow_id
  65. workflow = (
  66. db.session.query(Workflow)
  67. .filter(
  68. Workflow.tenant_id == app_model.tenant_id,
  69. Workflow.app_id == app_model.id,
  70. Workflow.id == app_model.workflow_id,
  71. )
  72. .first()
  73. )
  74. return workflow
  75. def get_all_published_workflow(
  76. self,
  77. *,
  78. session: Session,
  79. app_model: App,
  80. page: int,
  81. limit: int,
  82. user_id: str | None,
  83. named_only: bool = False,
  84. ) -> tuple[Sequence[Workflow], bool]:
  85. """
  86. Get published workflow with pagination
  87. """
  88. if not app_model.workflow_id:
  89. return [], False
  90. stmt = (
  91. select(Workflow)
  92. .where(Workflow.app_id == app_model.id)
  93. .order_by(Workflow.version.desc())
  94. .limit(limit + 1)
  95. .offset((page - 1) * limit)
  96. )
  97. if user_id:
  98. stmt = stmt.where(Workflow.created_by == user_id)
  99. if named_only:
  100. stmt = stmt.where(Workflow.marked_name != "")
  101. workflows = session.scalars(stmt).all()
  102. has_more = len(workflows) > limit
  103. if has_more:
  104. workflows = workflows[:-1]
  105. return workflows, has_more
  106. def sync_draft_workflow(
  107. self,
  108. *,
  109. app_model: App,
  110. graph: dict,
  111. features: dict,
  112. unique_hash: Optional[str],
  113. account: Account,
  114. environment_variables: Sequence[Variable],
  115. conversation_variables: Sequence[Variable],
  116. ) -> Workflow:
  117. """
  118. Sync draft workflow
  119. :raises WorkflowHashNotEqualError
  120. """
  121. # fetch draft workflow by app_model
  122. workflow = self.get_draft_workflow(app_model=app_model)
  123. if workflow and workflow.unique_hash != unique_hash:
  124. raise WorkflowHashNotEqualError()
  125. # validate features structure
  126. self.validate_features_structure(app_model=app_model, features=features)
  127. # create draft workflow if not found
  128. if not workflow:
  129. workflow = Workflow(
  130. tenant_id=app_model.tenant_id,
  131. app_id=app_model.id,
  132. type=WorkflowType.from_app_mode(app_model.mode).value,
  133. version="draft",
  134. graph=json.dumps(graph),
  135. features=json.dumps(features),
  136. created_by=account.id,
  137. environment_variables=environment_variables,
  138. conversation_variables=conversation_variables,
  139. )
  140. db.session.add(workflow)
  141. # update draft workflow if found
  142. else:
  143. workflow.graph = json.dumps(graph)
  144. workflow.features = json.dumps(features)
  145. workflow.updated_by = account.id
  146. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  147. workflow.environment_variables = environment_variables
  148. workflow.conversation_variables = conversation_variables
  149. # commit db session changes
  150. db.session.commit()
  151. # trigger app workflow events
  152. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  153. # return draft workflow
  154. return workflow
  155. def publish_workflow(
  156. self,
  157. *,
  158. session: Session,
  159. app_model: App,
  160. account: Account,
  161. marked_name: str = "",
  162. marked_comment: str = "",
  163. ) -> Workflow:
  164. draft_workflow_stmt = select(Workflow).where(
  165. Workflow.tenant_id == app_model.tenant_id,
  166. Workflow.app_id == app_model.id,
  167. Workflow.version == "draft",
  168. )
  169. draft_workflow = session.scalar(draft_workflow_stmt)
  170. if not draft_workflow:
  171. raise ValueError("No valid workflow found.")
  172. # create new workflow
  173. workflow = Workflow.new(
  174. tenant_id=app_model.tenant_id,
  175. app_id=app_model.id,
  176. type=draft_workflow.type,
  177. version=str(datetime.now(UTC).replace(tzinfo=None)),
  178. graph=draft_workflow.graph,
  179. features=draft_workflow.features,
  180. created_by=account.id,
  181. environment_variables=draft_workflow.environment_variables,
  182. conversation_variables=draft_workflow.conversation_variables,
  183. marked_name=marked_name,
  184. marked_comment=marked_comment,
  185. )
  186. # commit db session changes
  187. session.add(workflow)
  188. # trigger app workflow events
  189. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  190. # return new workflow
  191. return workflow
  192. def get_default_block_configs(self) -> list[dict]:
  193. """
  194. Get default block configs
  195. """
  196. # return default block config
  197. default_block_configs = []
  198. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  199. node_class = node_class_mapping[LATEST_VERSION]
  200. default_config = node_class.get_default_config()
  201. if default_config:
  202. default_block_configs.append(default_config)
  203. return default_block_configs
  204. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  205. """
  206. Get default config of node.
  207. :param node_type: node type
  208. :param filters: filter by node config parameters.
  209. :return:
  210. """
  211. node_type_enum = NodeType(node_type)
  212. # return default block config
  213. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  214. return None
  215. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  216. default_config = node_class.get_default_config(filters=filters)
  217. if not default_config:
  218. return None
  219. return default_config
  220. def run_draft_workflow_node(
  221. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  222. ) -> WorkflowNodeExecution:
  223. """
  224. Run draft workflow node
  225. """
  226. # fetch draft workflow by app_model
  227. draft_workflow = self.get_draft_workflow(app_model=app_model)
  228. if not draft_workflow:
  229. raise ValueError("Workflow not initialized")
  230. # run draft workflow node
  231. start_at = time.perf_counter()
  232. workflow_node_execution = self._handle_node_run_result(
  233. getter=lambda: WorkflowEntry.single_step_run(
  234. workflow=draft_workflow,
  235. node_id=node_id,
  236. user_inputs=user_inputs,
  237. user_id=account.id,
  238. ),
  239. start_at=start_at,
  240. tenant_id=app_model.tenant_id,
  241. node_id=node_id,
  242. )
  243. workflow_node_execution.app_id = app_model.id
  244. workflow_node_execution.created_by = account.id
  245. workflow_node_execution.workflow_id = draft_workflow.id
  246. # Use the repository to save the workflow node execution
  247. repository = RepositoryFactory.create_workflow_node_execution_repository(
  248. params={
  249. "tenant_id": app_model.tenant_id,
  250. "app_id": app_model.id,
  251. "session_factory": db.session.get_bind(),
  252. }
  253. )
  254. repository.save(workflow_node_execution)
  255. return workflow_node_execution
  256. def run_free_workflow_node(
  257. self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  258. ) -> WorkflowNodeExecution:
  259. """
  260. Run draft workflow node
  261. """
  262. # run draft workflow node
  263. start_at = time.perf_counter()
  264. workflow_node_execution = self._handle_node_run_result(
  265. getter=lambda: WorkflowEntry.run_free_node(
  266. node_id=node_id,
  267. node_data=node_data,
  268. tenant_id=tenant_id,
  269. user_id=user_id,
  270. user_inputs=user_inputs,
  271. ),
  272. start_at=start_at,
  273. tenant_id=tenant_id,
  274. node_id=node_id,
  275. )
  276. return workflow_node_execution
  277. def _handle_node_run_result(
  278. self,
  279. getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
  280. start_at: float,
  281. tenant_id: str,
  282. node_id: str,
  283. ) -> WorkflowNodeExecution:
  284. """
  285. Handle node run result
  286. :param getter: Callable[[], tuple[BaseNode, Generator[RunEvent | InNodeEvent, None, None]]]
  287. :param start_at: float
  288. :param tenant_id: str
  289. :param node_id: str
  290. """
  291. try:
  292. node_instance, generator = getter()
  293. node_run_result: NodeRunResult | None = None
  294. for event in generator:
  295. if isinstance(event, RunCompletedEvent):
  296. node_run_result = event.run_result
  297. # sign output files
  298. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  299. break
  300. if not node_run_result:
  301. raise ValueError("Node run failed with no run result")
  302. # single step debug mode error handling return
  303. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  304. node_error_args: dict[str, Any] = {
  305. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  306. "error": node_run_result.error,
  307. "inputs": node_run_result.inputs,
  308. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  309. }
  310. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  311. node_run_result = NodeRunResult(
  312. **node_error_args,
  313. outputs={
  314. **node_instance.node_data.default_value_dict,
  315. "error_message": node_run_result.error,
  316. "error_type": node_run_result.error_type,
  317. },
  318. )
  319. else:
  320. node_run_result = NodeRunResult(
  321. **node_error_args,
  322. outputs={
  323. "error_message": node_run_result.error,
  324. "error_type": node_run_result.error_type,
  325. },
  326. )
  327. run_succeeded = node_run_result.status in (
  328. WorkflowNodeExecutionStatus.SUCCEEDED,
  329. WorkflowNodeExecutionStatus.EXCEPTION,
  330. )
  331. error = node_run_result.error if not run_succeeded else None
  332. except WorkflowNodeRunFailedError as e:
  333. node_instance = e.node_instance
  334. run_succeeded = False
  335. node_run_result = None
  336. error = e.error
  337. workflow_node_execution = WorkflowNodeExecution()
  338. workflow_node_execution.id = str(uuid4())
  339. workflow_node_execution.tenant_id = tenant_id
  340. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  341. workflow_node_execution.index = 1
  342. workflow_node_execution.node_id = node_id
  343. workflow_node_execution.node_type = node_instance.node_type
  344. workflow_node_execution.title = node_instance.node_data.title
  345. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  346. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  347. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  348. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  349. if run_succeeded and node_run_result:
  350. # create workflow node execution
  351. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  352. process_data = (
  353. WorkflowEntry.handle_special_values(node_run_result.process_data)
  354. if node_run_result.process_data
  355. else None
  356. )
  357. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  358. workflow_node_execution.inputs = json.dumps(inputs)
  359. workflow_node_execution.process_data = json.dumps(process_data)
  360. workflow_node_execution.outputs = json.dumps(outputs)
  361. workflow_node_execution.execution_metadata = (
  362. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  363. )
  364. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  365. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  366. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  367. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  368. workflow_node_execution.error = node_run_result.error
  369. else:
  370. # create workflow node execution
  371. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  372. workflow_node_execution.error = error
  373. return workflow_node_execution
  374. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  375. """
  376. Basic mode of chatbot app(expert mode) to workflow
  377. Completion App to Workflow App
  378. :param app_model: App instance
  379. :param account: Account instance
  380. :param args: dict
  381. :return:
  382. """
  383. # chatbot convert to workflow mode
  384. workflow_converter = WorkflowConverter()
  385. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  386. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  387. # convert to workflow
  388. new_app: App = workflow_converter.convert_to_workflow(
  389. app_model=app_model,
  390. account=account,
  391. name=args.get("name", "Default Name"),
  392. icon_type=args.get("icon_type", "emoji"),
  393. icon=args.get("icon", "🤖"),
  394. icon_background=args.get("icon_background", "#FFEAD5"),
  395. )
  396. return new_app
  397. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  398. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  399. return AdvancedChatAppConfigManager.config_validate(
  400. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  401. )
  402. elif app_model.mode == AppMode.WORKFLOW.value:
  403. return WorkflowAppConfigManager.config_validate(
  404. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  405. )
  406. else:
  407. raise ValueError(f"Invalid app mode: {app_model.mode}")
  408. def update_workflow(
  409. self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict
  410. ) -> Optional[Workflow]:
  411. """
  412. Update workflow attributes
  413. :param session: SQLAlchemy database session
  414. :param workflow_id: Workflow ID
  415. :param tenant_id: Tenant ID
  416. :param account_id: Account ID (for permission check)
  417. :param data: Dictionary containing fields to update
  418. :return: Updated workflow or None if not found
  419. """
  420. stmt = select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
  421. workflow = session.scalar(stmt)
  422. if not workflow:
  423. return None
  424. allowed_fields = ["marked_name", "marked_comment"]
  425. for field, value in data.items():
  426. if field in allowed_fields:
  427. setattr(workflow, field, value)
  428. workflow.updated_by = account_id
  429. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  430. return workflow
  431. def delete_workflow(self, *, session: Session, workflow_id: str, tenant_id: str) -> bool:
  432. """
  433. Delete a workflow
  434. :param session: SQLAlchemy database session
  435. :param workflow_id: Workflow ID
  436. :param tenant_id: Tenant ID
  437. :return: True if successful
  438. :raises: ValueError if workflow not found
  439. :raises: WorkflowInUseError if workflow is in use
  440. :raises: DraftWorkflowDeletionError if workflow is a draft version
  441. """
  442. stmt = select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
  443. workflow = session.scalar(stmt)
  444. if not workflow:
  445. raise ValueError(f"Workflow with ID {workflow_id} not found")
  446. # Check if workflow is a draft version
  447. if workflow.version == "draft":
  448. raise DraftWorkflowDeletionError("Cannot delete draft workflow versions")
  449. # Check if this workflow is currently referenced by an app
  450. stmt = select(App).where(App.workflow_id == workflow_id)
  451. app = session.scalar(stmt)
  452. if app:
  453. # Cannot delete a workflow that's currently in use by an app
  454. raise WorkflowInUseError(f"Cannot delete workflow that is currently in use by app '{app.name}'")
  455. # Don't use workflow.tool_published as it's not accurate for specific workflow versions
  456. # Check if there's a tool provider using this specific workflow version
  457. tool_provider = (
  458. session.query(WorkflowToolProvider)
  459. .filter(
  460. WorkflowToolProvider.tenant_id == workflow.tenant_id,
  461. WorkflowToolProvider.app_id == workflow.app_id,
  462. WorkflowToolProvider.version == workflow.version,
  463. )
  464. .first()
  465. )
  466. if tool_provider:
  467. # Cannot delete a workflow that's published as a tool
  468. raise WorkflowInUseError("Cannot delete workflow that is published as a tool")
  469. session.delete(workflow)
  470. return True