feat(api): add WorkflowNodeExecutionOffload model

tags/2.0.0-beta.1
QuantumGhost, 2 months ago
Commit 2fd337e610

+ 72
- 0
api/migrations/versions/2025_08_21_1559-b45e25c2d166_add_workflownodeexecutionoffload.py

@@ -0,0 +1,72 @@
"""add WorkflowNodeExecutionOffload

Revision ID: b45e25c2d166
Revises: 76db8b6ed8f1
Create Date: 2025-08-21 15:59:00.329004

"""

from alembic import op
import models as models
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "b45e25c2d166"
down_revision = "76db8b6ed8f1"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "workflow_node_execution_offload",
        sa.Column(
            "id",
            models.types.StringUUID(),
            server_default=sa.text("uuidv7()"),
            nullable=False,
        ),
        sa.Column(
            "created_at",
            sa.DateTime(),
            server_default=sa.text("CURRENT_TIMESTAMP"),
            nullable=False,
        ),
        sa.Column(
            "tenant_id",
            models.types.StringUUID(),
            nullable=False,
        ),
        sa.Column(
            "app_id",
            models.types.StringUUID(),
            nullable=False,
        ),
        sa.Column(
            "node_execution_id",
            models.types.StringUUID(),
            nullable=True,
        ),
        sa.Column(
            "type",
            sa.String(20),
            nullable=False,
        ),
        sa.Column(
            "file_id",
            models.types.StringUUID(),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("workflow_node_execution_offload_pkey")),
        sa.UniqueConstraint(
            "node_execution_id",
            "type",
            name=op.f("workflow_node_execution_offload_node_execution_id_key"),
            postgresql_nulls_not_distinct=False,
        ),
    )


def downgrade():
    op.drop_table("workflow_node_execution_offload")
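Note on the unique constraint: `postgresql_nulls_not_distinct=False` keeps PostgreSQL's default NULLS DISTINCT behaviour, so at most one offload row may exist per (node_execution_id, type) pair while any number of orphaned rows with a NULL node_execution_id can coexist. A rough illustration follows (editor's sketch, not part of this commit; it assumes the ORM model introduced below, a shared `db` handle from `extensions.ext_database`, and placeholder UUIDs):

from uuid import uuid4

from sqlalchemy import orm
from sqlalchemy.exc import IntegrityError

from extensions.ext_database import db  # assumption: application-wide SQLAlchemy handle
from models.enums import ExecutionOffLoadType
from models.workflow import WorkflowNodeExecutionOffload

TENANT_ID, APP_ID, EXECUTION_ID = str(uuid4()), str(uuid4()), str(uuid4())  # placeholders


def make_offload(node_execution_id):
    # Sketch-only helper; file_id is a placeholder since the table defines no FK on it.
    return WorkflowNodeExecutionOffload(
        tenant_id=TENANT_ID,
        app_id=APP_ID,
        node_execution_id=node_execution_id,
        type_=ExecutionOffLoadType.INPUTS,
        file_id=str(uuid4()),
    )


with orm.Session(db.engine) as session:
    # Two orphaned rows (node_execution_id IS NULL) do not conflict with each other.
    session.add_all([make_offload(None), make_offload(None)])
    session.flush()

    # A second INPUTS row for the same execution, however, violates the unique constraint.
    session.add_all([make_offload(EXECUTION_ID), make_offload(EXECUTION_ID)])
    try:
        session.flush()
    except IntegrityError:
        session.rollback()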

+ 2
- 0
api/models/__init__.py

@@ -86,6 +86,7 @@ from .workflow import (
    WorkflowAppLog,
    WorkflowAppLogCreatedFrom,
    WorkflowNodeExecutionModel,
    WorkflowNodeExecutionOffload,
    WorkflowNodeExecutionTriggeredFrom,
    WorkflowRun,
    WorkflowType,
@@ -172,6 +173,7 @@ __all__ = [
"WorkflowAppLog",
"WorkflowAppLogCreatedFrom",
"WorkflowNodeExecutionModel",
"WorkflowNodeExecutionOffload",
"WorkflowNodeExecutionTriggeredFrom",
"WorkflowRun",
"WorkflowRunTriggeredFrom",

+ 6
- 0
api/models/enums.py

@@ -30,3 +30,9 @@ class MessageStatus(StrEnum):

    NORMAL = "normal"
    ERROR = "error"


class ExecutionOffLoadType(StrEnum):
    INPUTS = "inputs"
    PROCESS_DATA = "process_data"
    OUTPUTS = "outputs"

+ 145
- 3
api/models/workflow.py

@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Any, Optional, Union
from uuid import uuid4

import sqlalchemy as sa
from sqlalchemy import DateTime, exists, orm, select
from sqlalchemy import DateTime, Select, exists, orm, select

from core.file.constants import maybe_file_object
from core.file.models import File
@@ -15,6 +15,7 @@ from core.variables import utils as variable_utils
from core.variables.variables import FloatVariable, IntegerVariable, StringVariable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from core.workflow.nodes.enums import NodeType
from extensions.ext_storage import Storage
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
@@ -36,7 +37,7 @@ from libs import helper
from .account import Account
from .base import Base
from .engine import db
from .enums import CreatorUserRole, DraftVariableType
from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType
from .types import EnumText, StringUUID

logger = logging.getLogger(__name__)
@@ -612,7 +613,7 @@ class WorkflowNodeExecutionTriggeredFrom(StrEnum):
    WORKFLOW_RUN = "workflow-run"


class WorkflowNodeExecutionModel(Base):
class WorkflowNodeExecutionModel(Base):  # This model is expected to have `offload_data` preloaded in most cases.
    """
    Workflow Node Execution

@@ -728,6 +729,32 @@ class WorkflowNodeExecutionModel(Base):
    created_by: Mapped[str] = mapped_column(StringUUID)
    finished_at: Mapped[Optional[datetime]] = mapped_column(DateTime)

    offload_data: Mapped[list["WorkflowNodeExecutionOffload"]] = orm.relationship(
        "WorkflowNodeExecutionOffload",
        primaryjoin="WorkflowNodeExecutionModel.id == foreign(WorkflowNodeExecutionOffload.node_execution_id)",
        uselist=True,
        lazy="raise",
        back_populates="execution",
    )

    @staticmethod
    def preload_offload_data(
        query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
    ):
        return query.options(orm.selectinload(WorkflowNodeExecutionModel.offload_data))

    @staticmethod
    def preload_offload_data_and_files(
        query: Select[tuple["WorkflowNodeExecutionModel"]] | orm.Query["WorkflowNodeExecutionModel"],
    ):
        return query.options(
            orm.selectinload(WorkflowNodeExecutionModel.offload_data).options(
                # Also eagerly load each offload's `UploadFile` so that accessing
                # `offload.file` later does not trigger a lazy load (the relationship
                # is declared with lazy="raise").
                orm.selectinload(WorkflowNodeExecutionOffload.file),
            )
        )

    @property
    def created_by_account(self):
        created_by_role = CreatorUserRole(self.created_by_role)
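Usage sketch for the preload helpers (editor's illustration, not part of this diff): because both relationships are declared with lazy="raise", queries that later touch `offload_data` or `offload.file` must attach these options up front. The `db` handle and the `workflow_run_id` filter are assumptions for the example.

from sqlalchemy import orm, select

from extensions.ext_database import db  # assumption: application-wide SQLAlchemy handle
from models.workflow import WorkflowNodeExecutionModel

run_id = "00000000-0000-0000-0000-000000000000"  # placeholder workflow run id

with orm.Session(db.engine) as session:
    stmt = select(WorkflowNodeExecutionModel).where(
        WorkflowNodeExecutionModel.workflow_run_id == run_id  # assumption: existing column
    )
    # Eagerly load offload_data and each offload's UploadFile in the same query plan,
    # so later attribute access never hits the lazy="raise" guard.
    stmt = WorkflowNodeExecutionModel.preload_offload_data_and_files(stmt)
    executions = session.scalars(stmt).all()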
@@ -779,6 +806,121 @@ class WorkflowNodeExecutionModel(Base):

        return extras

    def _get_offload_by_type(self, type_: ExecutionOffLoadType) -> Optional["WorkflowNodeExecutionOffload"]:
        return next((i for i in self.offload_data if i.type_ == type_), None)

    @property
    def inputs_truncated(self) -> bool:
        """Check if inputs were truncated (offloaded to external storage)."""
        return self._get_offload_by_type(ExecutionOffLoadType.INPUTS) is not None

    @property
    def outputs_truncated(self) -> bool:
        """Check if outputs were truncated (offloaded to external storage)."""
        return self._get_offload_by_type(ExecutionOffLoadType.OUTPUTS) is not None

    @property
    def process_data_truncated(self) -> bool:
        """Check if process_data was truncated (offloaded to external storage)."""
        return self._get_offload_by_type(ExecutionOffLoadType.PROCESS_DATA) is not None

    @staticmethod
    def _load_full_content(session: orm.Session, file_id: str, storage: Storage):
        from .model import UploadFile

        stmt = sa.select(UploadFile).where(UploadFile.id == file_id)
        file = session.scalars(stmt).first()
        assert file is not None, f"UploadFile with id {file_id} should exist but was not found"
        content = storage.load(file.key)
        return json.loads(content)

    def load_full_inputs(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
        offload = self._get_offload_by_type(ExecutionOffLoadType.INPUTS)
        if offload is None:
            return self.inputs_dict

        return self._load_full_content(session, offload.file_id, storage)

    def load_full_outputs(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
        offload: WorkflowNodeExecutionOffload | None = self._get_offload_by_type(ExecutionOffLoadType.OUTPUTS)
        if offload is None:
            return self.outputs_dict

        return self._load_full_content(session, offload.file_id, storage)

    def load_full_process_data(self, session: orm.Session, storage: Storage) -> Mapping[str, Any] | None:
        offload: WorkflowNodeExecutionOffload | None = self._get_offload_by_type(ExecutionOffLoadType.PROCESS_DATA)
        if offload is None:
            return self.process_data_dict

        return self._load_full_content(session, offload.file_id, storage)
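A small sketch of how a caller might combine the truncation flags with the loaders (editor's illustration, not part of this commit; `storage` is assumed to be the module-level Storage instance exposed by extensions.ext_storage):

from collections.abc import Mapping
from typing import Any

from sqlalchemy import orm

from extensions.ext_storage import storage  # assumption: module-level Storage instance
from models.workflow import WorkflowNodeExecutionModel


def resolve_inputs(execution: WorkflowNodeExecutionModel, session: orm.Session) -> Mapping[str, Any] | None:
    # When the inputs were offloaded, fetch the full JSON payload from object storage;
    # otherwise the inline inputs_dict is already complete.
    if execution.inputs_truncated:
        return execution.load_full_inputs(session, storage)
    return execution.inputs_dict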


class WorkflowNodeExecutionOffload(Base):
    __tablename__ = "workflow_node_execution_offload"
    __table_args__ = (
        UniqueConstraint(
            "node_execution_id",
            "type",
            # Treat `NULL` values as distinct for this unique index, so that multiple
            # records with a `NULL` node_execution_id can coexist, which simplifies the
            # garbage collection process.
            postgresql_nulls_not_distinct=False,
        ),
    )
    _HASH_COL_SIZE = 64

    id: Mapped[str] = mapped_column(
        StringUUID,
        primary_key=True,
        server_default=sa.text("uuidv7()"),
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=naive_utc_now, server_default=func.current_timestamp()
    )

    tenant_id: Mapped[str] = mapped_column(StringUUID)
    app_id: Mapped[str] = mapped_column(StringUUID)

    # `node_execution_id` indicates the `WorkflowNodeExecutionModel` associated with this offload record.
    # A value of `None` signifies that this offload record is not linked to any execution record
    # and should be considered for garbage collection.
    node_execution_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    type_: Mapped[ExecutionOffLoadType] = mapped_column(EnumText(ExecutionOffLoadType), name="type", nullable=False)

    # Design Decision: Combining inputs and outputs into a single object was considered to reduce I/O
    # operations. However, due to the current design of `WorkflowNodeExecutionRepository`,
    # the `save` method is called at two distinct times:
    #
    # - When the node starts execution: the `inputs` field exists, but the `outputs` field is absent
    # - When the node completes execution (either succeeded or failed): the `outputs` field becomes available
    #
    # It's difficult to correlate these two successive calls to `save` for combined storage.
    # Converting the `WorkflowNodeExecutionRepository` to buffer the first `save` call and flush
    # when execution completes was also considered, but this would make the execution state unobservable
    # until completion, significantly damaging the observability of workflow execution.
    #
    # Given these constraints, `inputs` and `outputs` are stored separately to maintain real-time
    # observability and system reliability.

    # `file_id` references the offloaded storage object containing the data.
    file_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    execution: Mapped[WorkflowNodeExecutionModel] = orm.relationship(
        foreign_keys=[node_execution_id],
        lazy="raise",
        uselist=False,
        primaryjoin="WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id",
        back_populates="offload_data",
    )

    file: Mapped[Optional["UploadFile"]] = orm.relationship(
        foreign_keys=[file_id],
        lazy="raise",
        uselist=False,
        primaryjoin="WorkflowNodeExecutionOffload.file_id == UploadFile.id",
    )
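Since orphaned rows (node_execution_id IS NULL) are explicitly meant to be garbage collected, a cleanup job could locate them as in the sketch below (editor's illustration, not part of this commit; removing the referenced storage objects is left out):

from sqlalchemy import orm, select

from extensions.ext_database import db  # assumption: application-wide SQLAlchemy handle
from models.workflow import WorkflowNodeExecutionOffload

with orm.Session(db.engine) as session:
    # Offload records that were never linked to (or have been detached from) an execution.
    orphaned = session.scalars(
        select(WorkflowNodeExecutionOffload).where(
            WorkflowNodeExecutionOffload.node_execution_id.is_(None)
        )
    ).all()
    # A real GC pass would also delete the underlying objects referenced by file_id
    # before removing these rows.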


class WorkflowAppLogCreatedFrom(Enum):
"""
