Source code for torchdr.neighbor_embedding.base

"""Base classes for Neighbor Embedding methods."""

# Author: Hugues Van Assel <>
# License: BSD 3-Clause License

import warnings

import torch

from torchdr.affinity import (
    Affinity,
    SparseLogAffinity,
    UnnormalizedAffinity,
    UnnormalizedLogAffinity,
)
from torchdr.affinity_matcher import AffinityMatcher
from torchdr.utils import OPTIMIZERS, cross_entropy_loss

class NeighborEmbedding(AffinityMatcher):
    r"""Solves the neighbor embedding problem.

    It amounts to solving:

    .. math::

        \min_{\mathbf{Z}} \: - \lambda \sum_{ij} P_{ij} \log Q_{ij} + \mathcal{L}_{\mathrm{rep}}( \mathbf{Q})

    where :math:`\mathbf{P}` is the input affinity matrix, :math:`\mathbf{Q}` is the
    output affinity matrix, :math:`\mathcal{L}_{\mathrm{rep}}` is the repulsive
    term of the loss function, :math:`\lambda` is the
    :attr:`early_exaggeration_coeff` parameter.

    Note that the early exaggeration coefficient :math:`\lambda` is set to
    :math:`1` after the early exaggeration phase which duration is controlled by the
    :attr:`early_exaggeration_iter` parameter.

    Parameters
    ----------
    affinity_in : Affinity
        The affinity object for the input space.
    affinity_out : Affinity
        The affinity object for the output embedding space.
    kwargs_affinity_out : dict, optional
        Additional keyword arguments for the affinity_out method.
    n_components : int, optional
        Number of dimensions for the embedding. Default is 2.
    lr : float or 'auto', optional
        Learning rate for the optimizer. Default is 1e0.
    optimizer : str or 'auto', optional
        Optimizer to use for the optimization. Default is "Adam".
    optimizer_kwargs : dict or 'auto', optional
        Additional keyword arguments for the optimizer.
    scheduler : str, optional
        Learning rate scheduler. Default is "constant".
    scheduler_kwargs : dict, optional
        Additional keyword arguments for the scheduler.
    min_grad_norm : float, optional
        Tolerance for stopping criterion. Default is 1e-7.
    max_iter : int, optional
        Maximum number of iterations. Default is 2000.
    init : str, optional
        Initialization method for the embedding. Default is "pca".
    init_scaling : float, optional
        Scaling factor for the initial embedding. Default is 1e-4.
    device : str, optional
        Device to use for computations. Default is "auto".
    backend : {"keops", "faiss", None}, optional
        Which backend to use for handling sparsity and memory efficiency.
        Default is None.
    verbose : bool, optional
        Verbosity of the optimization process. Default is False.
    random_state : float, optional
        Random seed for reproducibility. Default is None.
    early_exaggeration_coeff : float, optional
        Coefficient for the attraction term during the early exaggeration phase.
        Default is 1.0.
    early_exaggeration_iter : int, optional
        Number of iterations for early exaggeration. Default is None.
    **kwargs
        sklearn-style aliases: ``learning_rate`` overrides ``lr`` and
        ``early_exaggeration`` overrides ``early_exaggeration_coeff``.
    """  # noqa: E501

    def __init__(
        self,
        affinity_in: Affinity,
        affinity_out: Affinity,
        kwargs_affinity_out: dict = {},
        n_components: int = 2,
        lr: float | str = 1e0,
        optimizer: str = "Adam",
        optimizer_kwargs: dict | str = None,
        scheduler: str = "constant",
        scheduler_kwargs: dict = None,
        min_grad_norm: float = 1e-7,
        max_iter: int = 2000,
        init: str = "pca",
        init_scaling: float = 1e-4,
        device: str = "auto",
        backend: str = None,
        verbose: bool = False,
        random_state: float = None,
        early_exaggeration_coeff: float = 1.0,
        early_exaggeration_iter: int = None,
        **kwargs,
    ):
        super().__init__(
            affinity_in=affinity_in,
            affinity_out=affinity_out,
            kwargs_affinity_out=kwargs_affinity_out,
            n_components=n_components,
            optimizer=optimizer,
            optimizer_kwargs=optimizer_kwargs,
            lr=lr,
            scheduler=scheduler,
            scheduler_kwargs=scheduler_kwargs,
            min_grad_norm=min_grad_norm,
            max_iter=max_iter,
            init=init,
            init_scaling=init_scaling,
            device=device,
            backend=backend,
            verbose=verbose,
            random_state=random_state,
        )

        self.early_exaggeration_coeff = early_exaggeration_coeff
        self.early_exaggeration_iter = early_exaggeration_iter

        # improve consistency with the sklearn API
        # (``min_grad_norm`` is a named parameter, so it can never reach
        # ``kwargs`` and needs no alias handling here)
        if "learning_rate" in kwargs:
            self.lr = kwargs["learning_rate"]
        if "early_exaggeration" in kwargs:
            self.early_exaggeration_coeff = kwargs["early_exaggeration"]

    def _additional_updates(self, step):
        """End the early exaggeration phase when ``step`` reaches its last iteration.

        The coefficient is reset to 1 and the learning rate, optimizer and
        scheduler are rebuilt for the remaining iterations.
        """
        if (  # stop early exaggeration phase
            self.early_exaggeration_coeff_ > 1
            and step == self.early_exaggeration_iter
        ):
            self.early_exaggeration_coeff_ = 1
            # reinitialize optim
            self._set_learning_rate()
            self._set_optimizer()
            self._set_scheduler()
        return self

    def _check_n_neighbors(self, n):
        """Cap ``perplexity`` / ``n_neighbors`` at ``n // 2`` when ``n`` is too small.

        The capped value is stored on ``self`` under the trailing-underscore
        name and written to the matching attribute of ``self.affinity_in``.
        """
        param_list = ["perplexity", "n_neighbors"]

        for param_name in param_list:
            if hasattr(self, param_name):
                param_value = getattr(self, param_name)
                if n <= param_value:
                    if self.verbose:
                        warnings.warn(
                            "[TorchDR] WARNING : Number of samples is smaller than "
                            f"{param_name} ({n} <= {param_value}), setting "
                            f"{param_name} to {n // 2} (which corresponds to n//2)."
                        )
                    new_value = n // 2
                    setattr(self, param_name + "_", new_value)
                    setattr(self.affinity_in, param_name, new_value)

        return self

    def _fit(self, X: torch.Tensor):
        """Validate neighbor parameters, reset the exaggeration state and fit."""
        self._check_n_neighbors(X.shape[0])

        self.early_exaggeration_coeff_ = (
            self.early_exaggeration_coeff
        )  # early_exaggeration_ may change during the optimization

        super()._fit(X)

    def _loss(self):
        raise NotImplementedError("[TorchDR] ERROR : _loss method must be implemented.")

    def _set_learning_rate(self):
        """Resolve ``self.lr`` into the effective learning rate ``self.lr_``."""
        if self.lr == "auto":
            if self.optimizer not in ["auto", "SGD"]:
                if self.verbose:
                    warnings.warn(
                        "[TorchDR] WARNING : when 'auto' is used for the learning "
                        "rate, the optimizer should be 'SGD'."
                    )
            # from the sklearn TSNE implementation
            self.lr_ = max(self.n_samples_in_ / self.early_exaggeration_coeff_ / 4, 50)
        else:
            self.lr_ = self.lr

    def _set_optimizer(self):
        """Instantiate ``self.optimizer_`` on ``self.params_`` and return it."""
        optimizer = "SGD" if self.optimizer == "auto" else self.optimizer

        # from the sklearn TSNE implementation
        if self.optimizer_kwargs == "auto":
            # Test the *resolved* optimizer name so that optimizer="auto"
            # (which resolves to SGD) also receives the automatic momentum.
            if optimizer == "SGD":
                if self.early_exaggeration_coeff_ > 1:
                    optimizer_kwargs = {"momentum": 0.5}
                else:
                    optimizer_kwargs = {"momentum": 0.8}
            else:
                optimizer_kwargs = {}
        else:
            optimizer_kwargs = self.optimizer_kwargs

        self.optimizer_ = OPTIMIZERS[optimizer](
            self.params_, lr=self.lr_, **(optimizer_kwargs or {})
        )
        return self.optimizer_

    def _set_scheduler(self):
        """Set up the scheduler for the current optimization phase."""
        ee_iter = self.early_exaggeration_iter
        if self.early_exaggeration_coeff_ > 1:
            # Without an end iteration the exaggeration phase is never stopped
            # by _additional_updates, so it spans the whole run.
            n_iter = self.max_iter if ee_iter is None else min(ee_iter, self.max_iter)
        else:
            # early_exaggeration_iter defaults to None: treat it as 0 rather
            # than failing on ``int - None``.
            n_iter = self.max_iter - (ee_iter or 0)
        super()._set_scheduler(n_iter)
