選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

workflow.py 31KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. import json
  2. import logging
  3. from collections.abc import Sequence
  4. from typing import cast
  5. from flask import abort, request
  6. from flask_restx import Resource, inputs, marshal_with, reqparse
  7. from sqlalchemy.orm import Session
  8. from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
  9. import services
  10. from configs import dify_config
  11. from controllers.console import api
  12. from controllers.console.app.error import (
  13. ConversationCompletedError,
  14. DraftWorkflowNotExist,
  15. DraftWorkflowNotSync,
  16. )
  17. from controllers.console.app.wraps import get_app_model
  18. from controllers.console.wraps import account_initialization_required, setup_required
  19. from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
  20. from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
  21. from core.app.apps.base_app_queue_manager import AppQueueManager
  22. from core.app.entities.app_invoke_entities import InvokeFrom
  23. from core.file.models import File
  24. from core.helper.trace_id_helper import get_external_trace_id
  25. from core.workflow.graph_engine.manager import GraphEngineManager
  26. from extensions.ext_database import db
  27. from factories import file_factory, variable_factory
  28. from fields.workflow_fields import workflow_fields, workflow_pagination_fields
  29. from fields.workflow_run_fields import workflow_run_node_execution_fields
  30. from libs import helper
  31. from libs.helper import TimestampField, uuid_value
  32. from libs.login import current_user, login_required
  33. from models import App
  34. from models.account import Account
  35. from models.model import AppMode
  36. from models.workflow import Workflow
  37. from services.app_generate_service import AppGenerateService
  38. from services.errors.app import WorkflowHashNotEqualError
  39. from services.errors.llm import InvokeRateLimitError
  40. from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
  41. logger = logging.getLogger(__name__)
  42. # TODO(QuantumGhost): Refactor existing node run API to handle file parameter parsing
  43. # at the controller level rather than in the workflow logic. This would improve separation
  44. # of concerns and make the code more maintainable.
  45. def _parse_file(workflow: Workflow, files: list[dict] | None = None) -> Sequence[File]:
  46. files = files or []
  47. file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
  48. file_objs: Sequence[File] = []
  49. if file_extra_config is None:
  50. return file_objs
  51. file_objs = file_factory.build_from_mappings(
  52. mappings=files,
  53. tenant_id=workflow.tenant_id,
  54. config=file_extra_config,
  55. )
  56. return file_objs
  57. class DraftWorkflowApi(Resource):
  58. @setup_required
  59. @login_required
  60. @account_initialization_required
  61. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  62. @marshal_with(workflow_fields)
  63. def get(self, app_model: App):
  64. """
  65. Get draft workflow
  66. """
  67. # The role of the current user in the ta table must be admin, owner, or editor
  68. assert isinstance(current_user, Account)
  69. if not current_user.is_editor:
  70. raise Forbidden()
  71. # fetch draft workflow by app_model
  72. workflow_service = WorkflowService()
  73. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  74. if not workflow:
  75. raise DraftWorkflowNotExist()
  76. # return workflow, if not found, return None (initiate graph by frontend)
  77. return workflow
  78. @setup_required
  79. @login_required
  80. @account_initialization_required
  81. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  82. def post(self, app_model: App):
  83. """
  84. Sync draft workflow
  85. """
  86. # The role of the current user in the ta table must be admin, owner, or editor
  87. assert isinstance(current_user, Account)
  88. if not current_user.is_editor:
  89. raise Forbidden()
  90. content_type = request.headers.get("Content-Type", "")
  91. if "application/json" in content_type:
  92. parser = reqparse.RequestParser()
  93. parser.add_argument("graph", type=dict, required=True, nullable=False, location="json")
  94. parser.add_argument("features", type=dict, required=True, nullable=False, location="json")
  95. parser.add_argument("hash", type=str, required=False, location="json")
  96. parser.add_argument("environment_variables", type=list, required=True, location="json")
  97. parser.add_argument("conversation_variables", type=list, required=False, location="json")
  98. args = parser.parse_args()
  99. elif "text/plain" in content_type:
  100. try:
  101. data = json.loads(request.data.decode("utf-8"))
  102. if "graph" not in data or "features" not in data:
  103. raise ValueError("graph or features not found in data")
  104. if not isinstance(data.get("graph"), dict) or not isinstance(data.get("features"), dict):
  105. raise ValueError("graph or features is not a dict")
  106. args = {
  107. "graph": data.get("graph"),
  108. "features": data.get("features"),
  109. "hash": data.get("hash"),
  110. "environment_variables": data.get("environment_variables"),
  111. "conversation_variables": data.get("conversation_variables"),
  112. }
  113. except json.JSONDecodeError:
  114. return {"message": "Invalid JSON data"}, 400
  115. else:
  116. abort(415)
  117. if not isinstance(current_user, Account):
  118. raise Forbidden()
  119. workflow_service = WorkflowService()
  120. try:
  121. environment_variables_list = args.get("environment_variables") or []
  122. environment_variables = [
  123. variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list
  124. ]
  125. conversation_variables_list = args.get("conversation_variables") or []
  126. conversation_variables = [
  127. variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
  128. ]
  129. workflow = workflow_service.sync_draft_workflow(
  130. app_model=app_model,
  131. graph=args["graph"],
  132. features=args["features"],
  133. unique_hash=args.get("hash"),
  134. account=current_user,
  135. environment_variables=environment_variables,
  136. conversation_variables=conversation_variables,
  137. )
  138. except WorkflowHashNotEqualError:
  139. raise DraftWorkflowNotSync()
  140. return {
  141. "result": "success",
  142. "hash": workflow.unique_hash,
  143. "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at),
  144. }
  145. class AdvancedChatDraftWorkflowRunApi(Resource):
  146. @setup_required
  147. @login_required
  148. @account_initialization_required
  149. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  150. def post(self, app_model: App):
  151. """
  152. Run draft workflow
  153. """
  154. # The role of the current user in the ta table must be admin, owner, or editor
  155. assert isinstance(current_user, Account)
  156. if not current_user.is_editor:
  157. raise Forbidden()
  158. if not isinstance(current_user, Account):
  159. raise Forbidden()
  160. parser = reqparse.RequestParser()
  161. parser.add_argument("inputs", type=dict, location="json")
  162. parser.add_argument("query", type=str, required=True, location="json", default="")
  163. parser.add_argument("files", type=list, location="json")
  164. parser.add_argument("conversation_id", type=uuid_value, location="json")
  165. parser.add_argument("parent_message_id", type=uuid_value, required=False, location="json")
  166. args = parser.parse_args()
  167. external_trace_id = get_external_trace_id(request)
  168. if external_trace_id:
  169. args["external_trace_id"] = external_trace_id
  170. try:
  171. response = AppGenerateService.generate(
  172. app_model=app_model, user=current_user, args=args, invoke_from=InvokeFrom.DEBUGGER, streaming=True
  173. )
  174. return helper.compact_generate_response(response)
  175. except services.errors.conversation.ConversationNotExistsError:
  176. raise NotFound("Conversation Not Exists.")
  177. except services.errors.conversation.ConversationCompletedError:
  178. raise ConversationCompletedError()
  179. except InvokeRateLimitError as ex:
  180. raise InvokeRateLimitHttpError(ex.description)
  181. except ValueError as e:
  182. raise e
  183. except Exception:
  184. logger.exception("internal server error.")
  185. raise InternalServerError()
  186. class AdvancedChatDraftRunIterationNodeApi(Resource):
  187. @setup_required
  188. @login_required
  189. @account_initialization_required
  190. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  191. def post(self, app_model: App, node_id: str):
  192. """
  193. Run draft workflow iteration node
  194. """
  195. if not isinstance(current_user, Account):
  196. raise Forbidden()
  197. # The role of the current user in the ta table must be admin, owner, or editor
  198. if not current_user.is_editor:
  199. raise Forbidden()
  200. parser = reqparse.RequestParser()
  201. parser.add_argument("inputs", type=dict, location="json")
  202. args = parser.parse_args()
  203. try:
  204. response = AppGenerateService.generate_single_iteration(
  205. app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
  206. )
  207. return helper.compact_generate_response(response)
  208. except services.errors.conversation.ConversationNotExistsError:
  209. raise NotFound("Conversation Not Exists.")
  210. except services.errors.conversation.ConversationCompletedError:
  211. raise ConversationCompletedError()
  212. except ValueError as e:
  213. raise e
  214. except Exception:
  215. logger.exception("internal server error.")
  216. raise InternalServerError()
  217. class WorkflowDraftRunIterationNodeApi(Resource):
  218. @setup_required
  219. @login_required
  220. @account_initialization_required
  221. @get_app_model(mode=[AppMode.WORKFLOW])
  222. def post(self, app_model: App, node_id: str):
  223. """
  224. Run draft workflow iteration node
  225. """
  226. # The role of the current user in the ta table must be admin, owner, or editor
  227. if not isinstance(current_user, Account):
  228. raise Forbidden()
  229. if not current_user.is_editor:
  230. raise Forbidden()
  231. parser = reqparse.RequestParser()
  232. parser.add_argument("inputs", type=dict, location="json")
  233. args = parser.parse_args()
  234. try:
  235. response = AppGenerateService.generate_single_iteration(
  236. app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
  237. )
  238. return helper.compact_generate_response(response)
  239. except services.errors.conversation.ConversationNotExistsError:
  240. raise NotFound("Conversation Not Exists.")
  241. except services.errors.conversation.ConversationCompletedError:
  242. raise ConversationCompletedError()
  243. except ValueError as e:
  244. raise e
  245. except Exception:
  246. logger.exception("internal server error.")
  247. raise InternalServerError()
  248. class AdvancedChatDraftRunLoopNodeApi(Resource):
  249. @setup_required
  250. @login_required
  251. @account_initialization_required
  252. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  253. def post(self, app_model: App, node_id: str):
  254. """
  255. Run draft workflow loop node
  256. """
  257. if not isinstance(current_user, Account):
  258. raise Forbidden()
  259. # The role of the current user in the ta table must be admin, owner, or editor
  260. if not current_user.is_editor:
  261. raise Forbidden()
  262. parser = reqparse.RequestParser()
  263. parser.add_argument("inputs", type=dict, location="json")
  264. args = parser.parse_args()
  265. try:
  266. response = AppGenerateService.generate_single_loop(
  267. app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
  268. )
  269. return helper.compact_generate_response(response)
  270. except services.errors.conversation.ConversationNotExistsError:
  271. raise NotFound("Conversation Not Exists.")
  272. except services.errors.conversation.ConversationCompletedError:
  273. raise ConversationCompletedError()
  274. except ValueError as e:
  275. raise e
  276. except Exception:
  277. logger.exception("internal server error.")
  278. raise InternalServerError()
  279. class WorkflowDraftRunLoopNodeApi(Resource):
  280. @setup_required
  281. @login_required
  282. @account_initialization_required
  283. @get_app_model(mode=[AppMode.WORKFLOW])
  284. def post(self, app_model: App, node_id: str):
  285. """
  286. Run draft workflow loop node
  287. """
  288. if not isinstance(current_user, Account):
  289. raise Forbidden()
  290. # The role of the current user in the ta table must be admin, owner, or editor
  291. if not current_user.is_editor:
  292. raise Forbidden()
  293. parser = reqparse.RequestParser()
  294. parser.add_argument("inputs", type=dict, location="json")
  295. args = parser.parse_args()
  296. try:
  297. response = AppGenerateService.generate_single_loop(
  298. app_model=app_model, user=current_user, node_id=node_id, args=args, streaming=True
  299. )
  300. return helper.compact_generate_response(response)
  301. except services.errors.conversation.ConversationNotExistsError:
  302. raise NotFound("Conversation Not Exists.")
  303. except services.errors.conversation.ConversationCompletedError:
  304. raise ConversationCompletedError()
  305. except ValueError as e:
  306. raise e
  307. except Exception:
  308. logger.exception("internal server error.")
  309. raise InternalServerError()
  310. class DraftWorkflowRunApi(Resource):
  311. @setup_required
  312. @login_required
  313. @account_initialization_required
  314. @get_app_model(mode=[AppMode.WORKFLOW])
  315. def post(self, app_model: App):
  316. """
  317. Run draft workflow
  318. """
  319. if not isinstance(current_user, Account):
  320. raise Forbidden()
  321. # The role of the current user in the ta table must be admin, owner, or editor
  322. if not current_user.is_editor:
  323. raise Forbidden()
  324. parser = reqparse.RequestParser()
  325. parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
  326. parser.add_argument("files", type=list, required=False, location="json")
  327. args = parser.parse_args()
  328. external_trace_id = get_external_trace_id(request)
  329. if external_trace_id:
  330. args["external_trace_id"] = external_trace_id
  331. try:
  332. response = AppGenerateService.generate(
  333. app_model=app_model,
  334. user=current_user,
  335. args=args,
  336. invoke_from=InvokeFrom.DEBUGGER,
  337. streaming=True,
  338. )
  339. return helper.compact_generate_response(response)
  340. except InvokeRateLimitError as ex:
  341. raise InvokeRateLimitHttpError(ex.description)
  342. class WorkflowTaskStopApi(Resource):
  343. @setup_required
  344. @login_required
  345. @account_initialization_required
  346. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  347. def post(self, app_model: App, task_id: str):
  348. """
  349. Stop workflow task
  350. """
  351. if not isinstance(current_user, Account):
  352. raise Forbidden()
  353. # The role of the current user in the ta table must be admin, owner, or editor
  354. if not current_user.is_editor:
  355. raise Forbidden()
  356. # Stop using both mechanisms for backward compatibility
  357. # Legacy stop flag mechanism (without user check)
  358. AppQueueManager.set_stop_flag_no_user_check(task_id)
  359. # New graph engine command channel mechanism
  360. GraphEngineManager.send_stop_command(task_id)
  361. return {"result": "success"}
  362. class DraftWorkflowNodeRunApi(Resource):
  363. @setup_required
  364. @login_required
  365. @account_initialization_required
  366. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  367. @marshal_with(workflow_run_node_execution_fields)
  368. def post(self, app_model: App, node_id: str):
  369. """
  370. Run draft workflow node
  371. """
  372. if not isinstance(current_user, Account):
  373. raise Forbidden()
  374. # The role of the current user in the ta table must be admin, owner, or editor
  375. if not current_user.is_editor:
  376. raise Forbidden()
  377. parser = reqparse.RequestParser()
  378. parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
  379. parser.add_argument("query", type=str, required=False, location="json", default="")
  380. parser.add_argument("files", type=list, location="json", default=[])
  381. args = parser.parse_args()
  382. user_inputs = args.get("inputs")
  383. if user_inputs is None:
  384. raise ValueError("missing inputs")
  385. workflow_srv = WorkflowService()
  386. # fetch draft workflow by app_model
  387. draft_workflow = workflow_srv.get_draft_workflow(app_model=app_model)
  388. if not draft_workflow:
  389. raise ValueError("Workflow not initialized")
  390. files = _parse_file(draft_workflow, args.get("files"))
  391. workflow_service = WorkflowService()
  392. workflow_node_execution = workflow_service.run_draft_workflow_node(
  393. app_model=app_model,
  394. draft_workflow=draft_workflow,
  395. node_id=node_id,
  396. user_inputs=user_inputs,
  397. account=current_user,
  398. query=args.get("query", ""),
  399. files=files,
  400. )
  401. return workflow_node_execution
  402. class PublishedWorkflowApi(Resource):
  403. @setup_required
  404. @login_required
  405. @account_initialization_required
  406. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  407. @marshal_with(workflow_fields)
  408. def get(self, app_model: App):
  409. """
  410. Get published workflow
  411. """
  412. if not isinstance(current_user, Account):
  413. raise Forbidden()
  414. # The role of the current user in the ta table must be admin, owner, or editor
  415. if not current_user.is_editor:
  416. raise Forbidden()
  417. # fetch published workflow by app_model
  418. workflow_service = WorkflowService()
  419. workflow = workflow_service.get_published_workflow(app_model=app_model)
  420. # return workflow, if not found, return None
  421. return workflow
  422. @setup_required
  423. @login_required
  424. @account_initialization_required
  425. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  426. def post(self, app_model: App):
  427. """
  428. Publish workflow
  429. """
  430. if not isinstance(current_user, Account):
  431. raise Forbidden()
  432. # The role of the current user in the ta table must be admin, owner, or editor
  433. if not current_user.is_editor:
  434. raise Forbidden()
  435. parser = reqparse.RequestParser()
  436. parser.add_argument("marked_name", type=str, required=False, default="", location="json")
  437. parser.add_argument("marked_comment", type=str, required=False, default="", location="json")
  438. args = parser.parse_args()
  439. # Validate name and comment length
  440. if args.marked_name and len(args.marked_name) > 20:
  441. raise ValueError("Marked name cannot exceed 20 characters")
  442. if args.marked_comment and len(args.marked_comment) > 100:
  443. raise ValueError("Marked comment cannot exceed 100 characters")
  444. workflow_service = WorkflowService()
  445. with Session(db.engine) as session:
  446. workflow = workflow_service.publish_workflow(
  447. session=session,
  448. app_model=app_model,
  449. account=current_user,
  450. marked_name=args.marked_name or "",
  451. marked_comment=args.marked_comment or "",
  452. )
  453. app_model.workflow_id = workflow.id
  454. db.session.commit()
  455. workflow_created_at = TimestampField().format(workflow.created_at)
  456. session.commit()
  457. return {
  458. "result": "success",
  459. "created_at": workflow_created_at,
  460. }
  461. class DefaultBlockConfigsApi(Resource):
  462. @setup_required
  463. @login_required
  464. @account_initialization_required
  465. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  466. def get(self, app_model: App):
  467. """
  468. Get default block config
  469. """
  470. if not isinstance(current_user, Account):
  471. raise Forbidden()
  472. # The role of the current user in the ta table must be admin, owner, or editor
  473. if not current_user.is_editor:
  474. raise Forbidden()
  475. # Get default block configs
  476. workflow_service = WorkflowService()
  477. return workflow_service.get_default_block_configs()
  478. class DefaultBlockConfigApi(Resource):
  479. @setup_required
  480. @login_required
  481. @account_initialization_required
  482. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  483. def get(self, app_model: App, block_type: str):
  484. """
  485. Get default block config
  486. """
  487. if not isinstance(current_user, Account):
  488. raise Forbidden()
  489. # The role of the current user in the ta table must be admin, owner, or editor
  490. if not current_user.is_editor:
  491. raise Forbidden()
  492. parser = reqparse.RequestParser()
  493. parser.add_argument("q", type=str, location="args")
  494. args = parser.parse_args()
  495. q = args.get("q")
  496. filters = None
  497. if q:
  498. try:
  499. filters = json.loads(args.get("q", ""))
  500. except json.JSONDecodeError:
  501. raise ValueError("Invalid filters")
  502. # Get default block configs
  503. workflow_service = WorkflowService()
  504. return workflow_service.get_default_block_config(node_type=block_type, filters=filters)
  505. class ConvertToWorkflowApi(Resource):
  506. @setup_required
  507. @login_required
  508. @account_initialization_required
  509. @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
  510. def post(self, app_model: App):
  511. """
  512. Convert basic mode of chatbot app to workflow mode
  513. Convert expert mode of chatbot app to workflow mode
  514. Convert Completion App to Workflow App
  515. """
  516. if not isinstance(current_user, Account):
  517. raise Forbidden()
  518. # The role of the current user in the ta table must be admin, owner, or editor
  519. if not current_user.is_editor:
  520. raise Forbidden()
  521. if request.data:
  522. parser = reqparse.RequestParser()
  523. parser.add_argument("name", type=str, required=False, nullable=True, location="json")
  524. parser.add_argument("icon_type", type=str, required=False, nullable=True, location="json")
  525. parser.add_argument("icon", type=str, required=False, nullable=True, location="json")
  526. parser.add_argument("icon_background", type=str, required=False, nullable=True, location="json")
  527. args = parser.parse_args()
  528. else:
  529. args = {}
  530. # convert to workflow mode
  531. workflow_service = WorkflowService()
  532. new_app_model = workflow_service.convert_to_workflow(app_model=app_model, account=current_user, args=args)
  533. # return app id
  534. return {
  535. "new_app_id": new_app_model.id,
  536. }
  537. class WorkflowConfigApi(Resource):
  538. """Resource for workflow configuration."""
  539. @setup_required
  540. @login_required
  541. @account_initialization_required
  542. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  543. def get(self, app_model: App):
  544. return {
  545. "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
  546. }
  547. class PublishedAllWorkflowApi(Resource):
  548. @setup_required
  549. @login_required
  550. @account_initialization_required
  551. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  552. @marshal_with(workflow_pagination_fields)
  553. def get(self, app_model: App):
  554. """
  555. Get published workflows
  556. """
  557. if not isinstance(current_user, Account):
  558. raise Forbidden()
  559. if not current_user.is_editor:
  560. raise Forbidden()
  561. parser = reqparse.RequestParser()
  562. parser.add_argument("page", type=inputs.int_range(1, 99999), required=False, default=1, location="args")
  563. parser.add_argument("limit", type=inputs.int_range(1, 100), required=False, default=20, location="args")
  564. parser.add_argument("user_id", type=str, required=False, location="args")
  565. parser.add_argument("named_only", type=inputs.boolean, required=False, default=False, location="args")
  566. args = parser.parse_args()
  567. page = int(args.get("page", 1))
  568. limit = int(args.get("limit", 10))
  569. user_id = args.get("user_id")
  570. named_only = args.get("named_only", False)
  571. if user_id:
  572. if user_id != current_user.id:
  573. raise Forbidden()
  574. user_id = cast(str, user_id)
  575. workflow_service = WorkflowService()
  576. with Session(db.engine) as session:
  577. workflows, has_more = workflow_service.get_all_published_workflow(
  578. session=session,
  579. app_model=app_model,
  580. page=page,
  581. limit=limit,
  582. user_id=user_id,
  583. named_only=named_only,
  584. )
  585. return {
  586. "items": workflows,
  587. "page": page,
  588. "limit": limit,
  589. "has_more": has_more,
  590. }
  591. class WorkflowByIdApi(Resource):
  592. @setup_required
  593. @login_required
  594. @account_initialization_required
  595. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  596. @marshal_with(workflow_fields)
  597. def patch(self, app_model: App, workflow_id: str):
  598. """
  599. Update workflow attributes
  600. """
  601. if not isinstance(current_user, Account):
  602. raise Forbidden()
  603. # Check permission
  604. if not current_user.is_editor:
  605. raise Forbidden()
  606. parser = reqparse.RequestParser()
  607. parser.add_argument("marked_name", type=str, required=False, location="json")
  608. parser.add_argument("marked_comment", type=str, required=False, location="json")
  609. args = parser.parse_args()
  610. # Validate name and comment length
  611. if args.marked_name and len(args.marked_name) > 20:
  612. raise ValueError("Marked name cannot exceed 20 characters")
  613. if args.marked_comment and len(args.marked_comment) > 100:
  614. raise ValueError("Marked comment cannot exceed 100 characters")
  615. args = parser.parse_args()
  616. # Prepare update data
  617. update_data = {}
  618. if args.get("marked_name") is not None:
  619. update_data["marked_name"] = args["marked_name"]
  620. if args.get("marked_comment") is not None:
  621. update_data["marked_comment"] = args["marked_comment"]
  622. if not update_data:
  623. return {"message": "No valid fields to update"}, 400
  624. workflow_service = WorkflowService()
  625. # Create a session and manage the transaction
  626. with Session(db.engine, expire_on_commit=False) as session:
  627. workflow = workflow_service.update_workflow(
  628. session=session,
  629. workflow_id=workflow_id,
  630. tenant_id=app_model.tenant_id,
  631. account_id=current_user.id,
  632. data=update_data,
  633. )
  634. if not workflow:
  635. raise NotFound("Workflow not found")
  636. # Commit the transaction in the controller
  637. session.commit()
  638. return workflow
  639. @setup_required
  640. @login_required
  641. @account_initialization_required
  642. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  643. def delete(self, app_model: App, workflow_id: str):
  644. """
  645. Delete workflow
  646. """
  647. if not isinstance(current_user, Account):
  648. raise Forbidden()
  649. # Check permission
  650. if not current_user.is_editor:
  651. raise Forbidden()
  652. workflow_service = WorkflowService()
  653. # Create a session and manage the transaction
  654. with Session(db.engine) as session:
  655. try:
  656. workflow_service.delete_workflow(
  657. session=session, workflow_id=workflow_id, tenant_id=app_model.tenant_id
  658. )
  659. # Commit the transaction in the controller
  660. session.commit()
  661. except WorkflowInUseError as e:
  662. abort(400, description=str(e))
  663. except DraftWorkflowDeletionError as e:
  664. abort(400, description=str(e))
  665. except ValueError as e:
  666. raise NotFound(str(e))
  667. return None, 204
  668. class DraftWorkflowNodeLastRunApi(Resource):
  669. @setup_required
  670. @login_required
  671. @account_initialization_required
  672. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  673. @marshal_with(workflow_run_node_execution_fields)
  674. def get(self, app_model: App, node_id: str):
  675. srv = WorkflowService()
  676. workflow = srv.get_draft_workflow(app_model)
  677. if not workflow:
  678. raise NotFound("Workflow not found")
  679. node_exec = srv.get_node_last_run(
  680. app_model=app_model,
  681. workflow=workflow,
  682. node_id=node_id,
  683. )
  684. if node_exec is None:
  685. raise NotFound("last run not found")
  686. return node_exec
  687. api.add_resource(
  688. DraftWorkflowApi,
  689. "/apps/<uuid:app_id>/workflows/draft",
  690. )
  691. api.add_resource(
  692. WorkflowConfigApi,
  693. "/apps/<uuid:app_id>/workflows/draft/config",
  694. )
  695. api.add_resource(
  696. AdvancedChatDraftWorkflowRunApi,
  697. "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run",
  698. )
  699. api.add_resource(
  700. DraftWorkflowRunApi,
  701. "/apps/<uuid:app_id>/workflows/draft/run",
  702. )
  703. api.add_resource(
  704. WorkflowTaskStopApi,
  705. "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop",
  706. )
  707. api.add_resource(
  708. DraftWorkflowNodeRunApi,
  709. "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run",
  710. )
  711. api.add_resource(
  712. AdvancedChatDraftRunIterationNodeApi,
  713. "/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run",
  714. )
  715. api.add_resource(
  716. WorkflowDraftRunIterationNodeApi,
  717. "/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run",
  718. )
  719. api.add_resource(
  720. AdvancedChatDraftRunLoopNodeApi,
  721. "/apps/<uuid:app_id>/advanced-chat/workflows/draft/loop/nodes/<string:node_id>/run",
  722. )
  723. api.add_resource(
  724. WorkflowDraftRunLoopNodeApi,
  725. "/apps/<uuid:app_id>/workflows/draft/loop/nodes/<string:node_id>/run",
  726. )
  727. api.add_resource(
  728. PublishedWorkflowApi,
  729. "/apps/<uuid:app_id>/workflows/publish",
  730. )
  731. api.add_resource(
  732. PublishedAllWorkflowApi,
  733. "/apps/<uuid:app_id>/workflows",
  734. )
  735. api.add_resource(
  736. DefaultBlockConfigsApi,
  737. "/apps/<uuid:app_id>/workflows/default-workflow-block-configs",
  738. )
  739. api.add_resource(
  740. DefaultBlockConfigApi,
  741. "/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>",
  742. )
  743. api.add_resource(
  744. ConvertToWorkflowApi,
  745. "/apps/<uuid:app_id>/convert-to-workflow",
  746. )
  747. api.add_resource(
  748. WorkflowByIdApi,
  749. "/apps/<uuid:app_id>/workflows/<string:workflow_id>",
  750. )
  751. api.add_resource(
  752. DraftWorkflowNodeLastRunApi,
  753. "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/last-run",
  754. )