Co-authored-by: zhangshibo <zhangshibo@didiglobal.com>tags/1.4.0
| @@ -95,7 +95,12 @@ class StreamProcessor(ABC): | |||
| if node_id not in self.rest_node_ids: | |||
| return | |||
| if node_id in reachable_node_ids: | |||
| return | |||
| self.rest_node_ids.remove(node_id) | |||
| self.rest_node_ids.extend(set(reachable_node_ids) - set(self.rest_node_ids)) | |||
| for edge in self.graph.edge_mapping.get(node_id, []): | |||
| if edge.target_node_id in reachable_node_ids: | |||
| continue | |||