Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

workflow_execution_tasks.py 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """
  2. Celery tasks for asynchronous workflow execution storage operations.
  3. These tasks provide asynchronous storage capabilities for workflow execution data,
  4. improving performance by offloading storage operations to background workers.
  5. """
  6. import json
  7. import logging
  8. from celery import shared_task # type: ignore[import-untyped]
  9. from sqlalchemy import select
  10. from sqlalchemy.orm import sessionmaker
  11. from core.workflow.entities.workflow_execution import WorkflowExecution
  12. from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
  13. from extensions.ext_database import db
  14. from models import CreatorUserRole, WorkflowRun
  15. from models.enums import WorkflowRunTriggeredFrom
  16. logger = logging.getLogger(__name__)
  17. @shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
  18. def save_workflow_execution_task(
  19. self,
  20. execution_data: dict,
  21. tenant_id: str,
  22. app_id: str,
  23. triggered_from: str,
  24. creator_user_id: str,
  25. creator_user_role: str,
  26. ) -> bool:
  27. """
  28. Asynchronously save or update a workflow execution to the database.
  29. Args:
  30. execution_data: Serialized WorkflowExecution data
  31. tenant_id: Tenant ID for multi-tenancy
  32. app_id: Application ID
  33. triggered_from: Source of the execution trigger
  34. creator_user_id: ID of the user who created the execution
  35. creator_user_role: Role of the user who created the execution
  36. Returns:
  37. True if successful, False otherwise
  38. """
  39. try:
  40. # Create a new session for this task
  41. session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
  42. with session_factory() as session:
  43. # Deserialize execution data
  44. execution = WorkflowExecution.model_validate(execution_data)
  45. # Check if workflow run already exists
  46. existing_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == execution.id_))
  47. if existing_run:
  48. # Update existing workflow run
  49. _update_workflow_run_from_execution(existing_run, execution)
  50. logger.debug("Updated existing workflow run: %s", execution.id_)
  51. else:
  52. # Create new workflow run
  53. workflow_run = _create_workflow_run_from_execution(
  54. execution=execution,
  55. tenant_id=tenant_id,
  56. app_id=app_id,
  57. triggered_from=WorkflowRunTriggeredFrom(triggered_from),
  58. creator_user_id=creator_user_id,
  59. creator_user_role=CreatorUserRole(creator_user_role),
  60. )
  61. session.add(workflow_run)
  62. logger.debug("Created new workflow run: %s", execution.id_)
  63. session.commit()
  64. return True
  65. except Exception as e:
  66. logger.exception("Failed to save workflow execution %s", execution_data.get("id_", "unknown"))
  67. # Retry the task with exponential backoff
  68. raise self.retry(exc=e, countdown=60 * (2**self.request.retries))
  69. def _create_workflow_run_from_execution(
  70. execution: WorkflowExecution,
  71. tenant_id: str,
  72. app_id: str,
  73. triggered_from: WorkflowRunTriggeredFrom,
  74. creator_user_id: str,
  75. creator_user_role: CreatorUserRole,
  76. ) -> WorkflowRun:
  77. """
  78. Create a WorkflowRun database model from a WorkflowExecution domain entity.
  79. """
  80. workflow_run = WorkflowRun()
  81. workflow_run.id = execution.id_
  82. workflow_run.tenant_id = tenant_id
  83. workflow_run.app_id = app_id
  84. workflow_run.workflow_id = execution.workflow_id
  85. workflow_run.type = execution.workflow_type.value
  86. workflow_run.triggered_from = triggered_from.value
  87. workflow_run.version = execution.workflow_version
  88. json_converter = WorkflowRuntimeTypeConverter()
  89. workflow_run.graph = json.dumps(json_converter.to_json_encodable(execution.graph))
  90. workflow_run.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs))
  91. workflow_run.status = execution.status.value
  92. workflow_run.outputs = (
  93. json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
  94. )
  95. workflow_run.error = execution.error_message
  96. workflow_run.elapsed_time = execution.elapsed_time
  97. workflow_run.total_tokens = execution.total_tokens
  98. workflow_run.total_steps = execution.total_steps
  99. workflow_run.created_by_role = creator_user_role.value
  100. workflow_run.created_by = creator_user_id
  101. workflow_run.created_at = execution.started_at
  102. workflow_run.finished_at = execution.finished_at
  103. return workflow_run
  104. def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: WorkflowExecution) -> None:
  105. """
  106. Update a WorkflowRun database model from a WorkflowExecution domain entity.
  107. """
  108. json_converter = WorkflowRuntimeTypeConverter()
  109. workflow_run.status = execution.status.value
  110. workflow_run.outputs = (
  111. json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
  112. )
  113. workflow_run.error = execution.error_message
  114. workflow_run.elapsed_time = execution.elapsed_time
  115. workflow_run.total_tokens = execution.total_tokens
  116. workflow_run.total_steps = execution.total_steps
  117. workflow_run.finished_at = execution.finished_at