@@ -1,13 +1,18 @@
 from collections.abc import Mapping, Sequence
-from typing import Any, cast
+from typing import Any, Generator, cast
 
 from sqlalchemy import select
 from sqlalchemy.orm import Session
 
 from core.datasource.entities.datasource_entities import (
     DatasourceInvokeMessage,
     DatasourceParameter,
     DatasourceProviderType,
     GetOnlineDocumentPageContentRequest,
     GetOnlineDocumentPageContentResponse,
 )
 from core.datasource.online_document.online_document_plugin import OnlineDocumentDatasourcePlugin
 from core.datasource.utils.message_transformer import DatasourceFileMessageTransformer
 from core.file import File
 from core.file.enums import FileTransferMethod, FileType
 from core.plugin.impl.exc import PluginDaemonClientSideError
@@ -19,8 +24,11 @@ from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution
 from core.workflow.enums import SystemVariableKey
 from core.workflow.nodes.base import BaseNode
 from core.workflow.nodes.enums import NodeType
 from core.workflow.nodes.event.event import RunCompletedEvent, RunStreamChunkEvent
 from core.workflow.nodes.tool.exc import ToolFileError
 from core.workflow.utils.variable_template_parser import VariableTemplateParser
 from extensions.ext_database import db
 from factories import file_factory
 from models.model import UploadFile
 
 from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
@@ -36,7 +44,7 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
     _node_data_cls = DatasourceNodeData
     _node_type = NodeType.DATASOURCE
 
-    def _run(self) -> NodeRunResult:
+    def _run(self) -> Generator:
         """
         Run the datasource node
         """
@@ -65,13 +73,15 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
                 datasource_type=DatasourceProviderType.value_of(datasource_type),
             )
         except DatasourceNodeError as e:
-            return NodeRunResult(
+            yield RunCompletedEvent(
+                run_result=NodeRunResult(
                     status=WorkflowNodeExecutionStatus.FAILED,
                     inputs={},
                     metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
                     error=f"Failed to get datasource runtime: {str(e)}",
                     error_type=type(e).__name__,
                 )
+            )
 
         # get parameters
         datasource_parameters = datasource_runtime.entity.parameters
@@ -91,25 +101,22 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
             match datasource_type:
                 case DatasourceProviderType.ONLINE_DOCUMENT:
                     datasource_runtime = cast(OnlineDocumentDatasourcePlugin, datasource_runtime)
-                    online_document_result: GetOnlineDocumentPageContentResponse = (
+                    online_document_result: Generator[DatasourceInvokeMessage, None, None] = (
                         datasource_runtime._get_online_document_page_content(
                             user_id=self.user_id,
                             datasource_parameters=GetOnlineDocumentPageContentRequest(**parameters),
                             provider_type=datasource_type,
                         )
                     )
-                    return NodeRunResult(
-                        status=WorkflowNodeExecutionStatus.SUCCEEDED,
-                        inputs=parameters_for_log,
-                        metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
-                        outputs={
-                            **online_document_result.result.model_dump(),
-                            "datasource_type": datasource_type,
-                        },
-                    )
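+                    # hand the message stream to _transform_message, which yields
+                    # RunStreamChunkEvents as content arrives and a final RunCompletedEvent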
+                    yield from self._transform_message(
+                        messages=online_document_result,
+                        parameters_for_log=parameters_for_log,
+                        datasource_info=datasource_info,
+                    )
 
                 case DatasourceProviderType.WEBSITE_CRAWL:
 
-                    return NodeRunResult(
+                    yield RunCompletedEvent(run_result=NodeRunResult(
                         status=WorkflowNodeExecutionStatus.SUCCEEDED,
                         inputs=parameters_for_log,
                         metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
@@ -117,7 +124,7 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
                             **datasource_info,
                             "datasource_type": datasource_type,
                         },
-                    )
+                    ))
                 case DatasourceProviderType.LOCAL_FILE:
                     related_id = datasource_info.get("related_id")
                     if not related_id:
@@ -149,7 +156,7 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
                             variable_key_list=new_key_list,
                             variable_value=value,
                         )
-                    return NodeRunResult(
+                    yield RunCompletedEvent(run_result=NodeRunResult(
                         status=WorkflowNodeExecutionStatus.SUCCEEDED,
                         inputs=parameters_for_log,
                         metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
@@ -157,25 +164,25 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
                             "file_info": datasource_info,
                             "datasource_type": datasource_type,
                         },
-                    )
+                    ))
                 case _:
                     raise DatasourceNodeError(f"Unsupported datasource provider: {datasource_type}")
         except PluginDaemonClientSideError as e:
-            return NodeRunResult(
+            yield RunCompletedEvent(run_result=NodeRunResult(
                 status=WorkflowNodeExecutionStatus.FAILED,
                 inputs=parameters_for_log,
                 metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
                 error=f"Failed to transform datasource message: {str(e)}",
                 error_type=type(e).__name__,
-            )
+            ))
         except DatasourceNodeError as e:
-            return NodeRunResult(
+            yield RunCompletedEvent(run_result=NodeRunResult(
                 status=WorkflowNodeExecutionStatus.FAILED,
                 inputs=parameters_for_log,
                 metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
                 error=f"Failed to invoke datasource: {str(e)}",
                 error_type=type(e).__name__,
-            )
+            ))
 
     def _generate_parameters(
         self,
@@ -279,3 +286,136 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
             result = {node_id + "." + key: value for key, value in result.items()}
 
         return result
+
+    def _transform_message(
+        self,
+        messages: Generator[DatasourceInvokeMessage, None, None],
+        parameters_for_log: dict[str, Any],
+        datasource_info: dict[str, Any],
+    ) -> Generator:
+        """
+        Transform DatasourceInvokeMessages into workflow events, accumulating
+        plain text, files, JSON payloads, and variables along the way.
+        """
+        # transform message and handle file storage
+        message_stream = DatasourceFileMessageTransformer.transform_datasource_invoke_messages(
+            messages=messages,
+            user_id=self.user_id,
+            tenant_id=self.tenant_id,
+            conversation_id=None,
+        )
+
+        text = ""
+        files: list[File] = []
+        json: list[dict] = []
+
+        variables: dict[str, Any] = {}
+
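+        # drain the stream once, dispatching on message type: chunk events are
+        # yielded immediately, everything else is accumulated for the final result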
+        for message in message_stream:
+            if message.type in {
+                DatasourceInvokeMessage.MessageType.IMAGE_LINK,
+                DatasourceInvokeMessage.MessageType.BINARY_LINK,
+                DatasourceInvokeMessage.MessageType.IMAGE,
+            }:
+                assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
+
+                url = message.message.text
+                if message.meta:
+                    transfer_method = message.meta.get("transfer_method", FileTransferMethod.DATASOURCE_FILE)
+                else:
+                    transfer_method = FileTransferMethod.DATASOURCE_FILE
+
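+                # the upload file id is recovered from the link's last path
+                # segment, with any file extension stripped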
+                datasource_file_id = str(url).split("/")[-1].split(".")[0]
+
+                with Session(db.engine) as session:
+                    stmt = select(UploadFile).where(UploadFile.id == datasource_file_id)
+                    datasource_file = session.scalar(stmt)
+                    if datasource_file is None:
+                        raise ToolFileError(f"Tool file {datasource_file_id} does not exist")
+
+                mapping = {
+                    "datasource_file_id": datasource_file_id,
+                    "type": file_factory.get_file_type_by_mime_type(datasource_file.mime_type),
+                    "transfer_method": transfer_method,
+                    "url": url,
+                }
+                file = file_factory.build_from_mapping(
+                    mapping=mapping,
+                    tenant_id=self.tenant_id,
+                )
+                files.append(file)
+            elif message.type == DatasourceInvokeMessage.MessageType.BLOB:
+                # get tool file id
+                assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
+                assert message.meta
+
+                datasource_file_id = message.message.text.split("/")[-1].split(".")[0]
+                with Session(db.engine) as session:
+                    stmt = select(UploadFile).where(UploadFile.id == datasource_file_id)
+                    datasource_file = session.scalar(stmt)
+                    if datasource_file is None:
+                        raise ToolFileError(f"Datasource file {datasource_file_id} does not exist")
+
+                mapping = {
+                    "datasource_file_id": datasource_file_id,
+                    "transfer_method": FileTransferMethod.DATASOURCE_FILE,
+                }
+
+                files.append(
+                    file_factory.build_from_mapping(
+                        mapping=mapping,
+                        tenant_id=self.tenant_id,
+                    )
+                )
+            elif message.type == DatasourceInvokeMessage.MessageType.TEXT:
+                assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
+                text += message.message.text
+                yield RunStreamChunkEvent(
+                    chunk_content=message.message.text, from_variable_selector=[self.node_id, "text"]
+                )
+            elif message.type == DatasourceInvokeMessage.MessageType.JSON:
+                assert isinstance(message.message, DatasourceInvokeMessage.JsonMessage)
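+                # note: this branch looks carried over from the tool/agent message transform;
+                # with _node_type = NodeType.DATASOURCE it is not expected to be taken here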
+                if self.node_type == NodeType.AGENT:
+                    msg_metadata = message.message.json_object.pop("execution_metadata", {})
+                    agent_execution_metadata = {
+                        key: value
+                        for key, value in msg_metadata.items()
+                        if key in WorkflowNodeExecutionMetadataKey.__members__.values()
+                    }
+                json.append(message.message.json_object)
+            elif message.type == DatasourceInvokeMessage.MessageType.LINK:
+                assert isinstance(message.message, DatasourceInvokeMessage.TextMessage)
+                stream_text = f"Link: {message.message.text}\n"
+                text += stream_text
+                yield RunStreamChunkEvent(chunk_content=stream_text, from_variable_selector=[self.node_id, "text"])
+            elif message.type == DatasourceInvokeMessage.MessageType.VARIABLE:
+                assert isinstance(message.message, DatasourceInvokeMessage.VariableMessage)
+                variable_name = message.message.variable_name
+                variable_value = message.message.variable_value
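+                # streamed variables accumulate string chunks and are re-emitted as
+                # chunk events; non-streamed variables are stored wholesale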
+                if message.message.stream:
+                    if not isinstance(variable_value, str):
+                        raise ValueError("When 'stream' is True, 'variable_value' must be a string.")
+                    if variable_name not in variables:
+                        variables[variable_name] = ""
+                    variables[variable_name] += variable_value
+
+                    yield RunStreamChunkEvent(
+                        chunk_content=variable_value, from_variable_selector=[self.node_id, variable_name]
+                    )
+                else:
+                    variables[variable_name] = variable_value
+            elif message.type == DatasourceInvokeMessage.MessageType.FILE:
+                assert message.meta is not None
+                files.append(message.meta["file"])
+
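+        # once the stream is exhausted, emit a single completion event with the aggregated outputs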
+        yield RunCompletedEvent(
+            run_result=NodeRunResult(
+                status=WorkflowNodeExecutionStatus.SUCCEEDED,
+                outputs={"json": json, "files": files, **variables, "text": text},
+                metadata={
+                    WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info,
+                },
+                inputs=parameters_for_log,
+            )
+        )