| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- """
- Celery tasks for asynchronous workflow node execution storage operations.
-
- These tasks provide asynchronous storage capabilities for workflow node execution data,
- improving performance by offloading storage operations to background workers.
- """
-
- import json
- import logging
-
- from celery import shared_task
- from sqlalchemy import select
- from sqlalchemy.orm import sessionmaker
-
- from core.workflow.entities.workflow_node_execution import (
- WorkflowNodeExecution,
- )
- from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
- from extensions.ext_database import db
- from models import CreatorUserRole, WorkflowNodeExecutionModel
- from models.workflow import WorkflowNodeExecutionTriggeredFrom
-
- logger = logging.getLogger(__name__)
-
-
@shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
def save_workflow_node_execution_task(
    self,
    execution_data: dict,
    tenant_id: str,
    app_id: str,
    triggered_from: str,
    creator_user_id: str,
    creator_user_role: str,
) -> bool:
    """
    Asynchronously save or update a workflow node execution to the database.

    The payload is deserialized first. If that fails the task fails immediately
    without retrying, because deserialization depends only on the payload and a
    retry would fail the same way. Failures while persisting are retried with
    exponential backoff (60s, 120s, 240s, bounded by ``max_retries``).

    Args:
        execution_data: Serialized WorkflowNodeExecution data
        tenant_id: Tenant ID for multi-tenancy (used only when creating a new row)
        app_id: Application ID (used only when creating a new row)
        triggered_from: Source of the execution trigger (used only when creating a new row)
        creator_user_id: ID of the user who created the execution (used only when creating a new row)
        creator_user_role: Role of the user who created the execution (used only when creating a new row)

    Returns:
        True once the row has been committed. The task never returns False;
        failures surface as exceptions.

    Raises:
        Exception: whatever ``WorkflowNodeExecution.model_validate`` raises for an
            invalid payload (re-raised unchanged, no retry).
        celery.exceptions.Retry: raised via ``self.retry`` when persisting fails, to
            reschedule the task; per Celery's retry semantics the original
            exception is raised instead once ``max_retries`` is exhausted.
    """
    # Resolve the id for log messages once, without assuming the payload is a dict,
    # so the error handlers below cannot fail on their own.
    execution_id = execution_data.get("id", "unknown") if isinstance(execution_data, dict) else "unknown"

    # Deserialize execution data before touching the database. This step is a pure
    # function of the payload, so it is kept outside the retry path.
    try:
        execution = WorkflowNodeExecution.model_validate(execution_data)
    except Exception:
        logger.exception(
            "Failed to save workflow node execution %s: invalid payload, not retrying",
            execution_id,
        )
        raise

    try:
        # Create a new session for this task
        session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

        with session_factory() as session:
            # Check if node execution already exists
            existing_execution = session.scalar(
                select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == execution.id)
            )

            if existing_execution:
                # Update existing node execution
                _update_node_execution_from_domain(existing_execution, execution)
                logger.debug("Updated existing workflow node execution: %s", execution.id)
            else:
                # Create new node execution. The enum conversions stay in this branch
                # so that updates never depend on the creator/trigger arguments.
                node_execution = _create_node_execution_from_domain(
                    execution=execution,
                    tenant_id=tenant_id,
                    app_id=app_id,
                    triggered_from=WorkflowNodeExecutionTriggeredFrom(triggered_from),
                    creator_user_id=creator_user_id,
                    creator_user_role=CreatorUserRole(creator_user_role),
                )
                session.add(node_execution)
                logger.debug("Created new workflow node execution: %s", execution.id)

            session.commit()
            return True

    except Exception as e:
        logger.exception("Failed to save workflow node execution %s", execution_id)
        # Retry the task with exponential backoff
        raise self.retry(exc=e, countdown=60 * (2**self.request.retries))
