Source code for pecanpy.graph

"""Lite graph objects used by pecanpy."""
import warnings

import numpy as np

from .typing import AdjMat
from .typing import AdjNonZeroMat
from .typing import CSR
from .typing import Dict
from .typing import Float32Array
from .typing import Iterator
from .typing import List
from .typing import Optional
from .typing import Sequence
from .typing import Tuple
from .typing import Uint32Array


class BaseGraph:
    """Base Graph object.

    Handles node IDs and provides general properties including
    ``num_nodes`` and ``density``. The ``num_edges`` property is to be
    specified by the derived graph objects.

    """

    def __init__(self):
        self._node_ids: List[str] = []
        self._node_idmap: Dict[str, int] = {}  # id -> index

    @property
    def nodes(self) -> List[str]:
        """Return the list of node IDs."""
        return self._node_ids

    @property
    def num_nodes(self) -> int:
        """Return the number of nodes in the graph."""
        return len(self.nodes)

    @property
    def num_edges(self) -> int:
        """Return the number of edges in the graph.

        Raises:
            NotImplementedError: Always, derived classes must override.

        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not have num_edges, use the "
            f"derived classes like SparseGraph and DenseGraph instead.",
        )

    @property
    def density(self) -> float:
        """Return the edge density of the graph.

        Computed as ``num_edges / (num_nodes * (num_nodes - 1))``. A graph
        with fewer than two nodes has no node pairs, so its density is
        reported as ``0.0`` instead of dividing by zero.

        """
        # Read num_edges first so that NotImplementedError (base class) or
        # ValueError (empty derived graph) still propagates to the caller.
        num_edges = self.num_edges
        num_nodes = self.num_nodes
        if num_nodes < 2:
            return 0.0
        # num_edges may be a NumPy scalar; normalize to a builtin float.
        return float(num_edges / num_nodes / (num_nodes - 1))

    def set_node_ids(
        self,
        node_ids: Optional[Sequence[str]],
        implicit_ids: bool = False,
        num_nodes: Optional[int] = None,
    ):
        """Update ID list and mapping.

        Set _node_ids given the input node IDs and also set the
        corresponding _node_idmap based on it, which maps from node ID to
        the index.

        Args:
            node_ids (:obj:`list` of :obj:`str`, optional): List of node IDs
                to use. If not available, will implicitly set node IDs to
                the canonical ordering of nodes with a warning message,
                which is suppressed if `implicit_ids` is set to True.
            implicit_ids (bool): Implicitly set the node IDs to the
                canonical node ordering, ignoring `node_ids` even when they
                are given. If set to False and node IDs are not available,
                it will also set implicit node IDs, but with a warning
                message. The warning message can be suppressed if
                `implicit_ids` is set to True as a confirmation of the
                behavior.
            num_nodes (int, optional): Number of nodes, used when trying to
                set implicit node IDs.

        Raises:
            ValueError: If implicit node IDs are needed but `num_nodes` is
                not specified.

        """
        use_given_ids = (node_ids is not None) and (not implicit_ids)
        if use_given_ids:
            new_ids = list(node_ids)
        elif num_nodes is None:
            raise ValueError(
                "Need to specify `num_nodes` when setting implicit node IDs.",
            )
        else:
            new_ids = [str(i) for i in range(num_nodes)]

        # Update the state before warning, so the IDs are in place even if
        # warnings are configured to raise.
        self._node_ids = new_ids
        # NOTE: if an ID is repeated, the map keeps the last index for it.
        self._node_idmap = {j: i for i, j in enumerate(new_ids)}

        if not use_given_ids and not implicit_ids:
            warnings.warn(
                "WARNING: Implicitly set node IDs to the canonical node "
                "ordering due to missing IDs field in the raw CSR npz "
                "file. This warning message can be suppressed by setting "
                "implicit_ids to True in the read_npz function call, or "
                "by setting the --implicit_ids flag in the CLI",
                stacklevel=2,
            )

    def get_has_nbrs(self):
        """Abstract method to be specified by derived classes."""
        raise NotImplementedError

    def get_move_forward(self):
        """Abstract method to be specified by derived classes."""
        raise NotImplementedError
class AdjlstGraph(BaseGraph):
    """Adjacency list Graph object used for reading/writing edge list files.

    Sparse Graph object that stores graph as adjacency list.

    Note:
        AdjlstGraph is only used for reading/writing edge list files and
        does not support random walk computations since Numba njit does not
        work with Python data structures like list and dict.

    Examples:
        Read ``.edg`` file and create ``AdjlstGraph`` object using the
        ``.read`` method.

        >>> from pecanpy.graph import AdjlstGraph
        >>>
        >>> # initialize AdjlstGraph object
        >>> g = AdjlstGraph()
        >>>
        >>> # read graph from edgelist
        >>> g.read(path_to_edg_file, weighted=True, directed=False)
        >>>
        >>> indptr, indices, data = g.to_csr()  # convert to csr
        >>>
        >>> dense_mat = g.to_dense()  # convert to dense adjacency matrix
        >>>
        >>> g.save(edg_outpath)  # save the graph to an edge list file

    """

    def __init__(self):
        super().__init__()
        self._data: List[Dict[int, float]] = []  # list of nbrs idx -> weights
        self._num_edges: int = 0

    @property
    def edges_iter(self) -> Iterator[Tuple[int, int, float]]:
        """Return an iterator that iterates over all edges.

        Edges are yielded as (head, tail, weight) triples, ordered by head
        index and then by tail index.

        """
        for head, head_nbrs in enumerate(self._data):
            for tail in sorted(head_nbrs):
                yield head, tail, head_nbrs[tail]

    @property
    def edges(self) -> List[Tuple[int, int, float]]:
        """Return a list of triples (head, tail, weight) representing edges."""
        return list(self.edges_iter)

    @property
    def num_edges(self):
        """Return the number of edges in the graph.

        Each stored (head, tail) entry counts once, so an undirected edge
        between two distinct nodes contributes two.

        """
        return self._num_edges

    @staticmethod
    def _read_edge_line(
        edge_line: str,
        weighted: bool,
        delimiter: str,
    ) -> Tuple[str, str, float]:
        """Read a line from the edge list file.

        Args:
            edge_line (str): A single line of the edge list file.
            weighted (bool): Whether a third (weight) column is expected.
            delimiter (str): Delimiter separating the columns.

        Returns:
            Tuple of (first node ID, second node ID, edge weight), where the
            weight is 1.0 if the graph is unweighted.

        Raises:
            ValueError: If the line does not have exactly three columns for
                a weighted graph, or has fewer than two columns for an
                unweighted graph.

        """
        terms = edge_line.strip().split(delimiter)

        if weighted and len(terms) != 3:
            raise ValueError(
                f"Expecting three columns in the edge list file for a "
                f"weighted graph, got {len(terms)} instead: {edge_line!r}",
            )
        if len(terms) < 2:
            # Fail with a descriptive error rather than a bare IndexError
            raise ValueError(
                f"Expecting at least two columns in the edge list file, "
                f"got {len(terms)} instead: {edge_line!r}",
            )

        id1, id2 = terms[0].strip(), terms[1].strip()
        weight = float(terms[-1]) if weighted else 1.0

        return id1, id2, weight

    @staticmethod
    def _is_valid_edge_weight(id1: str, id2: str, weight: float) -> bool:
        """Check if the edge weight is positive.

        Emits a ``RuntimeWarning`` and returns False for zero or negative
        weights.

        """
        if weight <= 0:
            edg_str = f"w({id1},{id2}) = {weight}"
            warnings.warn(
                f"Non-positive edge ignored: {edg_str}",
                RuntimeWarning,
                stacklevel=2,
            )
            return False
        return True

    def _check_edge_existence(
        self,
        id1: str,
        id2: str,
        idx1: int,
        idx2: int,
        weight: float,
    ):
        """Check if an edge exists.

        If the edge to be added already exists and the new edge weight is
        different from the existing edge weights, print warning message.

        """
        if idx2 in self._data[idx1] and self._data[idx1][idx2] != weight:
            warnings.warn(
                f"edge from {id1} to {id2} exists, with "
                f"value of {self._data[idx1][idx2]:.2f}. "
                f"Now overwrite to {weight:.2f}.",
                RuntimeWarning,
                stacklevel=2,
            )

    def get_node_idx(self, node_id: str) -> int:
        """Get index of the node and create new node when necessary."""
        self.add_node(node_id)
        return self._node_idmap[node_id]

    def add_node(self, node_id: str):
        """Create a new node.

        Add a new node to the graph if not already existing, by updating the
        ID list, ID map, and the adjacency list data. Otherwise pass through
        without further actions.

        Note:
            Does not raise error even if the node already exists.

        """
        if node_id not in self._node_idmap:
            self._node_idmap[node_id] = self.num_nodes
            self.nodes.append(node_id)
            self._data.append({})

    def _add_edge_from_idx(self, idx1: int, idx2: int, weight: float):
        """Add an edge based on the head and tail node index with weight.

        If the edge already exists, its weight is overwritten and the edge
        count is left unchanged.

        """
        nbrs = self._data[idx1]
        # Only count edges that are new: overwriting an existing edge (a
        # repeated line, or the reverse pass of an undirected self-loop)
        # must not inflate num_edges.
        if idx2 not in nbrs:
            self._num_edges += 1
        nbrs[idx2] = weight

    def add_edge(
        self,
        id1: str,
        id2: str,
        weight: float = 1.0,
        directed: bool = False,
    ):
        """Add an edge to the graph.

        Note:
            Non-positive edges are ignored.

        Args:
            id1 (str): first node id.
            id2 (str): second node id.
            weight (float): the edge weight, default is 1.0
            directed (bool): whether the edge is directed or not.

        """
        if self._is_valid_edge_weight(id1, id2, weight):
            idx1, idx2 = map(self.get_node_idx, (id1, id2))
            self._check_edge_existence(id1, id2, idx1, idx2, weight)

            self._add_edge_from_idx(idx1, idx2, weight)
            if not directed:
                self._add_edge_from_idx(idx2, idx1, weight)

    def read(
        self,
        path: str,
        weighted: bool,
        directed: bool,
        delimiter: str = "\t",
    ):
        """Read an edgelist file and create sparse graph.

        Note:
            Non-positive weighted edges are discarded with a warning; if
            the same edge is defined multiple times with different edge
            weights, then the last specified weight will be used (warning
            for such behavior will be printed). Blank lines are skipped.

        Args:
            path (str): path to edgelist file, where the file is tab
                separated and contains 2 or 3 columns depending on whether
                the input graph is weighted, where the first column
                contains the source nodes and the second column contains
                the destination nodes that interact with the corresponding
                source nodes.
            weighted (bool): whether the graph is weighted. If unweighted,
                only two columns are expected in the edgelist file, and the
                edge weights are implicitly set to 1 for all interactions.
                If weighted, a third column encoding the weight of the
                interaction in numeric value is expected.
            directed (bool): whether the graph is directed, if undirected,
                the edge connecting from destination node to source node is
                created with same edge weight from source node to
                destination node.
            delimiter (str): delimiter of the edge list file, default is
                tab.

        """
        with open(path) as f:
            for edge_line in f:
                if not edge_line.strip():
                    continue  # tolerate blank lines instead of crashing
                edge = self._read_edge_line(edge_line, weighted, delimiter)
                self.add_edge(*edge, directed)

    def save(self, path: str, unweighted: bool = False, delimiter: str = "\t"):
        """Save AdjLst as an ``.edg`` edge list file.

        Args:
            path (str): Path of the output edge list file.
            unweighted (bool): If set to True, only write two columns,
                corresponding to the head and tail nodes of the edges, and
                ignore the edge weights (default: :obj:`False`).
            delimiter (str): Delimiter for separating fields.

        """
        with open(path, "w") as f:
            for h, t, w in self.edges_iter:
                h_id, t_id = self.nodes[h], self.nodes[t]
                terms = (h_id, t_id) if unweighted else (h_id, t_id, str(w))
                f.write(f"{delimiter.join(terms)}\n")

    def to_csr(self) -> CSR:
        """Construct compressed sparse row matrix.

        Returns:
            Tuple of (indptr, indices, data) arrays, with dtypes uint32,
            uint32, and float32, respectively. Column indices within each
            row are sorted in ascending order.

        """
        indptr = np.zeros(len(self.nodes) + 1, dtype=np.uint32)
        for i, row_data in enumerate(self._data):
            indptr[i + 1] = indptr[i] + len(row_data)

        # last element of indptr indicates the total number of nonzero entries
        indices = np.zeros(indptr[-1], dtype=np.uint32)
        data = np.zeros(indptr[-1], dtype=np.float32)

        for i, nbrs in enumerate(self._data):
            if not nbrs:
                continue

            sorted_nbrs = sorted(nbrs)
            chunk = slice(indptr[i], indptr[i + 1])
            indices[chunk] = np.array(sorted_nbrs, dtype=np.uint32)
            data[chunk] = np.array(
                [nbrs[j] for j in sorted_nbrs],
                dtype=np.float32,
            )

        return indptr, indices, data

    def to_dense(self) -> AdjMat:
        """Construct dense adjacency matrix.

        Note:
            This method does not return a DenseGraph object, but instead
            returns a dense adjacency matrix as NDArray, where the index is
            the same as that of ``nodes``.

        Return:
            NDArray: Full adjacency matrix as 2d numpy array.

        """
        n_nodes = len(self.nodes)
        mat = np.zeros((n_nodes, n_nodes))

        for src_node, src_nbrs in enumerate(self._data):
            for dst_node, weight in src_nbrs.items():
                mat[src_node, dst_node] = weight

        return mat

    @classmethod
    def from_mat(cls, adj_mat: AdjMat, node_ids: List[str], **kwargs):
        """Construct graph using adjacency matrix and node IDs.

        Every nonzero entry of the adjacency matrix is added as an edge.

        Args:
            adj_mat(NDArray): 2D numpy array of adjacency matrix
            node_ids(:obj:`list` of str): node ID list

        Return:
            An adjacency graph object representing the adjacency matrix.

        """
        g = cls(**kwargs)

        # Setup node idmap in the order of node_ids
        for node_id in node_ids:
            g.add_node(node_id)

        # Fill in edge data, stored as builtin int / float rather than
        # NumPy scalars
        for idx1, idx2 in zip(*np.where(adj_mat != 0)):
            g._add_edge_from_idx(
                int(idx1),
                int(idx2),
                float(adj_mat[idx1, idx2]),
            )

        return g
class SparseGraph(BaseGraph):
    """Sparse Graph object that stores graph in CSR format.

    The graph is held as three arrays: ``indptr``, ``indices`` and ``data``.

    Examples:
        Read ``.edg`` file and create ``SparseGraph`` object using
        ``.read_edg`` method.

        >>> from pecanpy.graph import SparseGraph
        >>>
        >>> # initialize SparseGraph object
        >>> g = SparseGraph()
        >>>
        >>> # read graph from edgelist
        >>> g.read_edg(path_to_edg_file, weighted=True, directed=False)
        >>>
        >>> # save the csr graph as npz file to be used later
        >>> g.save(npz_outpath)

    """

    def __init__(self):
        super().__init__()
        self.data: Optional[Float32Array] = None
        self.indptr: Optional[Uint32Array] = None
        self.indices: Optional[Uint32Array] = None

    @property
    def num_edges(self) -> int:
        """Return the number of edges in the graph.

        Raises:
            ValueError: If the graph has not been loaded yet.

        """
        if self.indptr is None:
            raise ValueError("Empty graph.")
        # indptr holds NumPy uint32 values; return a builtin int as annotated
        return int(self.indptr[-1])

    def read_edg(
        self,
        path: str,
        weighted: bool,
        directed: bool,
        delimiter: str = "\t",
    ):
        """Create CSR sparse graph from edge list.

        First create ``AdjlstGraph`` by reading the edge list file, and then
        convert to ``SparseGraph`` via ``to_csr``.

        Args:
            path (str): path to edgelist file.
            weighted (bool): whether the graph is weighted.
            directed (bool): whether the graph is directed.
            delimiter (str): delimiter used between node IDs.

        """
        g = AdjlstGraph()
        g.read(path, weighted, directed, delimiter)
        self.set_node_ids(g.nodes)
        self.indptr, self.indices, self.data = g.to_csr()

    def read_npz(self, path: str, weighted: bool, implicit_ids: bool = False):
        """Directly read a CSR sparse graph.

        Note:
            To generate a CSR file compatible with PecanPy, first load the
            graph as a sparse graph using ``SparseGraph.read_edg``. Then
            save the sparse graph to a csr file using the ``save`` method
            from ``SparseGraph``. The saved ``.npz`` file can then be loaded
            directly by ``SparseGraph`` later.

        Args:
            path (str): path to the csr file, which is an npz file with four
                arrays with keys 'IDs', 'data', 'indptr', 'indices', which
                correspond to the node IDs, the edge weights, the offset
                array for each node, and the indices of the edges.
            weighted (bool): whether the graph is weighted, if unweighted,
                all edge weights will be converted to 1.
            implicit_ids (bool): Implicitly set the node IDs to the
                canonical node ordering from the CSR graph. If unset and the
                `IDs` field is not found in the input CSR graph, a warning
                message will be displayed on screen. The missing `IDs` field
                can happen, for example, when the user uses the CSR graph
                prepared by `scipy.sparse.csr`.

        Raises:
            ValueError: If the 'data' field is missing from the npz file.

        """
        # Use the archive as a context manager so that the underlying file
        # handle is closed once all arrays have been read into memory.
        with np.load(path) as raw:
            self.indptr = raw["indptr"].astype(np.uint32)
            self.indices = raw["indices"].astype(np.uint32)
            if "data" not in raw:
                raise ValueError("Adjacency matrix data not found.")
            self.data = raw["data"].astype(np.float32)
            node_ids = raw["IDs"] if "IDs" in raw else None

        if not weighted:
            self.data[:] = 1.0  # overwrite edge weights with constant

        self.set_node_ids(
            node_ids,
            implicit_ids=implicit_ids,
            num_nodes=int(self.indptr.size - 1),
        )

    def save(self, path: str):
        """Save CSR as ``.csr.npz`` file.

        The archive contains the arrays 'IDs', 'data', 'indptr' and
        'indices'.

        """
        np.savez(
            path,
            IDs=self.nodes,
            data=self.data,
            indptr=self.indptr,
            indices=self.indices,
        )

    @classmethod
    def from_adjlst_graph(cls, adjlst_graph, **kwargs):
        """Construct csr graph from adjacency list graph.

        Args:
            adjlst_graph (:obj:`pecanpy.graph.AdjlstGraph`): Adjacency list
                graph to be converted.

        """
        g = cls(**kwargs)
        g.set_node_ids(adjlst_graph.nodes)
        g.indptr, g.indices, g.data = adjlst_graph.to_csr()
        return g

    @classmethod
    def from_mat(cls, adj_mat: AdjMat, node_ids: List[str], **kwargs):
        """Construct csr graph using adjacency matrix and node IDs.

        Note:
            Every nonzero entry of the adjacency matrix is treated as an
            edge, as done by ``AdjlstGraph.from_mat``.

        Args:
            adj_mat(NDArray): 2D numpy array of adjacency matrix
            node_ids(:obj:`list` of str): node ID list

        """
        g = cls(**kwargs)
        g.set_node_ids(node_ids)
        adjlst_graph = AdjlstGraph.from_mat(adj_mat, node_ids)
        g.indptr, g.indices, g.data = adjlst_graph.to_csr()
        return g
class DenseGraph(BaseGraph):
    """Dense Graph object that stores graph as array.

    Examples:
        Read ``.npz`` files and create ``DenseGraph`` object using
        ``read_npz``

        >>> from pecanpy.graph import DenseGraph
        >>>
        >>> g = DenseGraph()  # initialize DenseGraph object
        >>>
        >>> g.read_npz(path_to_npz_file, weighted=True)

        Read ``.edg`` files and create ``DenseGraph`` object using
        ``read_edg``

        >>> from pecanpy.graph import DenseGraph
        >>>
        >>> # initialize DenseGraph object
        >>> g = DenseGraph()
        >>>
        >>> # read graph from edgelist
        >>> g.read_edg(path_to_edg_file, weighted=True, directed=False)
        >>>
        >>> # save the dense graph as npz file to be used later
        >>> g.save(npz_outpath)

    """

    def __init__(self):
        super().__init__()
        self._data: Optional[AdjMat] = None
        self._nonzero: Optional[AdjNonZeroMat] = None

    @property
    def num_edges(self) -> int:
        """Return the number of edges in the graph.

        Counted as the number of nonzero entries in the adjacency matrix.

        Raises:
            ValueError: If the graph has not been loaded yet.

        """
        if self.nonzero is None:
            raise ValueError("Empty graph.")
        # ndarray.sum() gives a NumPy integer; return a builtin int
        return int(self.nonzero.sum())

    @property
    def data(self) -> Optional[AdjMat]:
        """Return the adjacency matrix."""
        return self._data

    @data.setter
    def data(self, data: AdjMat):
        """Set adjacency matrix and the corresponding nonzero matrix."""
        self._data = data.astype(float)
        self._nonzero = np.array(self._data != 0, dtype=bool)

    @property
    def nonzero(self) -> Optional[AdjNonZeroMat]:
        """Return the nonzero mask for the adjacency matrix."""
        return self._nonzero

    def read_npz(self, path: str, weighted: bool, implicit_ids: bool = False):
        """Read ``.npz`` file and create dense graph.

        Args:
            path (str): path to ``.npz`` file, which contains the adjacency
                matrix under the key 'data' and optionally the node IDs
                under the key 'IDs'.
            weighted (bool): whether the graph is weighted, if unweighted,
                all nonzero weights will be converted to 1.
            implicit_ids (bool): Implicitly set the node IDs to the
                canonical ordering from the dense adjacency matrix object.
                If unset and the `IDs` field is not found in the object, a
                warning message will be displayed on screen. This warning
                message can be suppressed if `implicit_ids` is set to True
                as a confirmation of the behavior.

        """
        # Use the archive as a context manager so that the underlying file
        # handle is closed once the arrays have been read into memory.
        with np.load(path) as raw:
            self.data = raw["data"]
            node_ids = raw["IDs"] if "IDs" in raw else None

        if not weighted:  # overwrite edge weights with constant
            self.data = self.nonzero * 1.0  # type: ignore

        self.set_node_ids(
            node_ids,
            implicit_ids=implicit_ids,
            num_nodes=self.data.shape[0],
        )

    def read_edg(
        self,
        path: str,
        weighted: bool,
        directed: bool,
        delimiter: str = "\t",
    ):
        """Read an edgelist file and construct dense graph.

        First create ``AdjlstGraph`` by reading the edge list file, and then
        convert to a dense adjacency matrix via ``to_dense``.

        Args:
            path (str): path to edgelist file.
            weighted (bool): whether the graph is weighted.
            directed (bool): whether the graph is directed.
            delimiter (str): delimiter used between node IDs.

        """
        g = AdjlstGraph()
        g.read(path, weighted, directed, delimiter)
        self.set_node_ids(g.nodes)
        self.data = g.to_dense()

    def save(self, path: str):
        """Save dense graph as ``.dense.npz`` file.

        The archive contains the arrays 'data' and 'IDs'.

        """
        np.savez(path, data=self.data, IDs=self.nodes)

    @classmethod
    def from_adjlst_graph(cls, adjlst_graph, **kwargs):
        """Construct dense graph from adjacency list graph.

        Args:
            adjlst_graph (:obj:`pecanpy.graph.AdjlstGraph`): Adjacency list
                graph to be converted.

        """
        g = cls(**kwargs)
        g.set_node_ids(adjlst_graph.nodes)
        g.data = adjlst_graph.to_dense()
        return g

    @classmethod
    def from_mat(cls, adj_mat: AdjMat, node_ids: List[str], **kwargs):
        """Construct dense graph using adjacency matrix and node IDs.

        Args:
            adj_mat(NDArray): 2D numpy array of adjacency matrix
            node_ids(:obj:`list` of str): node ID list

        """
        g = cls(**kwargs)
        g.data = adj_mat
        g.set_node_ids(node_ids)
        return g