[docs]classGraphVisitor(Generic[NodeType]):""" A graph visitor takes a node in the graph and returns its successors. Typically, it visits a control flow graph, and returns successors of a CFGNode each time. This is the base class of all graph visitors. """__slots__=("_sorted_nodes","_worklist","_nodes_set","_node_to_index","_reached_fixedpoint","_back_edges_by_src","_back_edges_by_dst","_pending_nodes",)
[docs]def__init__(self):self._sorted_nodes:list[NodeType]=[]# a list of sorted nodes. do not change until we get a new graphself._worklist:list[NodeType]=[]# a list of nodes that the analysis should work on and finally exhaustself._nodes_set:set[NodeType]=set()self._node_to_index:dict[NodeType,int]={}self._reached_fixedpoint:set[NodeType]=set()self._back_edges_by_src:dict[NodeType,set[NodeType]]|None=Noneself._back_edges_by_dst:dict[NodeType,set[NodeType]]|None=Noneself._pending_nodes:dict[NodeType,set[NodeType]]=defaultdict(set)
## Interfaces#
[docs]defsuccessors(self,node:NodeType)->list[NodeType]:""" Get successors of a node. The node should be in the graph. :param node: The node to work with. :return: A list of successors. :rtype: list """raiseNotImplementedError()
[docs]defpredecessors(self,node:NodeType)->list[NodeType]:""" Get predecessors of a node. The node should be in the graph. :param node: The node to work with. :return: A list of predecessors. """raiseNotImplementedError()
[docs]defsort_nodes(self,nodes:Collection[NodeType]|None=None)->list[NodeType]:""" Get a list of all nodes sorted in an optimal traversal order. :param iterable nodes: A collection of nodes to sort. If none, all nodes in the graph will be used to sort. :return: A list of sorted nodes. """raiseNotImplementedError()
[docs]defback_edges(self)->list[tuple[NodeType,NodeType]]:""" Get a list of back edges. This function is optional. If not overriden, the traverser cannot achieve an optimal graph traversal order. :return: A list of back edges (source -> destination). """raiseNotImplementedError()
## Public methods#
[docs]defnodes(self)->Iterator[NodeType]:""" Return an iterator of nodes following an optimal traversal order. :return: """returniter(self.sort_nodes())
@deprecated(replacement="nodes")defnodes_iter(self):""" (Deprecated) Return an iterator of nodes following an optimal traversal order. Will be removed in the future. """returnself.nodes()# Traversal
[docs]defreset(self):""" Reset the internal node traversal state. Must be called prior to visiting future nodes. :return: None """self._sorted_nodes.clear()self._worklist.clear()self._nodes_set.clear()self._node_to_index.clear()self._reached_fixedpoint.clear()self._sorted_nodes=list(self.sort_nodes())fori,ninenumerate(self._sorted_nodes):self._node_to_index[n]=ibinary_insert(self._worklist,n,lambdaelem:self._node_to_index[elem])self._nodes_set.add(n)self._populate_back_edges()
[docs]defnext_node(self)->NodeType|None:""" Get the next node to visit. :return: A node in the graph. """ifnotself._worklist:returnNonenode=Noneforidxinrange(len(self._worklist)):# pylint:disable=consider-using-enumeratenode_=self._worklist[idx]ifnode_inself._pending_nodes:ifnotself._pending_nodes[node_]:# this pending node is cleared - take itnode=node_delself._pending_nodes[node_]delself._worklist[idx]break# try the next nodecontinuenode=node_delself._worklist[idx]breakifnodeisNone:# all nodes are pending... we will just pick the first onenode=self._worklist.pop(0)self._nodes_set.discard(node)# check if this node should be added to pendingifself._back_edges_by_dstandnodeinself._back_edges_by_dst:forback_edge_srcinself._back_edges_by_dst[node]:self._pending_nodes[node].add(back_edge_src)# check if this node is being pended on by any other nodeifself._back_edges_by_srcandnodeinself._back_edges_by_src:forback_edge_dstinself._back_edges_by_src[node]:self._pending_nodes[back_edge_dst].discard(node)returnnode
[docs]defall_successors(self,node:NodeType,skip_reached_fixedpoint=False)->set[NodeType]:""" Returns all successors to the specific node. :param node: A node in the graph. :return: A set of nodes that are all successors to the given node. :rtype: set """successors=set()stack=[node]whilestack:n=stack.pop()successors.add(n)stack.extend(succforsuccinself.successors(n)ifsuccnotinsuccessorsand(notskip_reached_fixedpointorsuccnotinself._reached_fixedpoint))returnsuccessors
[docs]defrevisit_successors(self,node:NodeType,include_self=True)->None:""" Revisit a node in the future. As a result, the successors to this node will be revisited as well. :param node: The node to revisit in the future. :return: None """successors=self.successors(node)# , skip_reached_fixedpoint=True)ifinclude_self:ifnodenotinself._nodes_set:binary_insert(self._worklist,node,lambdaelem:self._node_to_index[elem])self._nodes_set.add(node)forsuccinsuccessors:ifsuccnotinself._nodes_set:binary_insert(self._worklist,succ,lambdaelem:self._node_to_index[elem])self._nodes_set.add(succ)
[docs]defrevisit_node(self,node:NodeType)->None:""" Revisit a node in the future. Do not include its successors immediately. :param node: The node to revisit in the future. :return: None """ifnodenotinself._nodes_set:binary_insert(self._worklist,node,lambdaelem:self._node_to_index[elem])self._nodes_set.add(node)
[docs]defreached_fixedpoint(self,node:NodeType)->None:""" Mark a node as reached fixed-point. This node as well as all its successors will not be visited in the future. :param node: The node to mark as reached fixed-point. :return: None """self._reached_fixedpoint.add(node)