-
-
def _create_node_execution_from_domain(
    execution: WorkflowNodeExecution,
    tenant_id: str,
    app_id: str,
    triggered_from: WorkflowNodeExecutionTriggeredFrom,
    creator_user_id: str,
    creator_user_role: CreatorUserRole,
) -> WorkflowNodeExecutionModel:
    """
    Build a new WorkflowNodeExecutionModel row from a WorkflowNodeExecution domain entity.

    Identity and ownership columns come from the arguments and the entity. The
    inputs, process data, outputs and metadata are stored as JSON strings, with
    "{}" used whenever the corresponding value is empty or missing.

    Args:
        execution: Domain entity to persist
        tenant_id: Tenant ID stored on the row
        app_id: Application ID stored on the row
        triggered_from: Trigger source; its ``.value`` is stored
        creator_user_id: Stored as ``created_by``
        creator_user_role: Creator role; its ``.value`` is stored as ``created_by_role``

    Returns:
        The populated model instance. It is not added to any session here.
    """
    record = WorkflowNodeExecutionModel()

    # Identity / placement of the node inside its workflow run
    record.id = execution.id
    record.tenant_id = tenant_id
    record.app_id = app_id
    record.workflow_id = execution.workflow_id
    record.triggered_from = triggered_from.value
    record.workflow_run_id = execution.workflow_execution_id
    record.index = execution.index
    record.predecessor_node_id = execution.predecessor_node_id
    record.node_id = execution.node_id
    record.node_type = execution.node_type.value
    record.title = execution.title
    record.node_execution_id = execution.node_execution_id

    converter = WorkflowRuntimeTypeConverter()

    def _encode(payload) -> str:
        # Falsy payloads (None or empty) are stored as an empty JSON object.
        if not payload:
            return "{}"
        return json.dumps(converter.to_json_encodable(payload))

    record.inputs = _encode(execution.inputs)
    record.process_data = _encode(execution.process_data)
    record.outputs = _encode(execution.outputs)

    # Metadata keys may be enum members; JSON needs plain string keys.
    metadata = execution.metadata
    if not metadata:
        record.execution_metadata = "{}"
    else:
        string_keyed = {
            (key.value if hasattr(key, "value") else str(key)): value for key, value in metadata.items()
        }
        record.execution_metadata = json.dumps(converter.to_json_encodable(string_keyed))

    # Outcome, timing and authorship
    record.status = execution.status.value
    record.error = execution.error
    record.elapsed_time = execution.elapsed_time
    record.created_by_role = creator_user_role.value
    record.created_by = creator_user_id
    record.created_at = execution.created_at
    record.finished_at = execution.finished_at

    return record
-
-
def _update_node_execution_from_domain(node_execution: WorkflowNodeExecutionModel, execution: WorkflowNodeExecution):
    """
    Refresh an existing WorkflowNodeExecutionModel row from a WorkflowNodeExecution domain entity.

    Only the mutable parts are overwritten, in place: the JSON payload columns
    (inputs, process data, outputs, metadata) and the status, error, elapsed
    time and finish timestamp. Identity and ownership columns are left as they are.

    Args:
        node_execution: Database model to mutate
        execution: Domain entity holding the current state
    """
    converter = WorkflowRuntimeTypeConverter()

    # The three payload columns share one rule: JSON-encode the value, or store
    # "{}" when it is empty or missing.
    for field_name in ("inputs", "process_data", "outputs"):
        payload = getattr(execution, field_name)
        if payload:
            serialized = json.dumps(converter.to_json_encodable(payload))
        else:
            serialized = "{}"
        setattr(node_execution, field_name, serialized)

    # Metadata keys may be enum members; JSON needs plain string keys.
    metadata = execution.metadata
    if not metadata:
        node_execution.execution_metadata = "{}"
    else:
        string_keyed = {
            (key.value if hasattr(key, "value") else str(key)): value for key, value in metadata.items()
        }
        node_execution.execution_metadata = json.dumps(converter.to_json_encodable(string_keyed))

    # Outcome and timing
    node_execution.status = execution.status.value
    node_execution.error = execution.error
    node_execution.elapsed_time = execution.elapsed_time
    node_execution.finished_at = execution.finished_at
|