| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 | 
							- """
 - Celery tasks for asynchronous workflow execution storage operations.
 - 
 - These tasks provide asynchronous storage capabilities for workflow execution data,
 - improving performance by offloading storage operations to background workers.
 - """
 - 
 - import json
 - import logging
 - 
 - from celery import shared_task
 - from sqlalchemy import select
 - from sqlalchemy.orm import sessionmaker
 - 
 - from core.workflow.entities.workflow_execution import WorkflowExecution
 - from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
 - from extensions.ext_database import db
 - from models import CreatorUserRole, WorkflowRun
 - from models.enums import WorkflowRunTriggeredFrom
 - 
 - logger = logging.getLogger(__name__)
 - 
 - 
 - @shared_task(queue="workflow_storage", bind=True, max_retries=3, default_retry_delay=60)
 - def save_workflow_execution_task(
 -     self,
 -     execution_data: dict,
 -     tenant_id: str,
 -     app_id: str,
 -     triggered_from: str,
 -     creator_user_id: str,
 -     creator_user_role: str,
 - ) -> bool:
 -     """
 -     Asynchronously save or update a workflow execution to the database.
 - 
 -     Args:
 -         execution_data: Serialized WorkflowExecution data
 -         tenant_id: Tenant ID for multi-tenancy
 -         app_id: Application ID
 -         triggered_from: Source of the execution trigger
 -         creator_user_id: ID of the user who created the execution
 -         creator_user_role: Role of the user who created the execution
 - 
 -     Returns:
 -         True if successful, False otherwise
 -     """
 -     try:
 -         # Create a new session for this task
 -         session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
 - 
 -         with session_factory() as session:
 -             # Deserialize execution data
 -             execution = WorkflowExecution.model_validate(execution_data)
 - 
 -             # Check if workflow run already exists
 -             existing_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == execution.id_))
 - 
 -             if existing_run:
 -                 # Update existing workflow run
 -                 _update_workflow_run_from_execution(existing_run, execution)
 -                 logger.debug("Updated existing workflow run: %s", execution.id_)
 -             else:
 -                 # Create new workflow run
 -                 workflow_run = _create_workflow_run_from_execution(
 -                     execution=execution,
 -                     tenant_id=tenant_id,
 -                     app_id=app_id,
 -                     triggered_from=WorkflowRunTriggeredFrom(triggered_from),
 -                     creator_user_id=creator_user_id,
 -                     creator_user_role=CreatorUserRole(creator_user_role),
 -                 )
 -                 session.add(workflow_run)
 -                 logger.debug("Created new workflow run: %s", execution.id_)
 - 
 -             session.commit()
 -             return True
 - 
 -     except Exception as e:
 -         logger.exception("Failed to save workflow execution %s", execution_data.get("id_", "unknown"))
 -         # Retry the task with exponential backoff
 -         raise self.retry(exc=e, countdown=60 * (2**self.request.retries))
 - 
 - 
 - def _create_workflow_run_from_execution(
 -     execution: WorkflowExecution,
 -     tenant_id: str,
 -     app_id: str,
 -     triggered_from: WorkflowRunTriggeredFrom,
 -     creator_user_id: str,
 -     creator_user_role: CreatorUserRole,
 - ) -> WorkflowRun:
 -     """
 -     Create a WorkflowRun database model from a WorkflowExecution domain entity.
 -     """
 -     workflow_run = WorkflowRun()
 -     workflow_run.id = execution.id_
 -     workflow_run.tenant_id = tenant_id
 -     workflow_run.app_id = app_id
 -     workflow_run.workflow_id = execution.workflow_id
 -     workflow_run.type = execution.workflow_type.value
 -     workflow_run.triggered_from = triggered_from.value
 -     workflow_run.version = execution.workflow_version
 -     json_converter = WorkflowRuntimeTypeConverter()
 -     workflow_run.graph = json.dumps(json_converter.to_json_encodable(execution.graph))
 -     workflow_run.inputs = json.dumps(json_converter.to_json_encodable(execution.inputs))
 -     workflow_run.status = execution.status.value
 -     workflow_run.outputs = (
 -         json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
 -     )
 -     workflow_run.error = execution.error_message
 -     workflow_run.elapsed_time = execution.elapsed_time
 -     workflow_run.total_tokens = execution.total_tokens
 -     workflow_run.total_steps = execution.total_steps
 -     workflow_run.created_by_role = creator_user_role.value
 -     workflow_run.created_by = creator_user_id
 -     workflow_run.created_at = execution.started_at
 -     workflow_run.finished_at = execution.finished_at
 - 
 -     return workflow_run
 - 
 - 
 - def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: WorkflowExecution) -> None:
 -     """
 -     Update a WorkflowRun database model from a WorkflowExecution domain entity.
 -     """
 -     json_converter = WorkflowRuntimeTypeConverter()
 -     workflow_run.status = execution.status.value
 -     workflow_run.outputs = (
 -         json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
 -     )
 -     workflow_run.error = execution.error_message
 -     workflow_run.elapsed_time = execution.elapsed_time
 -     workflow_run.total_tokens = execution.total_tokens
 -     workflow_run.total_steps = execution.total_steps
 -     workflow_run.finished_at = execution.finished_at
 
 
  |