Signed-off-by: -LAN- <laipz8200@outlook.com>tags/2.0.0-beta.1
| for key, value in msg_metadata.items() | for key, value in msg_metadata.items() | ||||
| if key in WorkflowNodeExecutionMetadataKey.__members__.values() | if key in WorkflowNodeExecutionMetadataKey.__members__.values() | ||||
| } | } | ||||
| if message.message.json_object is not None: | |||||
| if message.message.json_object: | |||||
| json_list.append(message.message.json_object) | json_list.append(message.message.json_object) | ||||
| elif message.type == ToolInvokeMessage.MessageType.LINK: | elif message.type == ToolInvokeMessage.MessageType.LINK: | ||||
| assert isinstance(message.message, ToolInvokeMessage.TextMessage) | assert isinstance(message.message, ToolInvokeMessage.TextMessage) |
| @model_validator(mode="after") | @model_validator(mode="after") | ||||
| def validate_value_type(self) -> "DefaultValue": | def validate_value_type(self) -> "DefaultValue": | ||||
| if self.type is None: | |||||
| raise DefaultValueTypeError("type field is required") | |||||
| # Type validation configuration | # Type validation configuration | ||||
| type_validators = { | type_validators = { | ||||
| DefaultValueType.STRING: { | DefaultValueType.STRING: { |
| """ | """ | ||||
| if value is None: | if value is None: | ||||
| return None | return None | ||||
| if not isinstance(value, str): | |||||
| raise OutputValidationError(f"Output variable `{variable}` must be a string") | |||||
| if len(value) > dify_config.CODE_MAX_STRING_LENGTH: | if len(value) > dify_config.CODE_MAX_STRING_LENGTH: | ||||
| raise OutputValidationError( | raise OutputValidationError( | ||||
| def _check_boolean(self, value: bool | None, variable: str) -> bool | None: | def _check_boolean(self, value: bool | None, variable: str) -> bool | None: | ||||
| if value is None: | if value is None: | ||||
| return None | return None | ||||
| if not isinstance(value, bool): | |||||
| raise OutputValidationError(f"Output variable `{variable}` must be a boolean") | |||||
| return value | return value | ||||
| """ | """ | ||||
| if value is None: | if value is None: | ||||
| return None | return None | ||||
| if not isinstance(value, int | float): | |||||
| raise OutputValidationError(f"Output variable `{variable}` must be a number") | |||||
| if value > dify_config.CODE_MAX_NUMBER or value < dify_config.CODE_MIN_NUMBER: | if value > dify_config.CODE_MAX_NUMBER or value < dify_config.CODE_MIN_NUMBER: | ||||
| raise OutputValidationError( | raise OutputValidationError( |
| if authorization.config is None: | if authorization.config is None: | ||||
| raise AuthorizationConfigError("authorization config is required") | raise AuthorizationConfigError("authorization config is required") | ||||
| if self.auth.config.api_key is None: | |||||
| raise AuthorizationConfigError("api_key is required") | |||||
| if not authorization.config.header: | if not authorization.config.header: | ||||
| authorization.config.header = "Authorization" | authorization.config.header = "Authorization" | ||||
| if self.files and not all(f[0] == "__multipart_placeholder__" for f in self.files): | if self.files and not all(f[0] == "__multipart_placeholder__" for f in self.files): | ||||
| for file_entry in self.files: | for file_entry in self.files: | ||||
| # file_entry should be (key, (filename, content, mime_type)), but handle edge cases | # file_entry should be (key, (filename, content, mime_type)), but handle edge cases | ||||
| if len(file_entry) != 2 or not isinstance(file_entry[1], tuple) or len(file_entry[1]) < 2: | |||||
| if len(file_entry) != 2 or len(file_entry[1]) < 2: | |||||
| continue # skip malformed entries | continue # skip malformed entries | ||||
| key = file_entry[0] | key = file_entry[0] | ||||
| content = file_entry[1][1] | content = file_entry[1][1] | ||||
| body_string += f"--{boundary}\r\n" | body_string += f"--{boundary}\r\n" | ||||
| body_string += f'Content-Disposition: form-data; name="{key}"\r\n\r\n' | body_string += f'Content-Disposition: form-data; name="{key}"\r\n\r\n' | ||||
| # decode content safely | # decode content safely | ||||
| if isinstance(content, bytes): | |||||
| try: | |||||
| body_string += content.decode("utf-8") | |||||
| except UnicodeDecodeError: | |||||
| body_string += content.decode("utf-8", errors="replace") | |||||
| elif isinstance(content, str): | |||||
| body_string += content | |||||
| else: | |||||
| body_string += f"[Unsupported content type: {type(content).__name__}]" | |||||
| try: | |||||
| body_string += content.decode("utf-8") | |||||
| except UnicodeDecodeError: | |||||
| body_string += content.decode("utf-8", errors="replace") | |||||
| body_string += "\r\n" | body_string += "\r\n" | ||||
| body_string += f"--{boundary}--\r\n" | body_string += f"--{boundary}--\r\n" | ||||
| elif self.node_data.body: | elif self.node_data.body: | ||||
| if self.content: | if self.content: | ||||
| if isinstance(self.content, str): | |||||
| body_string = self.content | |||||
| elif isinstance(self.content, bytes): | |||||
| body_string = self.content.decode("utf-8", errors="replace") | |||||
| body_string = self.content.decode("utf-8", errors="replace") | |||||
| elif self.data and self.node_data.body.type == "x-www-form-urlencoded": | elif self.data and self.node_data.body.type == "x-www-form-urlencoded": | ||||
| body_string = urlencode(self.data) | body_string = urlencode(self.data) | ||||
| elif self.data and self.node_data.body.type == "form-data": | elif self.data and self.node_data.body.type == "form-data": |
| ) | ) | ||||
| result = list(filter(filter_func, variable.value)) | result = list(filter(filter_func, variable.value)) | ||||
| variable = variable.model_copy(update={"value": result}) | variable = variable.model_copy(update={"value": result}) | ||||
| elif isinstance(variable, ArrayBooleanSegment): | |||||
| if not isinstance(condition.value, bool): | |||||
| raise InvalidFilterValueError(f"Invalid filter value: {condition.value}") | |||||
| else: | |||||
| filter_func = _get_boolean_filter_func(condition=condition.comparison_operator, value=condition.value) | filter_func = _get_boolean_filter_func(condition=condition.comparison_operator, value=condition.value) | ||||
| result = list(filter(filter_func, variable.value)) | result = list(filter(filter_func, variable.value)) | ||||
| variable = variable.model_copy(update={"value": result}) | variable = variable.model_copy(update={"value": result}) | ||||
| else: | |||||
| raise AssertionError("this statment should be unreachable.") | |||||
| return variable | return variable | ||||
| def _apply_order(self, variable: _SUPPORTED_TYPES_ALIAS) -> _SUPPORTED_TYPES_ALIAS: | def _apply_order(self, variable: _SUPPORTED_TYPES_ALIAS) -> _SUPPORTED_TYPES_ALIAS: | ||||
| if isinstance(variable, (ArrayStringSegment, ArrayNumberSegment, ArrayBooleanSegment)): | if isinstance(variable, (ArrayStringSegment, ArrayNumberSegment, ArrayBooleanSegment)): | ||||
| result = sorted(variable.value, reverse=self._node_data.order_by == Order.DESC) | |||||
| result = sorted(variable.value, reverse=self._node_data.order_by.value == Order.DESC) | |||||
| variable = variable.model_copy(update={"value": result}) | variable = variable.model_copy(update={"value": result}) | ||||
| elif isinstance(variable, ArrayFileSegment): | |||||
| else: | |||||
| result = _order_file( | result = _order_file( | ||||
| order=self._node_data.order_by.value, order_by=self._node_data.order_by.key, array=variable.value | order=self._node_data.order_by.value, order_by=self._node_data.order_by.key, array=variable.value | ||||
| ) | ) | ||||
| variable = variable.model_copy(update={"value": result}) | variable = variable.model_copy(update={"value": result}) | ||||
| else: | |||||
| raise AssertionError("this statement should be unreachable") | |||||
| return variable | return variable | ||||
| if key in {"name", "extension", "mime_type", "url"} and isinstance(value, str): | if key in {"name", "extension", "mime_type", "url"} and isinstance(value, str): | ||||
| extract_func = _get_file_extract_string_func(key=key) | extract_func = _get_file_extract_string_func(key=key) | ||||
| return lambda x: _get_string_filter_func(condition=condition, value=value)(extract_func(x)) | return lambda x: _get_string_filter_func(condition=condition, value=value)(extract_func(x)) | ||||
| if key in {"type", "transfer_method"} and isinstance(value, Sequence): | |||||
| if key in {"type", "transfer_method"}: | |||||
| extract_func = _get_file_extract_string_func(key=key) | extract_func = _get_file_extract_string_func(key=key) | ||||
| return lambda x: _get_sequence_filter_func(condition=condition, value=value)(extract_func(x)) | return lambda x: _get_sequence_filter_func(condition=condition, value=value)(extract_func(x)) | ||||
| elif key == "size" and isinstance(value, str): | elif key == "size" and isinstance(value, str): |
| generator = self._fetch_context(node_data=self._node_data) | generator = self._fetch_context(node_data=self._node_data) | ||||
| context = None | context = None | ||||
| for event in generator: | for event in generator: | ||||
| if isinstance(event, RunRetrieverResourceEvent): | |||||
| context = event.context | |||||
| yield event | |||||
| context = event.context | |||||
| yield event | |||||
| if context: | if context: | ||||
| node_inputs["#context#"] = context | node_inputs["#context#"] = context | ||||
| outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason} | outputs = {"text": result_text, "usage": jsonable_encoder(usage), "finish_reason": finish_reason} | ||||
| if structured_output: | if structured_output: | ||||
| outputs["structured_output"] = structured_output.structured_output | outputs["structured_output"] = structured_output.structured_output | ||||
| if self._file_outputs is not None: | |||||
| if self._file_outputs: | |||||
| outputs["files"] = ArrayFileSegment(value=self._file_outputs) | outputs["files"] = ArrayFileSegment(value=self._file_outputs) | ||||
| # Send final chunk event to indicate streaming is complete | # Send final chunk event to indicate streaming is complete | ||||
| prompt_template = typed_node_data.prompt_template | prompt_template = typed_node_data.prompt_template | ||||
| variable_selectors = [] | variable_selectors = [] | ||||
| if isinstance(prompt_template, list) and all( | |||||
| isinstance(prompt, LLMNodeChatModelMessage) for prompt in prompt_template | |||||
| ): | |||||
| if isinstance(prompt_template, list): | |||||
| for prompt in prompt_template: | for prompt in prompt_template: | ||||
| if prompt.edition_type != "jinja2": | if prompt.edition_type != "jinja2": | ||||
| variable_template_parser = VariableTemplateParser(template=prompt.text) | variable_template_parser = VariableTemplateParser(template=prompt.text) | ||||
| return | return | ||||
| if isinstance(contents, str): | if isinstance(contents, str): | ||||
| yield contents | yield contents | ||||
| elif isinstance(contents, list): | |||||
| else: | |||||
| for item in contents: | for item in contents: | ||||
| if isinstance(item, TextPromptMessageContent): | if isinstance(item, TextPromptMessageContent): | ||||
| yield item.data | yield item.data | ||||
| else: | else: | ||||
| logger.warning("unknown item type encountered, type=%s", type(item)) | logger.warning("unknown item type encountered, type=%s", type(item)) | ||||
| yield str(item) | yield str(item) | ||||
| else: | |||||
| logger.warning("unknown contents type encountered, type=%s", type(contents)) | |||||
| yield str(contents) | |||||
| @property | @property | ||||
| def retry(self) -> bool: | def retry(self) -> bool: |
| def _validate_type(parameter_type: str) -> SegmentType: | def _validate_type(parameter_type: str) -> SegmentType: | ||||
| if not isinstance(parameter_type, str): | |||||
| raise TypeError(f"type should be str, got {type(parameter_type)}, value={parameter_type}") | |||||
| if parameter_type not in _VALID_PARAMETER_TYPES: | if parameter_type not in _VALID_PARAMETER_TYPES: | ||||
| raise ValueError(f"type {parameter_type} is not allowd to use in Parameter Extractor node.") | raise ValueError(f"type {parameter_type} is not allowd to use in Parameter Extractor node.") | ||||
| from core.memory.token_buffer_memory import TokenBufferMemory | from core.memory.token_buffer_memory import TokenBufferMemory | ||||
| from core.model_manager import ModelInstance | from core.model_manager import ModelInstance | ||||
| from core.model_runtime.entities import ImagePromptMessageContent | from core.model_runtime.entities import ImagePromptMessageContent | ||||
| from core.model_runtime.entities.llm_entities import LLMResult, LLMUsage | |||||
| from core.model_runtime.entities.llm_entities import LLMUsage | |||||
| from core.model_runtime.entities.message_entities import ( | from core.model_runtime.entities.message_entities import ( | ||||
| AssistantPromptMessage, | AssistantPromptMessage, | ||||
| PromptMessage, | PromptMessage, | ||||
| from .entities import ParameterExtractorNodeData | from .entities import ParameterExtractorNodeData | ||||
| from .exc import ( | from .exc import ( | ||||
| InvalidInvokeResultError, | |||||
| InvalidModelModeError, | InvalidModelModeError, | ||||
| InvalidModelTypeError, | InvalidModelTypeError, | ||||
| InvalidNumberOfParametersError, | InvalidNumberOfParametersError, | ||||
| ) | ) | ||||
| # handle invoke result | # handle invoke result | ||||
| if not isinstance(invoke_result, LLMResult): | |||||
| raise InvalidInvokeResultError(f"Invalid invoke result: {invoke_result}") | |||||
| text = invoke_result.message.content or "" | text = invoke_result.message.content or "" | ||||
| if not isinstance(text, str): | if not isinstance(text, str): | ||||
| # deduct quota | # deduct quota | ||||
| llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage) | llm_utils.deduct_llm_quota(tenant_id=self.tenant_id, model_instance=model_instance, usage=usage) | ||||
| if text is None: | |||||
| text = "" | |||||
| return text, usage, tool_call | return text, usage, tool_call | ||||
| def _generate_function_call_prompt( | def _generate_function_call_prompt( | ||||
| return int(value) | return int(value) | ||||
| elif isinstance(value, (int, float)): | elif isinstance(value, (int, float)): | ||||
| return value | return value | ||||
| elif not isinstance(value, str): | |||||
| return None | |||||
| if "." in value: | if "." in value: | ||||
| try: | try: | ||||
| return float(value) | return float(value) | ||||
| for parameter in data.parameters: | for parameter in data.parameters: | ||||
| if parameter.type == "number": | if parameter.type == "number": | ||||
| result[parameter.name] = 0 | result[parameter.name] = 0 | ||||
| elif parameter.type == "bool": | |||||
| elif parameter.type == "boolean": | |||||
| result[parameter.name] = False | result[parameter.name] = False | ||||
| elif parameter.type in {"string", "select"}: | elif parameter.type in {"string", "select"}: | ||||
| result[parameter.name] = "" | result[parameter.name] = "" |
| elif message.type == ToolInvokeMessage.MessageType.JSON: | elif message.type == ToolInvokeMessage.MessageType.JSON: | ||||
| assert isinstance(message.message, ToolInvokeMessage.JsonMessage) | assert isinstance(message.message, ToolInvokeMessage.JsonMessage) | ||||
| # JSON message handling for tool node | # JSON message handling for tool node | ||||
| if message.message.json_object is not None: | |||||
| if message.message.json_object: | |||||
| json.append(message.message.json_object) | json.append(message.message.json_object) | ||||
| elif message.type == ToolInvokeMessage.MessageType.LINK: | elif message.type == ToolInvokeMessage.MessageType.LINK: | ||||
| assert isinstance(message.message, ToolInvokeMessage.TextMessage) | assert isinstance(message.message, ToolInvokeMessage.TextMessage) |
| case WriteMode.CLEAR: | case WriteMode.CLEAR: | ||||
| income_value = get_zero_value(original_variable.value_type) | income_value = get_zero_value(original_variable.value_type) | ||||
| if income_value is None: | |||||
| raise VariableOperatorNodeError("income value not found") | |||||
| updated_variable = original_variable.model_copy(update={"value": income_value.to_object()}) | updated_variable = original_variable.model_copy(update={"value": income_value.to_object()}) | ||||
| case _: | |||||
| raise VariableOperatorNodeError(f"unsupported write mode: {self._node_data.write_mode}") | |||||
| # Over write the variable. | # Over write the variable. | ||||
| self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable) | self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable) |
| # Only array variable can be appended or extended | # Only array variable can be appended or extended | ||||
| # Only array variable can have elements removed | # Only array variable can have elements removed | ||||
| return variable_type.is_array_type() | return variable_type.is_array_type() | ||||
| case _: | |||||
| return False | |||||
| def is_variable_input_supported(*, operation: Operation): | def is_variable_input_supported(*, operation: Operation): |
| if not variable.value: | if not variable.value: | ||||
| return variable.value | return variable.value | ||||
| return variable.value[:-1] | return variable.value[:-1] | ||||
| case _: | |||||
| raise OperationNotSupportedError(operation=operation, variable_type=variable.value_type) |