Signed-off-by: yihong0618 <zouzou0208@gmail.com>
tags/0.13.1
skipped_count = 0
total_count = 0
vector_type = dify_config.VECTOR_STORE
-upper_colletion_vector_types = {
+upper_collection_vector_types = {
    VectorType.MILVUS,
    VectorType.PGVECTOR,
    VectorType.RELYT,
    VectorType.ORACLE,
    VectorType.ELASTICSEARCH,
}
-lower_colletion_vector_types = {
+lower_collection_vector_types = {
    VectorType.ANALYTICDB,
    VectorType.CHROMA,
    VectorType.MYSCALE,
continue
collection_name = ""
dataset_id = dataset.id
-if vector_type in upper_colletion_vector_types:
+if vector_type in upper_collection_vector_types:
    collection_name = Dataset.gen_collection_name_by_id(dataset_id)
elif vector_type == VectorType.QDRANT:
    if dataset.collection_binding_id:
    else:
        collection_name = Dataset.gen_collection_name_by_id(dataset_id)
-elif vector_type in lower_colletion_vector_types:
+elif vector_type in lower_collection_vector_types:
    collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
else:
    raise ValueError(f"Vector store {vector_type} is not supported.")
Due to the presence of tasks in App Runner that require long execution times, such as LLM generation and external requests, Flask-Sqlalchemy's strategy for database connection pooling is to allocate one connection (transaction) per request. This approach keeps a connection occupied even during non-DB tasks, leading to the inability to acquire new connections during high concurrency requests due to multiple long-running tasks.
-Therefore, the database operations in App Runner and Task Pipeline must ensure connections are closed immediately after use, and it's better to pass IDs rather than Model objects to avoid deattach errors.
+Therefore, the database operations in App Runner and Task Pipeline must ensure connections are closed immediately after use, and it's better to pass IDs rather than Model objects to avoid detach errors.
Examples:
"""
```
-You can also raise the corresponding Erros directly and define them as follows, so that later calls can directly raise exceptions such as `InvokeConnectionError`.
+You can also raise the corresponding Errors directly and define them as follows, so that later calls can directly raise exceptions such as `InvokeConnectionError`.
```python
@property
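def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
    # Hedged sketch of the definition the sentence above refers to: map the
    # provider's raw exceptions to Dify's unified invoke errors. The classes on
    # the right-hand side are illustrative assumptions, not an exhaustive list.
    return {
        InvokeConnectionError: [ConnectionError, TimeoutError],
        InvokeServerUnavailableError: [RuntimeError],
    }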
params["input"] = query
else:
    finished = True
-if "souces" in response_data["queryresult"]:
+if "sources" in response_data["queryresult"]:
    return self.create_link_message(response_data["queryresult"]["sources"]["url"])
elif "pods" in response_data["queryresult"]:
    result = response_data["queryresult"]["pods"][0]["subpods"][0]["plaintext"]
| """ | """ | ||||
| node_inputs: dict[str, list] = {"conditions": []} | node_inputs: dict[str, list] = {"conditions": []} | ||||
| process_datas: dict[str, list] = {"condition_results": []} | |||||
| process_data: dict[str, list] = {"condition_results": []} | |||||
| input_conditions = [] | input_conditions = [] | ||||
| final_result = False | final_result = False | ||||
| operator=case.logical_operator, | operator=case.logical_operator, | ||||
| ) | ) | ||||
| process_datas["condition_results"].append( | |||||
| process_data["condition_results"].append( | |||||
| { | { | ||||
| "group": case.model_dump(), | "group": case.model_dump(), | ||||
| "results": group_result, | "results": group_result, | ||||
| selected_case_id = "true" if final_result else "false" | selected_case_id = "true" if final_result else "false" | ||||
| process_datas["condition_results"].append( | |||||
| process_data["condition_results"].append( | |||||
| {"group": "default", "results": group_result, "final_result": final_result} | {"group": "default", "results": group_result, "final_result": final_result} | ||||
| ) | ) | ||||
| except Exception as e: | except Exception as e: | ||||
| return NodeRunResult( | return NodeRunResult( | ||||
| status=WorkflowNodeExecutionStatus.FAILED, inputs=node_inputs, process_data=process_datas, error=str(e) | |||||
| status=WorkflowNodeExecutionStatus.FAILED, inputs=node_inputs, process_data=process_data, error=str(e) | |||||
| ) | ) | ||||
| outputs = {"result": final_result, "selected_case_id": selected_case_id} | outputs = {"result": final_result, "selected_case_id": selected_case_id} | ||||
| data = NodeRunResult( | data = NodeRunResult( | ||||
| status=WorkflowNodeExecutionStatus.SUCCEEDED, | status=WorkflowNodeExecutionStatus.SUCCEEDED, | ||||
| inputs=node_inputs, | inputs=node_inputs, | ||||
| process_data=process_datas, | |||||
| process_data=process_data, | |||||
| edge_source_handle=selected_case_id or "false", # Use case ID or 'default' | edge_source_handle=selected_case_id or "false", # Use case ID or 'default' | ||||
| outputs=outputs, | outputs=outputs, | ||||
| ) | ) |
class OperationNotSupportedError(VariableOperatorNodeError):
-    def __init__(self, *, operation: Operation, varialbe_type: str):
-        super().__init__(f"Operation {operation} is not supported for type {varialbe_type}")
+    def __init__(self, *, operation: Operation, variable_type: str):
+        super().__init__(f"Operation {operation} is not supported for type {variable_type}")


class InputTypeNotSupportedError(VariableOperatorNodeError):
# Check if operation is supported
if not helpers.is_operation_supported(variable_type=variable.value_type, operation=item.operation):
-    raise OperationNotSupportedError(operation=item.operation, varialbe_type=variable.value_type)
+    raise OperationNotSupportedError(operation=item.operation, variable_type=variable.value_type)

# Check if variable input is supported
if item.input_type == InputType.VARIABLE and not helpers.is_variable_input_supported(

case Operation.DIVIDE:
    return variable.value / value
case _:
-    raise OperationNotSupportedError(operation=operation, varialbe_type=variable.value_type)
+    raise OperationNotSupportedError(operation=operation, variable_type=variable.value_type)