class SparseNeighborEmbedding(NeighborEmbedding):
    r"""Neighbor embedding solver for a sparse input affinity matrix.

    The optimization problem is:

    .. math::

        \min_{\mathbf{Z}} \: - \lambda \sum_{ij} P_{ij} \log Q_{ij} + \mathcal{L}_{\mathrm{rep}}( \mathbf{Q})

    with :math:`\mathbf{P}` the input affinity matrix, :math:`\mathbf{Q}` the
    output affinity matrix, :math:`\mathcal{L}_{\mathrm{rep}}` the repulsive
    term of the loss and :math:`\lambda` the :attr:`early_exaggeration_coeff`
    parameter.

    **Fast attraction.** Use this class when the input affinity is a
    :class:`~torchdr.SparseLogAffinity` and the output affinity is an
    :class:`~torchdr.UnnormalizedAffinity`. The attractive term can then be
    computed with linear complexity.

    Parameters
    ----------
    affinity_in : Affinity
        Affinity object for the input space.
    affinity_out : Affinity
        Affinity object for the output embedding space.
    kwargs_affinity_out : dict, optional
        Extra keyword arguments for the affinity_out method.
    n_components : int, optional
        Dimension of the embedding. Default is 2.
    lr : float or 'auto', optional
        Learning rate of the optimizer. Default is 1e0.
    optimizer : str or 'auto', optional
        Optimizer used for the optimization. Default is "Adam".
    optimizer_kwargs : dict or 'auto', optional
        Extra keyword arguments for the optimizer.
    scheduler : str, optional
        Learning rate scheduler. Default is "constant".
    scheduler_kwargs : dict, optional
        Extra keyword arguments for the scheduler.
    min_grad_norm : float, optional
        Tolerance of the stopping criterion. Default is 1e-7.
    max_iter : int, optional
        Maximum number of iterations. Default is 2000.
    init : str, optional
        Initialization method for the embedding. Default is "pca".
    init_scaling : float, optional
        Scaling factor of the initial embedding. Default is 1e-4.
    device : str, optional
        Device used for computations. Default is "auto".
    backend : {"keops", "faiss", None}, optional
        Backend used to handle sparsity and memory efficiency.
        Default is None.
    verbose : bool, optional
        Verbosity of the optimization process. Default is False.
    random_state : float, optional
        Random seed for reproducibility. Default is None.
    early_exaggeration_coeff : float, optional
        Coefficient of the attraction term during the early exaggeration phase.
        Default is 1.0.
    early_exaggeration_iter : int, optional
        Number of early exaggeration iterations. Default is None.
    """  # noqa: E501

    def __init__(
        self,
        affinity_in: Affinity,
        affinity_out: Affinity,
        kwargs_affinity_out: dict = {},
        n_components: int = 2,
        lr: float | str = 1e0,
        optimizer: str = "Adam",
        optimizer_kwargs: dict | str = None,
        scheduler: str = "constant",
        scheduler_kwargs: dict = None,
        min_grad_norm: float = 1e-7,
        max_iter: int = 2000,
        init: str = "pca",
        init_scaling: float = 1e-4,
        device: str = "auto",
        backend: str = None,
        verbose: bool = False,
        random_state: float = None,
        early_exaggeration_coeff: float = 1.0,
        early_exaggeration_iter: int = None,
    ):
        # Validate both affinities, input first, before any parent setup.
        requirements = (
            (
                affinity_in,
                SparseLogAffinity,
                "[TorchDR] ERROR : when using SparseNeighborEmbedding, affinity_in "
                "must be a sparse affinity.",
            ),
            (
                affinity_out,
                UnnormalizedAffinity,
                "[TorchDR] ERROR : when using SparseNeighborEmbedding, affinity_out "
                "must be an UnnormalizedAffinity object.",
            ),
        )
        for candidate, expected_type, message in requirements:
            if not isinstance(candidate, expected_type):
                raise NotImplementedError(message)

        parent_params = dict(
            affinity_in=affinity_in,
            affinity_out=affinity_out,
            kwargs_affinity_out=kwargs_affinity_out,
            n_components=n_components,
            optimizer=optimizer,
            optimizer_kwargs=optimizer_kwargs,
            lr=lr,
            scheduler=scheduler,
            scheduler_kwargs=scheduler_kwargs,
            min_grad_norm=min_grad_norm,
            max_iter=max_iter,
            init=init,
            init_scaling=init_scaling,
            device=device,
            backend=backend,
            verbose=verbose,
            random_state=random_state,
            early_exaggeration_coeff=early_exaggeration_coeff,
            early_exaggeration_iter=early_exaggeration_iter,
        )
        super().__init__(**parent_params)

    def _attractive_loss(self):
        """Cross-entropy between ``PX_`` and the output affinity on ``NN_indices_``."""
        works_in_log_domain = isinstance(self.affinity_out, UnnormalizedLogAffinity)
        if not works_in_log_domain:
            Q = self.affinity_out(self.embedding_, indices=self.NN_indices_)
            return cross_entropy_loss(self.PX_, Q)

        log_Q = self.affinity_out(self.embedding_, log=True, indices=self.NN_indices_)
        return cross_entropy_loss(self.PX_, log_Q, log=True)

    def _repulsive_loss(self):
        """Repulsive term of the loss; subclasses must override it."""
        raise NotImplementedError(
            "[TorchDR] ERROR : _repulsive_loss method must be implemented."
        )

    def _loss(self):
        """Return the exaggerated attractive term plus the repulsive term."""
        exaggeration = self.early_exaggeration_coeff_
        attraction = self._attractive_loss()
        repulsion = self._repulsive_loss()
        return exaggeration * attraction + repulsion
