| import re | import re | ||||
| import threading | import threading | ||||
| from core.app.entities.queue_entities import QueueAgentMessageEvent, QueueLLMChunkEvent, QueueTextChunkEvent | |||||
| from core.app.entities.queue_entities import ( | |||||
| QueueAgentMessageEvent, | |||||
| QueueLLMChunkEvent, | |||||
| QueueNodeSucceededEvent, | |||||
| QueueTextChunkEvent, | |||||
| ) | |||||
| from core.model_manager import ModelManager | from core.model_manager import ModelManager | ||||
| from core.model_runtime.entities.model_entities import ModelType | from core.model_runtime.entities.model_entities import ModelType | ||||
| self.msg_text += message.event.chunk.delta.message.content | self.msg_text += message.event.chunk.delta.message.content | ||||
| elif isinstance(message.event, QueueTextChunkEvent): | elif isinstance(message.event, QueueTextChunkEvent): | ||||
| self.msg_text += message.event.text | self.msg_text += message.event.text | ||||
| elif isinstance(message.event, QueueNodeSucceededEvent): | |||||
| self.msg_text += message.event.outputs.get('output', '') | |||||
| self.last_message = message | self.last_message = message | ||||
| sentence_arr, text_tmp = self._extract_sentence(self.msg_text) | sentence_arr, text_tmp = self._extract_sentence(self.msg_text) | ||||
| if len(sentence_arr) >= min(self.MAX_SENTENCE, 7): | if len(sentence_arr) >= min(self.MAX_SENTENCE, 7): |
| :return: | :return: | ||||
| """ | """ | ||||
| for message in self._queue_manager.listen(): | for message in self._queue_manager.listen(): | ||||
| if publisher: | |||||
| if hasattr(message.event, 'metadata') and message.event.metadata.get('is_answer_previous_node', False) and publisher: | |||||
| publisher.publish(message=message) | |||||
| elif (hasattr(message.event, 'execution_metadata') | |||||
| and message.event.execution_metadata | |||||
| and message.event.execution_metadata.get('is_answer_previous_node', False) | |||||
| and publisher): | |||||
| publisher.publish(message=message) | publisher.publish(message=message) | ||||
| event = message.event | event = message.event | ||||
| callbacks: Sequence[WorkflowCallback] | callbacks: Sequence[WorkflowCallback] | ||||
| is_answer_previous_node: bool = False | |||||
| def __init__(self, tenant_id: str, | def __init__(self, tenant_id: str, | ||||
| app_id: str, | app_id: str, | ||||
| workflow_id: str, | workflow_id: str, | ||||
| text=text, | text=text, | ||||
| metadata={ | metadata={ | ||||
| "node_type": self.node_type, | "node_type": self.node_type, | ||||
| "is_answer_previous_node": self.is_answer_previous_node, | |||||
| "value_selector": value_selector | "value_selector": value_selector | ||||
| } | } | ||||
| ) | ) |
| graph = workflow.graph_dict | graph = workflow.graph_dict | ||||
| try: | try: | ||||
| answer_prov_node_ids = [] | |||||
| for node in graph.get('nodes', []): | |||||
| if node.get('id', '') == 'answer': | |||||
| try: | |||||
| answer_prov_node_ids.append(node.get('data', {}) | |||||
| .get('answer', '') | |||||
| .replace('#', '') | |||||
| .replace('.text', '') | |||||
| .replace('{{', '') | |||||
| .replace('}}', '').split('.')[0]) | |||||
| except Exception as e: | |||||
| logger.error(e) | |||||
| predecessor_node: BaseNode | None = None | predecessor_node: BaseNode | None = None | ||||
| current_iteration_node: BaseIterationNode | None = None | current_iteration_node: BaseIterationNode | None = None | ||||
| has_entry_node = False | has_entry_node = False | ||||
| else: | else: | ||||
| next_node = self._get_node(workflow_run_state=workflow_run_state, graph=graph, node_id=next_node_id, callbacks=callbacks) | next_node = self._get_node(workflow_run_state=workflow_run_state, graph=graph, node_id=next_node_id, callbacks=callbacks) | ||||
| if next_node and next_node.node_id in answer_prov_node_ids: | |||||
| next_node.is_answer_previous_node = True | |||||
| # run workflow, run multiple target nodes in the future | # run workflow, run multiple target nodes in the future | ||||
| self._run_workflow_node( | self._run_workflow_node( | ||||
| workflow_run_state=workflow_run_state, | workflow_run_state=workflow_run_state, | ||||
| raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}") | raise ValueError(f"Node {node.node_data.title} run failed: {node_run_result.error}") | ||||
| if node.is_answer_previous_node and not isinstance(node, LLMNode): | |||||
| if not node_run_result.metadata: | |||||
| node_run_result.metadata = {} | |||||
| node_run_result.metadata["is_answer_previous_node"]=True | |||||
| workflow_nodes_and_result.result = node_run_result | workflow_nodes_and_result.result = node_run_result | ||||
| # node run success | # node run success |