| @@ -57,11 +57,19 @@ class StreamProcessor(ABC): | |||
| # The branch_identify parameter is added to ensure that | |||
| # only nodes in the correct logical branch are included. | |||
| reachable_node_ids.append(edge.target_node_id) | |||
| ids = self._fetch_node_ids_in_reachable_branch(edge.target_node_id, run_result.edge_source_handle) | |||
| reachable_node_ids.extend(ids) | |||
| else: | |||
| # if the condition edge in parallel, and the target node is not in parallel, we should not remove it | |||
| # Issues: #13626 | |||
| if ( | |||
| finished_node_id in self.graph.node_parallel_mapping | |||
| and edge.target_node_id not in self.graph.parallel_mapping | |||
| ): | |||
| continue | |||
| unreachable_first_node_ids.append(edge.target_node_id) | |||
| unreachable_first_node_ids = list(set(unreachable_first_node_ids) - set(reachable_node_ids)) | |||
| for node_id in unreachable_first_node_ids: | |||
| self._remove_node_ids_in_unreachable_branch(node_id, reachable_node_ids) | |||