class SampledNeighborEmbedding(SparseNeighborEmbedding):
    r"""Solves the neighbor embedding problem with both sparsity and sampling.

    It amounts to solving:

    .. math::

        \min_{\mathbf{Z}} \: - \lambda \sum_{ij} P_{ij} \log Q_{ij} + \mathcal{L}_{\mathrm{rep}}( \mathbf{Q})

    where :math:`\mathbf{P}` is the input affinity matrix, :math:`\mathbf{Q}` is the
    output affinity matrix, :math:`\mathcal{L}_{\mathrm{rep}}` is the repulsive
    term of the loss function, :math:`\lambda` is the
    :attr:`early_exaggeration_coeff` parameter.

    **Fast attraction.** This class should be used when the input affinity matrix
    is a :class:`~torchdr.SparseLogAffinity` and the output affinity matrix is an
    :class:`~torchdr.UnnormalizedAffinity`. In such cases, the attractive term
    can be computed with linear complexity.

    **Fast repulsion.** A stochastic estimation of the repulsive term is used
    to reduce its complexity to linear.
    This is done by sampling a fixed number of negative samples
    :attr:`n_negatives` for each point.

    Parameters
    ----------
    affinity_in : Affinity
        The affinity object for the input space.
    affinity_out : Affinity
        The affinity object for the output embedding space.
    kwargs_affinity_out : dict, optional
        Additional keyword arguments for the affinity_out method.
    n_components : int, optional
        Number of dimensions for the embedding. Default is 2.
    optimizer : str or 'auto', optional
        Optimizer to use for the optimization. Default is "Adam".
    optimizer_kwargs : dict or 'auto', optional
        Additional keyword arguments for the optimizer.
    lr : float or 'auto', optional
        Learning rate for the optimizer. Default is 1e0.
    scheduler : str, optional
        Learning rate scheduler. Default is "constant".
    scheduler_kwargs : dict, optional
        Additional keyword arguments for the scheduler.
    min_grad_norm : float, optional
        Tolerance for stopping criterion. Default is 1e-7.
    max_iter : int, optional
        Maximum number of iterations. Default is 2000.
    init : str, optional
        Initialization method for the embedding. Default is "pca".
    init_scaling : float, optional
        Scaling factor for the initial embedding. Default is 1e-4.
    device : str, optional
        Device to use for computations. Default is "auto".
    backend : {"keops", "faiss", None}, optional
        Which backend to use for handling sparsity and memory efficiency.
        Default is None.
    verbose : bool, optional
        Verbosity of the optimization process. Default is False.
    random_state : float, optional
        Random seed for reproducibility. Default is None.
    early_exaggeration_coeff : float, optional
        Coefficient for the attraction term during the early exaggeration phase.
        Default is 1.0.
    early_exaggeration_iter : int, optional
        Number of iterations for early exaggeration. Default is None.
    n_negatives : int, optional
        Number of negative samples for the repulsive loss. Default is 5.
    """  # noqa: E501

    def __init__(
        self,
        affinity_in: Affinity,
        affinity_out: Affinity,
        kwargs_affinity_out: dict = {},
        n_components: int = 2,
        optimizer: str = "Adam",
        optimizer_kwargs: dict | str = None,
        lr: float | str = 1e0,
        scheduler: str = "constant",
        scheduler_kwargs: dict = None,
        min_grad_norm: float = 1e-7,
        max_iter: int = 2000,
        init: str = "pca",
        init_scaling: float = 1e-4,
        device: str = "auto",
        backend: str = None,
        verbose: bool = False,
        random_state: float = None,
        early_exaggeration_coeff: float = 1.0,
        early_exaggeration_iter: int = None,
        n_negatives: int = 5,
    ):
        self.n_negatives = n_negatives

        super().__init__(
            affinity_in=affinity_in,
            affinity_out=affinity_out,
            kwargs_affinity_out=kwargs_affinity_out,
            n_components=n_components,
            optimizer=optimizer,
            optimizer_kwargs=optimizer_kwargs,
            lr=lr,
            scheduler=scheduler,
            scheduler_kwargs=scheduler_kwargs,
            min_grad_norm=min_grad_norm,
            max_iter=max_iter,
            init=init,
            init_scaling=init_scaling,
            device=device,
            backend=backend,
            verbose=verbose,
            random_state=random_state,
            early_exaggeration_coeff=early_exaggeration_coeff,
            early_exaggeration_iter=early_exaggeration_iter,
        )

    def _sample_negatives(self, discard_NNs=False):
        """Draw negative sample indices for every point.

        Parameters
        ----------
        discard_NNs : bool, optional
            If True and ``self.NN_indices_`` is not None, the indices listed
            in ``self.NN_indices_`` are excluded in addition to the point's
            own index. Default is False.

        Returns
        -------
        indices : torch.Tensor of shape (n_samples_in_, n_negatives_)
            For each row ``i``, indices sampled with replacement among the
            points that are not excluded for ``i``.
        """
        # Negatives are all other points except NNs (if discard_NNs) and point itself
        n_possible_negatives = self.n_samples_in_ - 1  # Exclude the self-index

        discard_NNs_ = discard_NNs and self.NN_indices_ is not None
        if discard_NNs_:
            n_possible_negatives -= self.NN_indices_.shape[-1]  # Exclude the NNs

        # n_negatives_ is resolved on the first call only and then reused.
        if not hasattr(self, "n_negatives_"):
            if self.n_negatives > n_possible_negatives:
                if self.verbose:
                    warnings.warn(
                        "[TorchDR] WARNING: n_negatives is too large. "
                        "Setting n_negatives to the difference between the number of "
                        "samples and the number of neighbors."
                    )
                self.n_negatives_ = n_possible_negatives
            else:
                self.n_negatives_ = self.n_negatives

        device = getattr(self.NN_indices_, "device", "cpu")

        # Ranks among the allowed (non-excluded) indices of each row. The lower
        # bound is 0 so that every allowed index can be drawn.
        indices = torch.randint(
            0,
            n_possible_negatives,
            (self.n_samples_in_, self.n_negatives_),
            device=device,
        )

        exclude_indices = torch.arange(self.n_samples_in_, device=device).unsqueeze(
            1
        )  # Self indices
        if discard_NNs_:
            exclude_indices = torch.cat(
                (
                    exclude_indices,
                    self.NN_indices_,
                ),  # Concatenate self and NNs
                dim=1,
            )

        # Tensor.sort is not in-place: keep the sorted values explicitly.
        exclude_indices = torch.sort(exclude_indices, dim=1).values

        # Map each rank r to the r-th allowed index. With sorted excluded
        # indices e_0 < e_1 < ..., exactly the e_j with e_j - j <= r lie at or
        # before that index, so the offset is the count of shifted boundaries
        # <= r.
        # NOTE(review): assumes the excluded indices of a row are distinct,
        # i.e. NN_indices_ does not contain the point itself; confirm upstream.
        shifted_boundaries = exclude_indices - torch.arange(
            exclude_indices.shape[1], device=device
        )
        adjustments = torch.searchsorted(shifted_boundaries, indices, right=True)
        indices += adjustments

        return indices