You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

workflow_run_service.py 4.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import threading
  2. from collections.abc import Sequence
  3. from typing import Optional
  4. from sqlalchemy.orm import sessionmaker
  5. import contexts
  6. from extensions.ext_database import db
  7. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  8. from models import (
  9. Account,
  10. App,
  11. EndUser,
  12. WorkflowNodeExecutionModel,
  13. WorkflowRun,
  14. WorkflowRunTriggeredFrom,
  15. )
  16. from repositories.factory import DifyAPIRepositoryFactory
  17. class WorkflowRunService:
  18. def __init__(self):
  19. """Initialize WorkflowRunService with repository dependencies."""
  20. session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
  21. self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
  22. session_maker
  23. )
  24. self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
  25. def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  26. """
  27. Get advanced chat app workflow run list
  28. Only return triggered_from == advanced_chat
  29. :param app_model: app model
  30. :param args: request args
  31. """
  32. class WorkflowWithMessage:
  33. message_id: str
  34. conversation_id: str
  35. def __init__(self, workflow_run: WorkflowRun):
  36. self._workflow_run = workflow_run
  37. def __getattr__(self, item):
  38. return getattr(self._workflow_run, item)
  39. pagination = self.get_paginate_workflow_runs(app_model, args)
  40. with_message_workflow_runs = []
  41. for workflow_run in pagination.data:
  42. message = workflow_run.message
  43. with_message_workflow_run = WorkflowWithMessage(workflow_run=workflow_run)
  44. if message:
  45. with_message_workflow_run.message_id = message.id
  46. with_message_workflow_run.conversation_id = message.conversation_id
  47. with_message_workflow_runs.append(with_message_workflow_run)
  48. pagination.data = with_message_workflow_runs
  49. return pagination
  50. def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  51. """
  52. Get debug workflow run list
  53. Only return triggered_from == debugging
  54. :param app_model: app model
  55. :param args: request args
  56. """
  57. limit = int(args.get("limit", 20))
  58. last_id = args.get("last_id")
  59. return self._workflow_run_repo.get_paginated_workflow_runs(
  60. tenant_id=app_model.tenant_id,
  61. app_id=app_model.id,
  62. triggered_from=WorkflowRunTriggeredFrom.DEBUGGING.value,
  63. limit=limit,
  64. last_id=last_id,
  65. )
  66. def get_workflow_run(self, app_model: App, run_id: str) -> Optional[WorkflowRun]:
  67. """
  68. Get workflow run detail
  69. :param app_model: app model
  70. :param run_id: workflow run id
  71. """
  72. return self._workflow_run_repo.get_workflow_run_by_id(
  73. tenant_id=app_model.tenant_id,
  74. app_id=app_model.id,
  75. run_id=run_id,
  76. )
  77. def get_workflow_run_node_executions(
  78. self,
  79. app_model: App,
  80. run_id: str,
  81. user: Account | EndUser,
  82. ) -> Sequence[WorkflowNodeExecutionModel]:
  83. """
  84. Get workflow run node execution list
  85. """
  86. workflow_run = self.get_workflow_run(app_model, run_id)
  87. contexts.plugin_tool_providers.set({})
  88. contexts.plugin_tool_providers_lock.set(threading.Lock())
  89. if not workflow_run:
  90. return []
  91. # Get tenant_id from user
  92. tenant_id = user.tenant_id if isinstance(user, EndUser) else user.current_tenant_id
  93. if tenant_id is None:
  94. raise ValueError("User tenant_id cannot be None")
  95. return self._node_execution_service_repo.get_executions_by_workflow_run(
  96. tenant_id=tenant_id,
  97. app_id=app_model.id,
  98. workflow_run_id=run_id,
  99. )