You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

workflow_node_execution_tasks.py 6.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. """
  2. Celery tasks for asynchronous workflow node execution storage operations.
  3. These tasks provide asynchronous storage capabilities for workflow node execution data,
  4. improving performance by offloading storage operations to background workers.
  5. """
  6. import json
  7. import logging
  8. from celery import shared_task
  9. from sqlalchemy import select
  10. from sqlalchemy.orm import sessionmaker
  11. from core.workflow.entities.workflow_node_execution import (
  12. WorkflowNodeExecution,
  13. )
  14. from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
  15. from extensions.ext_database import db
  16. from models import CreatorUserRole, WorkflowNodeExecutionModel
  17. from models.workflow import WorkflowNodeExecutionTriggeredFrom
  18. logger = logging.getLogger(__name__)
  19. @shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
  20. def save_workflow_node_execution_task(
  21. self,
  22. execution_data: dict,
  23. tenant_id: str,
  24. app_id: str,
  25. triggered_from: str,
  26. creator_user_id: str,
  27. creator_user_role: str,
  28. ) -> bool:
  29. """
  30. Asynchronously save or update a workflow node execution to the database.
  31. Args:
  32. execution_data: Serialized WorkflowNodeExecution data
  33. tenant_id: Tenant ID for multi-tenancy
  34. app_id: Application ID
  35. triggered_from: Source of the execution trigger
  36. creator_user_id: ID of the user who created the execution
  37. creator_user_role: Role of the user who created the execution
  38. Returns:
  39. True if successful, False otherwise
  40. """
  41. try:
  42. # Create a new session for this task
  43. session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
  44. with session_factory() as session:
  45. # Deserialize execution data
  46. execution = WorkflowNodeExecution.model_validate(execution_data)
  47. # Check if node execution already exists
  48. existing_execution = session.scalar(
  49. select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id)
  50. )
  51. if existing_execution:
  52. # Update existing node execution
  53. _update_node_execution_from_domain(existing_execution, execution)
  54. logger.debug("Updated existing workflow node execution: %s", execution.id)
  55. else:
  56. # Create new node execution
  57. node_execution = _create_node_execution_from_domain(
  58. execution=execution,
  59. tenant_id=tenant_id,
  60. app_id=app_id,
  61. triggered_from=WorkflowNodeExecutionTriggeredFrom(triggered_from),
  62. creator_user_id=creator_user_id,
  63. creator_user_role=CreatorUserRole(creator_user_role),
  64. )
  65. session.add(node_execution)
  66. logger.debug("Created new workflow node execution: %s", execution.id)
  67. session.commit()
  68. return True
  69. except Exception as e:
  70. logger.exception("Failed to save workflow node execution %s", execution_data.get("id", "unknown"))
  71. # Retry the task with exponential backoff
  72. raise self.retry(exc=e, countdown=60 * (2**self.request.retries))
  73. def _create_node_execution_from_domain(
  74. execution: WorkflowNodeExecution,
  75. tenant_id: str,
  76. app_id: str,
  77. triggered_from: WorkflowNodeExecutionTriggeredFrom,
  78. creator_user_id: str,
  79. creator_user_role: CreatorUserRole,
  80. ) -> WorkflowNodeExecutionModel:
  81. """
  82. Create a WorkflowNodeExecutionModel database model from a WorkflowNodeExecution domain entity.
  83. """
  84. node_execution = WorkflowNodeExecutionModel()
  85. node_execution.id = execution.id
  86. node_execution.tenant_id = tenant_id
  87. node_execution.app_id = app_id
  88. node_execution.workflow_id = execution.workflow_id
  89. node_execution.triggered_from = triggered_from.value
  90. node_execution.workflow_run_id = execution.workflow_execution_id
  91. node_execution.index = execution.index
  92. node_execution.predecessor_node_id = execution.predecessor_node_id
  93. node_execution.node_id = execution.node_id
  94. node_execution.node_type = execution.node_type.value
  95. node_execution.title = execution.title
  96. node_execution.node_execution_id = execution.node_execution_id
  97. # Serialize complex data as JSON
  98. json_converter = WorkflowRuntimeTypeConverter()
  99. node_execution.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else "{}"
  100. node_execution.process_data = (
  101. json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else "{}"
  102. )
  103. node_execution.outputs = (
  104. json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
  105. )
  106. # Convert metadata enum keys to strings for JSON serialization
  107. if execution.metadata:
  108. metadata_for_json = {
  109. key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items()
  110. }
  111. node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json))
  112. else:
  113. node_execution.execution_metadata = "{}"
  114. node_execution.status = execution.status.value
  115. node_execution.error = execution.error
  116. node_execution.elapsed_time = execution.elapsed_time
  117. node_execution.created_by_role = creator_user_role.value
  118. node_execution.created_by = creator_user_id
  119. node_execution.created_at = execution.created_at
  120. node_execution.finished_at = execution.finished_at
  121. return node_execution
  122. def _update_node_execution_from_domain(
  123. node_execution: WorkflowNodeExecutionModel, execution: WorkflowNodeExecution
  124. ) -> None:
  125. """
  126. Update a WorkflowNodeExecutionModel database model from a WorkflowNodeExecution domain entity.
  127. """
  128. # Update serialized data
  129. json_converter = WorkflowRuntimeTypeConverter()
  130. node_execution.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs)) if execution.inputs else "{}"
  131. node_execution.process_data = (
  132. json.dumps(json_converter.to_json_encodable(execution.process_data)) if execution.process_data else "{}"
  133. )
  134. node_execution.outputs = (
  135. json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
  136. )
  137. # Convert metadata enum keys to strings for JSON serialization
  138. if execution.metadata:
  139. metadata_for_json = {
  140. key.value if hasattr(key, "value") else str(key): value for key, value in execution.metadata.items()
  141. }
  142. node_execution.execution_metadata = json.dumps(json_converter.to_json_encodable(metadata_for_json))
  143. else:
  144. node_execution.execution_metadata = "{}"
  145. # Update other fields
  146. node_execution.status = execution.status.value
  147. node_execution.error = execution.error
  148. node_execution.elapsed_time = execution.elapsed_time
  149. node_execution.finished_at = execution.finished_at