You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

workflow_service.py 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from uuid import uuid4
  7. from sqlalchemy import select
  8. from sqlalchemy.orm import Session
  9. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  10. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  11. from core.model_runtime.utils.encoders import jsonable_encoder
  12. from core.repository import RepositoryFactory
  13. from core.variables import Variable
  14. from core.workflow.entities.node_entities import NodeRunResult
  15. from core.workflow.errors import WorkflowNodeRunFailedError
  16. from core.workflow.graph_engine.entities.event import InNodeEvent
  17. from core.workflow.nodes import NodeType
  18. from core.workflow.nodes.base.node import BaseNode
  19. from core.workflow.nodes.enums import ErrorStrategy
  20. from core.workflow.nodes.event import RunCompletedEvent
  21. from core.workflow.nodes.event.types import NodeEvent
  22. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  23. from core.workflow.workflow_entry import WorkflowEntry
  24. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  25. from extensions.ext_database import db
  26. from models.account import Account
  27. from models.enums import CreatedByRole
  28. from models.model import App, AppMode
  29. from models.workflow import (
  30. Workflow,
  31. WorkflowNodeExecution,
  32. WorkflowNodeExecutionStatus,
  33. WorkflowNodeExecutionTriggeredFrom,
  34. WorkflowType,
  35. )
  36. from services.errors.app import WorkflowHashNotEqualError
  37. from services.workflow.workflow_converter import WorkflowConverter
  38. from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
  39. class WorkflowService:
  40. """
  41. Workflow Service
  42. """
  43. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  44. """
  45. Get draft workflow
  46. """
  47. # fetch draft workflow by app_model
  48. workflow = (
  49. db.session.query(Workflow)
  50. .filter(
  51. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  52. )
  53. .first()
  54. )
  55. # return draft workflow
  56. return workflow
  57. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  58. """
  59. Get published workflow
  60. """
  61. if not app_model.workflow_id:
  62. return None
  63. # fetch published workflow by workflow_id
  64. workflow = (
  65. db.session.query(Workflow)
  66. .filter(
  67. Workflow.tenant_id == app_model.tenant_id,
  68. Workflow.app_id == app_model.id,
  69. Workflow.id == app_model.workflow_id,
  70. )
  71. .first()
  72. )
  73. return workflow
  74. def get_all_published_workflow(
  75. self,
  76. *,
  77. session: Session,
  78. app_model: App,
  79. page: int,
  80. limit: int,
  81. user_id: str | None,
  82. named_only: bool = False,
  83. ) -> tuple[Sequence[Workflow], bool]:
  84. """
  85. Get published workflow with pagination
  86. """
  87. if not app_model.workflow_id:
  88. return [], False
  89. stmt = (
  90. select(Workflow)
  91. .where(Workflow.app_id == app_model.id)
  92. .order_by(Workflow.version.desc())
  93. .limit(limit + 1)
  94. .offset((page - 1) * limit)
  95. )
  96. if user_id:
  97. stmt = stmt.where(Workflow.created_by == user_id)
  98. if named_only:
  99. stmt = stmt.where(Workflow.marked_name != "")
  100. workflows = session.scalars(stmt).all()
  101. has_more = len(workflows) > limit
  102. if has_more:
  103. workflows = workflows[:-1]
  104. return workflows, has_more
  105. def sync_draft_workflow(
  106. self,
  107. *,
  108. app_model: App,
  109. graph: dict,
  110. features: dict,
  111. unique_hash: Optional[str],
  112. account: Account,
  113. environment_variables: Sequence[Variable],
  114. conversation_variables: Sequence[Variable],
  115. ) -> Workflow:
  116. """
  117. Sync draft workflow
  118. :raises WorkflowHashNotEqualError
  119. """
  120. # fetch draft workflow by app_model
  121. workflow = self.get_draft_workflow(app_model=app_model)
  122. if workflow and workflow.unique_hash != unique_hash:
  123. raise WorkflowHashNotEqualError()
  124. # validate features structure
  125. self.validate_features_structure(app_model=app_model, features=features)
  126. # create draft workflow if not found
  127. if not workflow:
  128. workflow = Workflow(
  129. tenant_id=app_model.tenant_id,
  130. app_id=app_model.id,
  131. type=WorkflowType.from_app_mode(app_model.mode).value,
  132. version="draft",
  133. graph=json.dumps(graph),
  134. features=json.dumps(features),
  135. created_by=account.id,
  136. environment_variables=environment_variables,
  137. conversation_variables=conversation_variables,
  138. )
  139. db.session.add(workflow)
  140. # update draft workflow if found
  141. else:
  142. workflow.graph = json.dumps(graph)
  143. workflow.features = json.dumps(features)
  144. workflow.updated_by = account.id
  145. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  146. workflow.environment_variables = environment_variables
  147. workflow.conversation_variables = conversation_variables
  148. # commit db session changes
  149. db.session.commit()
  150. # trigger app workflow events
  151. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  152. # return draft workflow
  153. return workflow
  154. def publish_workflow(
  155. self,
  156. *,
  157. session: Session,
  158. app_model: App,
  159. account: Account,
  160. marked_name: str = "",
  161. marked_comment: str = "",
  162. ) -> Workflow:
  163. draft_workflow_stmt = select(Workflow).where(
  164. Workflow.tenant_id == app_model.tenant_id,
  165. Workflow.app_id == app_model.id,
  166. Workflow.version == "draft",
  167. )
  168. draft_workflow = session.scalar(draft_workflow_stmt)
  169. if not draft_workflow:
  170. raise ValueError("No valid workflow found.")
  171. # create new workflow
  172. workflow = Workflow.new(
  173. tenant_id=app_model.tenant_id,
  174. app_id=app_model.id,
  175. type=draft_workflow.type,
  176. version=str(datetime.now(UTC).replace(tzinfo=None)),
  177. graph=draft_workflow.graph,
  178. features=draft_workflow.features,
  179. created_by=account.id,
  180. environment_variables=draft_workflow.environment_variables,
  181. conversation_variables=draft_workflow.conversation_variables,
  182. marked_name=marked_name,
  183. marked_comment=marked_comment,
  184. )
  185. # commit db session changes
  186. session.add(workflow)
  187. # trigger app workflow events
  188. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  189. # return new workflow
  190. return workflow
  191. def get_default_block_configs(self) -> list[dict]:
  192. """
  193. Get default block configs
  194. """
  195. # return default block config
  196. default_block_configs = []
  197. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  198. node_class = node_class_mapping[LATEST_VERSION]
  199. default_config = node_class.get_default_config()
  200. if default_config:
  201. default_block_configs.append(default_config)
  202. return default_block_configs
  203. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  204. """
  205. Get default config of node.
  206. :param node_type: node type
  207. :param filters: filter by node config parameters.
  208. :return:
  209. """
  210. node_type_enum = NodeType(node_type)
  211. # return default block config
  212. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  213. return None
  214. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  215. default_config = node_class.get_default_config(filters=filters)
  216. if not default_config:
  217. return None
  218. return default_config
  219. def run_draft_workflow_node(
  220. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  221. ) -> WorkflowNodeExecution:
  222. """
  223. Run draft workflow node
  224. """
  225. # fetch draft workflow by app_model
  226. draft_workflow = self.get_draft_workflow(app_model=app_model)
  227. if not draft_workflow:
  228. raise ValueError("Workflow not initialized")
  229. # run draft workflow node
  230. start_at = time.perf_counter()
  231. workflow_node_execution = self._handle_node_run_result(
  232. getter=lambda: WorkflowEntry.single_step_run(
  233. workflow=draft_workflow,
  234. node_id=node_id,
  235. user_inputs=user_inputs,
  236. user_id=account.id,
  237. ),
  238. start_at=start_at,
  239. tenant_id=app_model.tenant_id,
  240. node_id=node_id,
  241. )
  242. workflow_node_execution.app_id = app_model.id
  243. workflow_node_execution.created_by = account.id
  244. workflow_node_execution.workflow_id = draft_workflow.id
  245. # Use the repository to save the workflow node execution
  246. repository = RepositoryFactory.create_workflow_node_execution_repository(
  247. params={
  248. "tenant_id": app_model.tenant_id,
  249. "app_id": app_model.id,
  250. "session_factory": db.session.get_bind,
  251. }
  252. )
  253. repository.save(workflow_node_execution)
  254. return workflow_node_execution
  255. def run_free_workflow_node(
  256. self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  257. ) -> WorkflowNodeExecution:
  258. """
  259. Run draft workflow node
  260. """
  261. # run draft workflow node
  262. start_at = time.perf_counter()
  263. workflow_node_execution = self._handle_node_run_result(
  264. getter=lambda: WorkflowEntry.run_free_node(
  265. node_id=node_id,
  266. node_data=node_data,
  267. tenant_id=tenant_id,
  268. user_id=user_id,
  269. user_inputs=user_inputs,
  270. ),
  271. start_at=start_at,
  272. tenant_id=tenant_id,
  273. node_id=node_id,
  274. )
  275. return workflow_node_execution
  276. def _handle_node_run_result(
  277. self,
  278. getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
  279. start_at: float,
  280. tenant_id: str,
  281. node_id: str,
  282. ) -> WorkflowNodeExecution:
  283. """
  284. Handle node run result
  285. :param getter: Callable[[], tuple[BaseNode, Generator[RunEvent | InNodeEvent, None, None]]]
  286. :param start_at: float
  287. :param tenant_id: str
  288. :param node_id: str
  289. """
  290. try:
  291. node_instance, generator = getter()
  292. node_run_result: NodeRunResult | None = None
  293. for event in generator:
  294. if isinstance(event, RunCompletedEvent):
  295. node_run_result = event.run_result
  296. # sign output files
  297. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  298. break
  299. if not node_run_result:
  300. raise ValueError("Node run failed with no run result")
  301. # single step debug mode error handling return
  302. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  303. node_error_args: dict[str, Any] = {
  304. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  305. "error": node_run_result.error,
  306. "inputs": node_run_result.inputs,
  307. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  308. }
  309. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  310. node_run_result = NodeRunResult(
  311. **node_error_args,
  312. outputs={
  313. **node_instance.node_data.default_value_dict,
  314. "error_message": node_run_result.error,
  315. "error_type": node_run_result.error_type,
  316. },
  317. )
  318. else:
  319. node_run_result = NodeRunResult(
  320. **node_error_args,
  321. outputs={
  322. "error_message": node_run_result.error,
  323. "error_type": node_run_result.error_type,
  324. },
  325. )
  326. run_succeeded = node_run_result.status in (
  327. WorkflowNodeExecutionStatus.SUCCEEDED,
  328. WorkflowNodeExecutionStatus.EXCEPTION,
  329. )
  330. error = node_run_result.error if not run_succeeded else None
  331. except WorkflowNodeRunFailedError as e:
  332. node_instance = e.node_instance
  333. run_succeeded = False
  334. node_run_result = None
  335. error = e.error
  336. workflow_node_execution = WorkflowNodeExecution()
  337. workflow_node_execution.id = str(uuid4())
  338. workflow_node_execution.tenant_id = tenant_id
  339. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  340. workflow_node_execution.index = 1
  341. workflow_node_execution.node_id = node_id
  342. workflow_node_execution.node_type = node_instance.node_type
  343. workflow_node_execution.title = node_instance.node_data.title
  344. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  345. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  346. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  347. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  348. if run_succeeded and node_run_result:
  349. # create workflow node execution
  350. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  351. process_data = (
  352. WorkflowEntry.handle_special_values(node_run_result.process_data)
  353. if node_run_result.process_data
  354. else None
  355. )
  356. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  357. workflow_node_execution.inputs = json.dumps(inputs)
  358. workflow_node_execution.process_data = json.dumps(process_data)
  359. workflow_node_execution.outputs = json.dumps(outputs)
  360. workflow_node_execution.execution_metadata = (
  361. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  362. )
  363. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  364. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  365. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  366. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  367. workflow_node_execution.error = node_run_result.error
  368. else:
  369. # create workflow node execution
  370. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  371. workflow_node_execution.error = error
  372. return workflow_node_execution
  373. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  374. """
  375. Basic mode of chatbot app(expert mode) to workflow
  376. Completion App to Workflow App
  377. :param app_model: App instance
  378. :param account: Account instance
  379. :param args: dict
  380. :return:
  381. """
  382. # chatbot convert to workflow mode
  383. workflow_converter = WorkflowConverter()
  384. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  385. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  386. # convert to workflow
  387. new_app: App = workflow_converter.convert_to_workflow(
  388. app_model=app_model,
  389. account=account,
  390. name=args.get("name", "Default Name"),
  391. icon_type=args.get("icon_type", "emoji"),
  392. icon=args.get("icon", "🤖"),
  393. icon_background=args.get("icon_background", "#FFEAD5"),
  394. )
  395. return new_app
  396. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  397. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  398. return AdvancedChatAppConfigManager.config_validate(
  399. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  400. )
  401. elif app_model.mode == AppMode.WORKFLOW.value:
  402. return WorkflowAppConfigManager.config_validate(
  403. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  404. )
  405. else:
  406. raise ValueError(f"Invalid app mode: {app_model.mode}")
  407. def update_workflow(
  408. self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict
  409. ) -> Optional[Workflow]:
  410. """
  411. Update workflow attributes
  412. :param session: SQLAlchemy database session
  413. :param workflow_id: Workflow ID
  414. :param tenant_id: Tenant ID
  415. :param account_id: Account ID (for permission check)
  416. :param data: Dictionary containing fields to update
  417. :return: Updated workflow or None if not found
  418. """
  419. stmt = select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
  420. workflow = session.scalar(stmt)
  421. if not workflow:
  422. return None
  423. allowed_fields = ["marked_name", "marked_comment"]
  424. for field, value in data.items():
  425. if field in allowed_fields:
  426. setattr(workflow, field, value)
  427. workflow.updated_by = account_id
  428. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  429. return workflow
  430. def delete_workflow(self, *, session: Session, workflow_id: str, tenant_id: str) -> bool:
  431. """
  432. Delete a workflow
  433. :param session: SQLAlchemy database session
  434. :param workflow_id: Workflow ID
  435. :param tenant_id: Tenant ID
  436. :return: True if successful
  437. :raises: ValueError if workflow not found
  438. :raises: WorkflowInUseError if workflow is in use
  439. :raises: DraftWorkflowDeletionError if workflow is a draft version
  440. """
  441. stmt = select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
  442. workflow = session.scalar(stmt)
  443. if not workflow:
  444. raise ValueError(f"Workflow with ID {workflow_id} not found")
  445. # Check if workflow is a draft version
  446. if workflow.version == "draft":
  447. raise DraftWorkflowDeletionError("Cannot delete draft workflow versions")
  448. # Check if this workflow is currently referenced by an app
  449. stmt = select(App).where(App.workflow_id == workflow_id)
  450. app = session.scalar(stmt)
  451. if app:
  452. # Cannot delete a workflow that's currently in use by an app
  453. raise WorkflowInUseError(f"Cannot delete workflow that is currently in use by app '{app.name}'")
  454. # Check if this workflow is published as a tool
  455. if workflow.tool_published:
  456. # Cannot delete a workflow that's published as a tool
  457. raise WorkflowInUseError("Cannot delete workflow that is published as a tool")
  458. session.delete(workflow)
  459. return True