Source code for pecanpy.pecanpy

"""Different strategies for generating node2vec walks."""
import numpy as np
from gensim.models import Word2Vec
from numba import njit
from numba import prange
from numba_progress import ProgressBar

from .graph import BaseGraph
from .rw import DenseRWGraph
from .rw import SparseRWGraph
from .typing import Embeddings
from .typing import Float32Array
from .typing import HasNbrs
from .typing import List
from .typing import MoveForward
from .typing import Optional
from .typing import Uint32Array
from .typing import Uint64Array
from .wrappers import Timer

try:
    from numba.np.ufunc.parallel import get_thread_id
except ImportError:  # numba<0.56
    from numba.np.ufunc.parallel import _get_thread_id as get_thread_id


[docs]class Base(BaseGraph): """Base node2vec object. This base object provides the skeleton for the node2vec walk algorithm, which consists of the ``simulate_walks`` method that generate node2vec random walks. In contrast to the original Python implementation of node2vec, it is parallelized where each process generates walks independently. Args: p (float): return parameter, value less than 1 encourages returning back to previous vertex, and discourage for value grater than 1 (default: 1). q (float): in-out parameter, value less than 1 encourages walks to go "outward", and value greater than 1 encourage walking within a localized neighborhood (default: 1) workers (int): number of threads to be spawned for running node2vec including walk generation and word2vec embedding (default: 1) verbose (bool): show progress bar for walk generation. extend (bool): use node2vec+ extension if set to :obj:`True` (default: :obj:`False`). gamma (float): Multiplication factor for the std term of edge weights added to the average edge weights as the noisy edge threshold, only used by node2vec+ (default: 0) random_state (int, optional): Random seed for generating random walks. Note that to fully ensure reproducibility, use single thread (i.e., workers=1), and potentially need to set the Python environment variable ``PYTHONHASHSEED`` to match the random_state (default: :obj:`None`). Note: The ``preprocess_transition_probs`` is required for implementations that precomputes and stores 2nd order transition probabilities. Examples: Generate node2vec embeddings >>> from pecanpy import pecanpy as node2vec >>> >>> # initialize node2vec object, similarly for SparseOTF and DenseOTF >>> g = node2vec.PreComp(p=0.5, q=1, workers=4, verbose=True) >>> # alternatively, can specify ``extend=True`` for using node2vec+ >>> >>> # load graph from edgelist file >>> g.read_edg(path_to_edg_file, weighted=True, directed=False) >>> # precompute and save 2nd order transition probs (for PreComp only) >>> g.preprocess_transition_probs() >>> >>> # generate random walks, which could then be used to train w2v >>> walks = g.simulate_walks(num_walks=10, walk_length=80) >>> >>> # alternatively, generate the embeddings directly using ``embed`` >>> emd = g.embed() """ def __init__( self, p: float = 1, q: float = 1, workers: int = 1, verbose: bool = False, extend: bool = False, gamma: float = 0, random_state: Optional[int] = None, ): super().__init__() self.p = p self.q = q self.workers = workers # TODO: not doing anything, need to fix. self.verbose = verbose self.extend = extend self.gamma = gamma self.random_state = random_state self._preprocessed: bool = False def _map_walk(self, walk_idx_ary: Uint32Array) -> List[str]: """Map walk from node index to node ID. Note: The last element in the ``walk_idx_ary`` encodes the effective walk length. Only walk indices up to the effective walk length are translated (mapped to node IDs). """ end_idx = walk_idx_ary[-1] walk = [self.nodes[i] for i in walk_idx_ary[:end_idx]] return walk
[docs] def simulate_walks( self, num_walks: int, walk_length: int, ) -> List[List[str]]: """Generate walks starting from each nodes ``num_walks`` time. Note: This is the master process that spawns worker processes, where the worker function ``node2vec_walks`` genearte a single random walk starting from a vertex of the graph. Args: num_walks (int): number of walks starting from each node. walks_length (int): length of walk. """ self._preprocess_transition_probs() nodes = np.array(range(self.num_nodes), dtype=np.uint32) start_node_idx_ary = np.concatenate([nodes] * num_walks) tot_num_jobs = start_node_idx_ary.size random_state = self.random_state np.random.seed(random_state) np.random.shuffle(start_node_idx_ary) # for balanced work load move_forward = self.get_move_forward() has_nbrs = self.get_has_nbrs() verbose = self.verbose # Acquire numba progress proxy for displaying the progress bar with ProgressBar(total=tot_num_jobs, disable=not verbose) as progress: walk_idx_mat = self._random_walks( tot_num_jobs, walk_length, random_state, start_node_idx_ary, has_nbrs, move_forward, progress, ) # Map node index back to node ID walks = [self._map_walk(walk_idx_ary) for walk_idx_ary in walk_idx_mat] return walks
@staticmethod @njit(parallel=True, nogil=True) def _random_walks( tot_num_jobs: int, walk_length: int, random_state: Optional[int], start_node_idx_ary: Uint32Array, has_nbrs: HasNbrs, move_forward: MoveForward, progress_proxy: ProgressBar, ) -> Uint32Array: """Simulate a random walk starting from start node.""" # Seed the random number generator if random_state is not None: np.random.seed(random_state + get_thread_id()) # use the last entry of each walk index array to keep track of the # effective walk length walk_idx_mat = np.zeros((tot_num_jobs, walk_length + 2), dtype=np.uint32) walk_idx_mat[:, 0] = start_node_idx_ary # initialize seeds walk_idx_mat[:, -1] = walk_length + 1 # set to full walk length by default for i in prange(tot_num_jobs): # initialize first step as normal random walk start_node_idx = walk_idx_mat[i, 0] if has_nbrs(start_node_idx): walk_idx_mat[i, 1] = move_forward(start_node_idx) else: walk_idx_mat[i, -1] = 1 continue # start bias random walk for j in range(2, walk_length + 1): cur_idx = walk_idx_mat[i, j - 1] if has_nbrs(cur_idx): prev_idx = walk_idx_mat[i, j - 2] walk_idx_mat[i, j] = move_forward(cur_idx, prev_idx) else: walk_idx_mat[i, -1] = j break progress_proxy.update(1) return walk_idx_mat
[docs] def setup_get_normalized_probs(self): """Transition probability computation setup. This function performs necessary preprocessing of computing the average edge weights array, which is used later by the transition probability computation function ``get_extended_normalized_probs``, if node2vec+ is used. Otherwise, returns the normal transition function ``get_noramlized_probs`` with a trivial placeholder for average edge weights array ``noise_thresholds``. """ if self.extend: # use n2v+ get_normalized_probs = self.get_extended_normalized_probs noise_thresholds = self.get_noise_thresholds() else: # use normal n2v get_normalized_probs = self.get_normalized_probs noise_thresholds = None return get_normalized_probs, noise_thresholds
[docs] def preprocess_transition_probs(self): """Null default preprocess method.""" pass
def _preprocess_transition_probs(self): if not self._preprocessed: self.preprocess_transition_probs() self._preprocessed = True
[docs] def embed( self, dim: int = 128, num_walks: int = 10, walk_length: int = 80, window_size: int = 10, epochs: int = 1, verbose: bool = False, ) -> Embeddings: """Generate embeddings. This is a shortcut function that combines ``simulate_walks`` with ``Word2Vec`` to generate the node2vec embedding. Note: The resulting embeddings are aligned with the graph, i.e., the index of embeddings is the same as that for the graph. Args: dim (int): dimension of the final embedding, default is 128 num_walks (int): number of random walks generated using each node as the seed node, default is 10 walk_length (int): length of the random walks, default is 80 window_size (int): context window sized for training the ``Word2Vec`` model, default is 10 epochs (int): number of epochs for training ``Word2Vec``, default is 1 verbose (bool): print time usage for random walk generation and skip-gram training if set to True Return: Embeddings: The embedding matrix, each row is a node embedding vector. The index is the same as that for the graph. """ timed_walk = Timer("generate walks", verbose)(self.simulate_walks) timed_w2v = Timer("train embeddings", verbose)(Word2Vec) walks = timed_walk(num_walks, walk_length) w2v = timed_w2v( walks, vector_size=dim, window=window_size, sg=1, min_count=0, workers=self.workers, epochs=epochs, seed=self.random_state, ) return w2v.wv[self.nodes]
[docs]class FirstOrderUnweighted(Base, SparseRWGraph): """Directly sample edges for first order random walks.""" def __init__(self, *args, **kwargs): Base.__init__(self, *args, **kwargs)
[docs] def get_move_forward(self): """Wrap ``move_forward``.""" indices = self.indices indptr = self.indptr @njit(nogil=True) def move_forward(cur_idx, prev_idx=None): start, end = indptr[cur_idx], indptr[cur_idx + 1] return indices[np.random.randint(start, end)] return move_forward
[docs]class PreCompFirstOrder(Base, SparseRWGraph): """Precompute transition probabilities for first order random walks.""" def __init__(self, *args, **kwargs): Base.__init__(self, *args, **kwargs) self.alias_j = self.alias_q = None
[docs] def get_move_forward(self): """Wrap ``move_forward``.""" indices = self.indices indptr = self.indptr alias_j = self.alias_j alias_q = self.alias_q @njit(nogil=True) def move_forward(cur_idx, prev_idx=None): start, end = indptr[cur_idx], indptr[cur_idx + 1] choice = alias_draw(alias_j[start:end], alias_q[start:end]) return indices[indptr[cur_idx] + choice] return move_forward
[docs] def preprocess_transition_probs(self): """Precompute and store first order transition probabilities.""" data = self.data indices = self.indices indptr = self.indptr # Retrieve transition probability computation callback function get_normalized_probs = self.get_normalized_probs_first_order # Determine the dimensionality of the 1st order transition probs n_nodes = indptr.size - 1 # number of nodes n_probs = indptr[-1] # total number of 1st order transition probs @njit(parallel=True, nogil=True) def compute_all_transition_probs(): alias_j = np.zeros(n_probs, dtype=np.uint32) alias_q = np.zeros(n_probs, dtype=np.float32) for idx in range(n_nodes): start, end = indptr[idx], indptr[idx + 1] probs = get_normalized_probs(data, indices, indptr, idx) alias_j[start:end], alias_q[start:end] = alias_setup(probs) return alias_j, alias_q self.alias_j, self.alias_q = compute_all_transition_probs()
[docs]class PreComp(Base, SparseRWGraph): """Precompute transition probabilities. This implementation precomputes and stores 2nd order transition probabilities first and uses read off transition probabilities during the process of random walk. The graph type used is ``SparseRWGraph``. Note: Need to call ``preprocess_transition_probs()`` first before generating walks. """ def __init__(self, *args, **kwargs): Base.__init__(self, *args, **kwargs) self.alias_dim: Optional[Uint32Array] = None self.alias_j: Optional[Uint32Array] = None self.alias_q: Optional[Float32Array] = None self.alias_indptr: Optional[Uint64Array] = None
[docs] def get_move_forward(self): """Wrap ``move_forward``. This function returns a ``numba.njit`` compiled function that takes current vertex index (and the previous vertex index if available) and returns the next vertex index by sampling from a discrete random distribution based on the transition probabilities that are read off the precomputed transition probabilities table. Note: The returned function is used by the ``simulate_walks`` method. """ data = self.data indices = self.indices indptr = self.indptr p = self.p q = self.q get_normalized_probs = self.get_normalized_probs alias_j = self.alias_j alias_q = self.alias_q alias_indptr = self.alias_indptr alias_dim = self.alias_dim @njit(nogil=True) def move_forward(cur_idx, prev_idx=None): """Move to next node based on transition probabilities.""" if prev_idx is None: normalized_probs = get_normalized_probs( data, indices, indptr, p, q, cur_idx, None, None, ) cdf = np.cumsum(normalized_probs) choice = np.searchsorted(cdf, np.random.random()) else: # Find index of neighbor (previous node) for reading alias start = indptr[cur_idx] end = indptr[cur_idx + 1] nbr_idx = np.searchsorted(indices[start:end], prev_idx) if indices[start + nbr_idx] != prev_idx: print("FATAL ERROR! Neighbor not found.") dim = alias_dim[cur_idx] start = alias_indptr[cur_idx] + dim * nbr_idx end = start + dim choice = alias_draw(alias_j[start:end], alias_q[start:end]) return indices[indptr[cur_idx] + choice] return move_forward
[docs] def preprocess_transition_probs(self): """Precompute and store 2nd order transition probabilities. Each node contains n ** 2 number of 2nd order transition probabilities, where n is the number of neighbors of that specific node, since one can pick any one of its neighbors as the previous node and / or the next node. For each second order transition probability of a node, set up the alias draw table to be used during random walk. Note: Uses uint64 instead of uint32 for tracking alias_indptr to prevent overflowing since the 2nd order transition probs grows much faster than the first order transition probs, which is the same as the total number of edges in the graph. """ data = self.data indices = self.indices indptr = self.indptr p = self.p q = self.q # Retrieve transition probability computation callback function get_normalized_probs, noise_thresholds = self.setup_get_normalized_probs() # Determine the dimensionality of the 2nd order transition probs n_nodes = self.indptr.size - 1 # number of nodes n = self.indptr[1:] - self.indptr[:-1] # number of nbrs per node n2 = np.power(n, 2) # number of 2nd order trans probs per node # Set the dimensionality of alias probability table self.alias_dim = alias_dim = n self.alias_indptr = alias_indptr = np.zeros(self.indptr.size, dtype=np.uint64) alias_indptr[1:] = np.cumsum(n2) n_probs = alias_indptr[-1] # total number of 2nd order transition probs @njit(parallel=True, nogil=True) def compute_all_transition_probs(): alias_j = np.zeros(n_probs, dtype=np.uint32) alias_q = np.zeros(n_probs, dtype=np.float32) for idx in range(n_nodes): offset = alias_indptr[idx] dim = alias_dim[idx] nbrs = indices[indptr[idx] : indptr[idx + 1]] for nbr_idx in prange(n[idx]): nbr = nbrs[nbr_idx] probs = get_normalized_probs( data, indices, indptr, p, q, idx, nbr, noise_thresholds, ) start = offset + dim * nbr_idx end = start + dim alias_j[start:end], alias_q[start:end] = alias_setup(probs) return alias_j, alias_q self.alias_j, self.alias_q = compute_all_transition_probs()
[docs]class SparseOTF(Base, SparseRWGraph): """Sparse graph transition on the fly. This implementation does *NOT* precompute transition probabilities in advance but instead calculates them on-the-fly during the process of random walk. The graph type used is ``SparseRWGraph``. """ def __init__(self, *args, **kwargs): Base.__init__(self, *args, **kwargs)
[docs] def get_move_forward(self): """Wrap ``move_forward``. This function returns a ``numba.njit`` compiled function that takes current vertex index (and the previous vertex index if available) and returns the next vertex index by sampling from a discrete random distribution based on the transition probabilities that are calculated on-the-fly. Note: The returned function is used by the ``simulate_walks`` method. """ data = self.data indices = self.indices indptr = self.indptr p = self.p q = self.q get_normalized_probs, noise_thresholds = self.setup_get_normalized_probs() @njit(nogil=True) def move_forward(cur_idx, prev_idx=None): """Move to next node.""" normalized_probs = get_normalized_probs( data, indices, indptr, p, q, cur_idx, prev_idx, noise_thresholds, ) cdf = np.cumsum(normalized_probs) choice = np.searchsorted(cdf, np.random.random()) return indices[indptr[cur_idx] + choice] return move_forward
[docs]class DenseOTF(Base, DenseRWGraph): """Dense graph transition on the fly. This implementation does *NOT* precompute transition probabilities in advance but instead calculates them on-the-fly during the process of random walk. The graph type used is ``DenseRWGraph``. """ def __init__(self, *args, **kwargs): Base.__init__(self, *args, **kwargs)
[docs] def get_move_forward(self): """Wrap ``move_forward``. This function returns a ``numba.njit`` compiled function that takes current vertex index (and the previous vertex index if available) and returns the next vertex index by sampling from a discrete random distribution based on the transition probabilities that are calculated on-the-fly. Note: The returned function is used by the ``simulate_walks`` method. """ data = self.data nonzero = self.nonzero p = self.p q = self.q get_normalized_probs, noise_thresholds = self.setup_get_normalized_probs() @njit(nogil=True) def move_forward(cur_idx, prev_idx=None): """Move to next node.""" normalized_probs = get_normalized_probs( data, nonzero, p, q, cur_idx, prev_idx, noise_thresholds, ) cdf = np.cumsum(normalized_probs) choice = np.searchsorted(cdf, np.random.random()) nbrs = np.where(nonzero[cur_idx])[0] return nbrs[choice] return move_forward
[docs]@njit(nogil=True) def alias_setup(probs): """Construct alias lookup table. This code is modified from the blog post here: https://lips.cs.princeton.edu/the-alias-method-efficient-sampling-with-many-discrete-outcomes/ , where you can find more details about how the method works. In general, the alias method improves the time complexity of sampling from a discrete random distribution to O(1) if the alias table is setup in advance. Args: probs (list(float32)): normalized transition probabilities array, could be in either list or NDArray, of float32 values. """ k = probs.size q = np.zeros(k, dtype=np.float32) j = np.zeros(k, dtype=np.uint32) smaller = np.zeros(k, dtype=np.uint32) larger = np.zeros(k, dtype=np.uint32) smaller_ptr = 0 larger_ptr = 0 for kk in range(k): q[kk] = k * probs[kk] if q[kk] < 1.0: smaller[smaller_ptr] = kk smaller_ptr += 1 else: larger[larger_ptr] = kk larger_ptr += 1 while (smaller_ptr > 0) & (larger_ptr > 0): smaller_ptr -= 1 small = smaller[smaller_ptr] larger_ptr -= 1 large = larger[larger_ptr] j[small] = large q[large] = q[large] + q[small] - 1.0 if q[large] < 1.0: smaller[smaller_ptr] = large smaller_ptr += 1 else: larger[larger_ptr] = large larger_ptr += 1 return j, q
[docs]@njit(nogil=True) def alias_draw(j, q): """Draw sample from a non-uniform discrete distribution using alias sampling.""" k = j.size kk = np.random.randint(k) if np.random.rand() < q[kk]: return kk else: return j[kk]