Source code for torchdr.affinity.entropic

# -*- coding: utf-8 -*-
"""Affinity matrices with entropic constraints."""

# Author: Hugues Van Assel <vanasselhugues@gmail.com>
#         Titouan Vayer <titouan.vayer@inria.fr>
#         Rémi Flamary <remi.flamary@polytechnique.edu>
#
# License: BSD 3-Clause License

import torch
import numpy as np
from tqdm import tqdm
import warnings
import contextlib
import math
from typing import Tuple

from torchdr.utils import (
    entropy,
    false_position,
    wrap_vectors,
    sum_matrix_vector,
    kmin,
    kmax,
    check_NaNs,
    logsumexp_red,
    batch_transpose,
    OPTIMIZERS,
)
from torchdr.affinity.base import LogAffinity, SparseLogAffinity


@wrap_vectors
def _log_Pe(C, eps):
    r"""Return the log of the unnormalized directed entropic affinity.

    Parameters
    ----------
    C : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        Pairwise distance matrix.
    eps : torch.Tensor of shape (n) or (n_batch, batch_size)
        Dual variable of the entropic constraint.

    Returns
    -------
    log_P : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        The log of the unnormalized affinity matrix.
    """
    return -C / eps


@wrap_vectors
def _log_Pse(C, eps, mu, eps_square=False):
    r"""Return the log of the symmetric entropic affinity matrix.

    Parameters
    ----------
    C : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        Pairwise distance matrix.
    eps : torch.Tensor of shape (n) or (n_batch, batch_size)
        Dual variable of the entropic constraint.
    mu : torch.Tensor of shape (n) or (n_batch, batch_size)
        Dual variable of the normalization constraint.
    eps_square : bool, optional
        Whether to optimize on the square of the dual variables.

    Returns
    -------
    log_P : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        The log of the symmetric entropic affinity matrix.
    """
    _eps = eps**2 if eps_square else eps
    mu_t = batch_transpose(mu)
    _eps_t = batch_transpose(_eps)
    return (mu + mu_t - 2 * C) / (_eps + _eps_t)


@wrap_vectors
def _log_Pds(log_K, dual):
    r"""Return the log of the doubly stochastic normalization of log_K (in log domain).

    Parameters
    ----------
    log_K : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        Log of the base kernel.
    dual : torch.Tensor of shape (n) or (n_batch, batch_size)
        Dual variable of the normalization constraint.

    Returns
    -------
    log_P : torch.Tensor or pykeops.torch.LazyTensor of shape (n, n)
        or shape (n_batch, batch_size, batch_size)
        The log of the doubly stochastic normalization of log_K.
    """
    dual_t = batch_transpose(dual)
    return dual + dual_t + log_K
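

# Illustrative sketch (not part of the library API): exponentiating the output of
# `_log_Pds` amounts to the diagonal scaling diag(u) K diag(u) with u = exp(dual).
# Dense torch tensors only (no KeOps); the function name and toy data are hypothetical.
def _example_log_Pds_is_diagonal_scaling():
    torch.manual_seed(0)
    C = torch.rand(4, 4)
    C = 0.5 * (C + C.T)  # symmetric toy cost matrix
    log_K = -C  # base kernel in log domain (eps = 1)
    dual = torch.randn(4)  # dual variable f

    # log-domain update: f_i + f_j + log K_ij
    log_P = dual[:, None] + dual[None, :] + log_K

    # equivalent primal-domain scaling with u = exp(f)
    u = torch.exp(dual)
    P_scaling = torch.diag(u) @ torch.exp(log_K) @ torch.diag(u)
    assert torch.allclose(log_P.exp(), P_scaling)
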


def _bounds_entropic_affinity(C, perplexity):
    r"""Compute the bounds derived in [4]_ for the entropic affinity root.

    Parameters
    ----------
    C : tensor or lazy tensor of shape (n_samples, n_samples)
        or (n_samples, k) if sparsity is used
        Distance matrix between the samples.
    perplexity : float
        Perplexity parameter, related to the number of 'effective' nearest neighbors.

    Returns
    -------
    begin : tensor of shape (n_samples)
        Lower bound of the root.
    end : tensor of shape (n_samples)
        Upper bound of the root.

    References
    ----------
    .. [4]  Max Vladymyrov, Miguel A. Carreira-Perpinan (2013).
            Entropic Affinities: Properties and Efficient Numerical Computation.
            International Conference on Machine Learning (ICML).
    """
    # we use the same notations as in [4] for clarity purposes
    N = C.shape[0]

    # solve a unique 1D root finding problem
    def find_p1(x):
        return np.log(np.min([np.sqrt(2 * N), perplexity])) - 2 * (1 - x) * np.log(
            N / (2 * (1 - x))
        )

    begin = 3 / 4
    end = 1 - 1e-6
    p1 = false_position(
        f=find_p1, n=1, begin=begin, end=end, max_iter=1000, tol=1e-6, verbose=False
    ).item()

    # retrieve greatest and smallest pairwise distances
    dN = kmax(C, k=1, dim=1)[0].squeeze()
    d12 = kmin(C, k=2, dim=1)[0]
    d1 = d12[:, 0]
    d2 = d12[:, 1]
    Delta_N = dN - d1
    Delta_2 = d2 - d1

    # compute bounds derived in [4]
    beta_L = torch.max(
        (N * np.log(N / perplexity)) / ((N - 1) * Delta_N),
        torch.sqrt(np.log(N / perplexity) / (dN**2 - d1**2)),
    )
    beta_U = (1 / Delta_2) * np.log((N - 1) * p1 / (1 - p1))

    # convert to our notations
    begin = 1 / beta_U
    end = 1 / beta_L

    return begin, end
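

# Illustrative sketch (not part of the library): the per-row problem that the
# bounds above feed into `false_position`, written as a plain bisection on a dense
# torch cost matrix. The function name and the bisection loop are hypothetical;
# the library's actual root finder lives in `torchdr.utils.false_position`.
def _example_entropic_root_search(C, perplexity=5.0, n_steps=50):
    # C: dense (n, n) cost matrix; returns eps of shape (n,)
    target = math.log(perplexity) + 1
    lo = torch.full((C.shape[0],), 1e-12, dtype=C.dtype)
    hi = torch.full((C.shape[0],), 1e12, dtype=C.dtype)
    for _ in range(n_steps):
        eps = 0.5 * (lo + hi)
        log_P = -C / eps[:, None]
        log_P = log_P - torch.logsumexp(log_P, dim=1, keepdim=True)
        P = log_P.exp()
        # row-wise entropy with the convention h(p) = - sum_i p_i (log p_i - 1)
        H = -(P * (log_P - 1)).sum(dim=1)
        hi = torch.where(H > target, eps, hi)   # entropy too high -> decrease eps
        lo = torch.where(H <= target, eps, lo)  # entropy too low  -> increase eps
    return 0.5 * (lo + hi)
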


def _check_perplexity(perplexity, n, verbose=True):
    r"""Check the perplexity parameter and return a valid value."""
    if n <= 1:
        raise ValueError(
            "[TorchDR] ERROR Affinity: Input has less than one sample : "
            f"n_samples = {n}."
        )

    if perplexity >= n or perplexity <= 1:
        new_value = n // 2
        if verbose:
            warnings.warn(
                "[TorchDR] WARNING Affinity: The perplexity parameter must be "
                "greater than 1 and smaller than the number of samples "
                f"(here n = {n}). Got perplexity = {perplexity}. "
                "Setting perplexity to {new_value}."
            )
        return new_value
    else:
        return perplexity


class EntropicAffinity(SparseLogAffinity):
    r"""Solve the directed entropic affinity problem introduced in [1]_.

    The algorithm computes the optimal dual variable
    :math:`\mathbf{\varepsilon}^* \in \mathbb{R}^n_{>0}` such that

    .. math::
        \forall i, \: \mathrm{h}(\mathbf{P}^{\mathrm{e}}_{i:}) = \log (\xi) + 1
        \quad \text{where} \quad \forall (i,j), \: P^{\mathrm{e}}_{ij} =
        \frac{\exp(- C_{ij} / \varepsilon_i^\star)}{\sum_{\ell} \exp(- C_{i\ell} / \varepsilon_i^\star)} \:.

    where :

    - :math:`\mathbf{C}`: symmetric pairwise distance matrix between the samples.
    - :math:`\xi`: perplexity parameter.
    - :math:`\mathrm{h}`: (row-wise) Shannon entropy such that
      :math:`\mathrm{h}(\mathbf{p}) = - \sum_{i} p_{i} (\log p_{i} - 1)`.

    :math:`\mathbf{\varepsilon}^*` is computed by performing one-dimensional searches
    since the rows of :math:`\mathbf{P}^{\mathrm{e}}` are independent subproblems.

    **Convex problem.** Corresponds to the matrix :math:`\mathbf{P}^{\mathrm{e}}`
    in [3]_, solving the convex optimization problem

    .. math::
        \mathbf{P}^{\mathrm{e}} \in \mathop{\arg\min}_{\mathbf{P} \in \mathbb{R}_+^{n \times n}} \:
        &\langle \mathbf{C}, \mathbf{P} \rangle \\
        \text{s.t.} \quad &\mathbf{P} \mathbf{1} = \mathbf{1} \\
        &\forall i, \: \mathrm{h}(\mathbf{P}_{i:}) \geq \log (\xi) + 1 \:.

    where :math:`\mathbf{1} := (1,...,1)^\top` is the all-ones vector.

    The entropic affinity matrix is akin to a **soft** :math:`k` **-NN affinity**,
    with the perplexity parameter :math:`\xi` acting as :math:`k`. Each point
    distributes a unit mass among its closest neighbors while minimizing a transport
    cost given by :math:`\mathbf{C}`.

    The entropic constraint is saturated at the optimum and governs mass spread.
    With small :math:`\xi`, mass concentrates on a few neighbors; with large
    :math:`\xi`, it spreads across more neighbors, thus capturing larger scales of
    dependencies.

    .. note:: A symmetric version is also available at
        :class:`~torchdr.SymmetricEntropicAffinity`. It is the affinity matrix
        used in :class:`~SNEkhorn`/ :class:`~TSNEkhorn` [3]_. In TSNE [2]_, the
        entropic affinity is simply averaged with its transpose.

    Parameters
    ----------
    perplexity : float, optional
        Perplexity parameter, related to the number of 'effective' nearest neighbors.
        Consider selecting a value between 2 and the number of samples.
    tol : float, optional
        Precision threshold at which the root finding algorithm stops.
    max_iter : int, optional
        Maximum number of iterations for the root finding algorithm.
    sparsity : bool or 'auto', optional
        If True, keeps only the 3 * perplexity smallest elements on each row of
        the ground cost matrix. Recommended when the perplexity is not too large.
    metric : str, optional
        Metric to use for computing distances (default "sqeuclidean").
    zero_diag : bool, optional
        Whether to set the diagonal of the distance matrix to 0.
    device : str, optional
        Device to use for computation.
    keops : bool, optional
        Whether to use KeOps for computation.
    verbose : bool, optional
        Verbosity. Default is False.

    References
    ----------
    .. [1] Geoffrey Hinton, Sam Roweis (2002).
        Stochastic Neighbor Embedding.
        Advances in Neural Information Processing Systems 15 (NeurIPS).

    .. [2] Laurens van der Maaten, Geoffrey Hinton (2008).
        Visualizing Data using t-SNE.
        The Journal of Machine Learning Research 9.11 (JMLR).

    .. [3] Hugues Van Assel, Titouan Vayer, Rémi Flamary, Nicolas Courty (2023).
        SNEkhorn: Dimension Reduction with Symmetric Entropic Affinities.
        Advances in Neural Information Processing Systems 36 (NeurIPS).
    """  # noqa: E501

    def __init__(
        self,
        perplexity: float = 30,
        tol: float = 1e-3,
        max_iter: int = 1000,
        sparsity: bool | str = "auto",
        metric: str = "sqeuclidean",
        zero_diag: bool = True,
        device: str = "auto",
        keops: bool = False,
        verbose: bool = False,
    ):
        self.perplexity = perplexity
        self.tol = tol
        self.max_iter = max_iter

        super().__init__(
            metric=metric,
            zero_diag=zero_diag,
            device=device,
            keops=keops,
            verbose=verbose,
            sparsity=sparsity,
        )

    def _sparsity_rule(self):
        if self.perplexity < 100:
            return True
        else:
            if self.verbose:
                warnings.warn(
                    "[TorchDR] WARNING Affinity: perplexity is large "
                    f"({self.perplexity}) thus we turn off sparsity for "
                    "the EntropicAffinity. "
                )
            return False

    def _compute_sparse_log_affinity(self, X: torch.Tensor):
        r"""Solve the problem (EA) in [1]_ to compute the entropic affinity matrix.

        Parameters
        ----------
        X : torch.Tensor of shape (n_samples, n_features)
            Data on which affinity is computed.

        Returns
        -------
        log_affinity_matrix : torch.Tensor or pykeops.torch.LazyTensor
            of shape (n_samples, n_samples)
            Log of the entropic affinity matrix.
        indices : torch.Tensor or None
            Indices of the nearest neighbors if sparsity is used.
        """
        if self.verbose:
            print("[TorchDR] Affinity : computing the Entropic Affinity matrix.")

        C = self._distance_matrix(X)

        n_samples_in = C.shape[0]
        perplexity = _check_perplexity(self.perplexity, n_samples_in, self.verbose)
        target_entropy = np.log(perplexity) + 1

        k = 3 * perplexity
        if self._sparsity:
            if k >= n_samples_in - 1:
                print(
                    "[TorchDR] WARNING Affinity: sparsity mode disabled "
                    f"because perplexity is too large. 3 x perplexity = {k} "
                    ">= n_samples - 1."
                )
                C_, indices = C, None
            else:
                if self.verbose:
                    print(
                        f"[TorchDR] Affinity : sparsity mode enabled, computing {k} "
                        "nearest neighbors."
                    )
                C_, indices = kmin(C, k=k, dim=1)
        else:
            C_, indices = C, None

        def entropy_gap(eps):  # function to find the root of
            log_P = _log_Pe(C_, eps)
            log_P_normalized = log_P - logsumexp_red(log_P, dim=1)
            return entropy(log_P_normalized, log=True) - target_entropy

        begin, end = _bounds_entropic_affinity(C_, perplexity)
        begin += 1e-6  # avoid numerical issues

        self.eps_ = false_position(
            f=entropy_gap,
            n=n_samples_in,
            begin=begin,
            end=end,
            tol=self.tol,
            max_iter=self.max_iter,
            verbose=self.verbose,
            dtype=X.dtype,
            device=X.device,
        )

        log_P_final = _log_Pe(C_, self.eps_)
        self.log_normalization_ = logsumexp_red(log_P_final, dim=1)
        log_affinity_matrix = log_P_final - self.log_normalization_

        log_affinity_matrix -= math.log(n_samples_in)

        return log_affinity_matrix, indices
class SymmetricEntropicAffinity(LogAffinity):
    r"""Compute the symmetric entropic affinity (SEA) introduced in [3]_.

    Compute the solution :math:`\mathbf{P}^{\mathrm{se}}` to the symmetric entropic
    affinity (SEA) problem described in [3]_.

    The algorithm computes the optimal dual variables
    :math:`\mathbf{\mu}^\star \in \mathbb{R}^n` and
    :math:`\mathbf{\varepsilon}^\star \in \mathbb{R}^n_{>0}` using dual ascent.
    The affinity matrix is then given by

    .. math::
        \forall (i,j), \: P^{\mathrm{se}}_{ij} = \exp \left( \frac{\mu^\star_{i} +
        \mu^\star_j - 2 C_{ij}}{\varepsilon^\star_i + \varepsilon^\star_j} \right) \:.

    **Convex problem.** It amounts to the following convex optimization problem:

    .. math::
        \mathbf{P}^{\mathrm{se}} \in \mathop{\arg\min}_{\mathbf{P} \in \mathbb{R}_+^{n \times n}} \:
        &\langle \mathbf{C}, \mathbf{P} \rangle \\
        \text{s.t.} \quad &\mathbf{P} \mathbf{1} = \mathbf{1} \\
        &\forall i, \: \mathrm{h}(\mathbf{P}_{i:}) \geq \log (\xi) + 1 \\
        &\mathbf{P} = \mathbf{P}^\top \:.

    where :

    - :math:`\mathbf{C}`: symmetric pairwise distance matrix between the samples.
    - :math:`\xi`: perplexity parameter.
    - :math:`\mathrm{h}`: (row-wise) Shannon entropy such that
      :math:`\mathrm{h}(\mathbf{p}) = - \sum_{i} p_{i} (\log p_{i} - 1)`.
    - :math:`\mathbf{1} := (1,...,1)^\top`: all-ones vector.

    It is a symmetric version of :class:`~torchdr.EntropicAffinity`, where a
    symmetry constraint is added to the optimization problem.

    .. note:: Unlike
        :math:`(\mathbf{P}^{\mathrm{e}} + (\mathbf{P}^{\mathrm{e}})^\top )/ 2`
        used in :class:`~torchdr.TSNE` where :math:`\mathbf{P}^{\mathrm{e}}` is the
        :class:`~torchdr.EntropicAffinity` matrix,
        :class:`~torchdr.affinity.SymmetricEntropicAffinity` allows controlling
        the entropy and mass of each row/column of the affinity matrix.

    Parameters
    ----------
    perplexity : float
        Perplexity parameter, related to the number of 'effective' nearest neighbors.
        Consider selecting a value between 2 and the number of samples.
    lr : float, optional
        Learning rate used to update dual variables.
    eps_square : bool, optional
        Whether to optimize on the square of the dual variables.
        May be more stable in practice.
    tol : float, optional
        Precision threshold at which the algorithm stops, by default 1e-3.
    max_iter : int, optional
        Maximum number of iterations for the algorithm, by default 500.
    optimizer : {'SGD', 'Adam', 'NAdam', 'LBFGS'}, optional
        Which pytorch optimizer to use (default 'Adam').
    tolog : bool, optional
        Whether to store intermediate results in a dictionary (default False).
    metric : str, optional
        Metric to use for computing distances, by default "sqeuclidean".
    zero_diag : bool, optional
        Whether to set the diagonal of the distance matrix to 0.
    device : str, optional
        Device to use for computation.
    keops : bool, optional
        Whether to use KeOps for computation.
    verbose : bool, optional
        Verbosity. Default is False.

    References
    ----------
    .. [3] Hugues Van Assel, Titouan Vayer, Rémi Flamary, Nicolas Courty (2023).
        SNEkhorn: Dimension Reduction with Symmetric Entropic Affinities.
        Advances in Neural Information Processing Systems 36 (NeurIPS).
    """  # noqa: E501

    def __init__(
        self,
        perplexity: float = 30,
        lr: float = 1e-1,
        eps_square: bool = True,
        tol: float = 1e-3,
        max_iter: int = 500,
        optimizer: str = "Adam",
        tolog: bool = False,
        metric: str = "sqeuclidean",
        zero_diag: bool = True,
        device: str = "auto",
        keops: bool = False,
        verbose: bool = False,
    ):
        super().__init__(
            metric=metric,
            zero_diag=zero_diag,
            device=device,
            keops=keops,
            verbose=verbose,
        )

        self.perplexity = perplexity
        self.lr = lr
        self.tol = tol
        self.max_iter = max_iter
        self.optimizer = optimizer
        self.tolog = tolog
        self.eps_square = eps_square

    def _compute_log_affinity(self, X: torch.Tensor):
        r"""Solve the problem (SEA) in [3]_.

        Parameters
        ----------
        X : torch.Tensor of shape (n_samples, n_features)
            Data on which affinity is computed.

        Returns
        -------
        log_affinity_matrix : torch.Tensor or pykeops.torch.LazyTensor
            of shape (n_samples, n_samples)
            Log of the symmetric entropic affinity matrix.
        """
        self.log_ = {}

        if self.verbose:
            print(
                "[TorchDR] Affinity : computing the Symmetric Entropic Affinity matrix."
            )

        C = self._distance_matrix(X)

        n_samples_in = X.shape[0]
        perplexity = _check_perplexity(self.perplexity, n_samples_in, self.verbose)
        target_entropy = np.log(perplexity) + 1

        one = torch.ones(n_samples_in, dtype=X.dtype, device=X.device)

        # dual variables, size (n_samples)
        self.eps_ = torch.ones(n_samples_in, dtype=X.dtype, device=X.device)
        self.mu_ = torch.ones(n_samples_in, dtype=X.dtype, device=X.device)

        if self.optimizer == "LBFGS":
            self.eps_.requires_grad = True
            self.mu_.requires_grad = True

            optimizer = torch.optim.LBFGS(
                [self.eps_, self.mu_],
                lr=self.lr,
                max_iter=self.max_iter,
                tolerance_grad=self.tol,
                line_search_fn="strong_wolfe",
            )

            def closure():
                if torch.is_grad_enabled():
                    optimizer.zero_grad()

                _eps = self.eps_**2 if self.eps_square else self.eps_
                log_P = _log_Pse(C, _eps, self.mu_, eps_square=False)
                H = entropy(log_P, log=True, dim=1)
                loss = (  # Negative Lagrangian loss
                    -(log_P.exp() * C).sum(0).sum()
                    - torch.inner(_eps, target_entropy - H)
                    + torch.inner(self.mu_, log_P.logsumexp(1).squeeze().expm1())
                )

                if loss.requires_grad:
                    loss.backward()

                return loss

            optimizer.step(closure)

            check_NaNs(
                [self.eps_, self.mu_],
                msg=(
                    "[TorchDR] ERROR Affinity: NaN in dual variables, "
                    "consider decreasing the learning rate."
                ),
            )

            if self.tolog:
                self.log_["eps"] = [self.eps_.clone().detach().cpu()]
                self.log_["mu"] = [self.mu_.clone().detach().cpu()]

            log_affinity_matrix = _log_Pse(
                C, self.eps_, self.mu_, eps_square=self.eps_square
            )

        else:  # other optimizers including SGD and Adam
            optimizer = OPTIMIZERS[self.optimizer]([self.eps_, self.mu_], lr=self.lr)

            if self.tolog:
                self.log_["eps"] = [self.eps_.clone().detach().cpu()]
                self.log_["mu"] = [self.mu_.clone().detach().cpu()]

            pbar = tqdm(range(self.max_iter), disable=not self.verbose)
            for k in pbar:
                with torch.no_grad():
                    optimizer.zero_grad()

                    log_P = _log_Pse(
                        C, self.eps_, self.mu_, eps_square=self.eps_square
                    )
                    H = entropy(log_P, log=True, dim=1)
                    P_sum = log_P.logsumexp(1).exp().squeeze()  # squeeze for keops

                    grad_eps = H - target_entropy
                    if self.eps_square:
                        # the Jacobian must be corrected by 2 * diag(eps)
                        grad_eps = 2 * self.eps_.clone().detach() * grad_eps

                    grad_mu = P_sum - one

                    self.eps_.grad = grad_eps
                    self.mu_.grad = grad_mu
                    optimizer.step()

                    if not self.eps_square:  # optimize on eps > 0
                        self.eps_.clamp_(min=0)

                check_NaNs(
                    [self.eps_, self.mu_],
                    msg=(
                        f"[TorchDR] ERROR Affinity: NaN at iter {k}, "
                        "consider decreasing the learning rate."
                    ),
                )

                if self.tolog:
                    self.log_["eps"].append(self.eps_.clone().detach())
                    self.log_["mu"].append(self.mu_.clone().detach())

                perps = (H - 1).exp()
                if self.verbose:
                    pbar.set_description(
                        f"PERPLEXITY:{float(perps.mean().item()): .2e} "
                        f"(std:{float(perps.std().item()): .2e}), "
                        f"MARGINAL:{float(P_sum.mean().item()): .2e} "
                        f"(std:{float(P_sum.std().item()): .2e})"
                    )

                if (
                    torch.norm(grad_eps) < self.tol
                    and torch.norm(grad_mu) < self.tol
                ):
                    if self.verbose:
                        print(
                            "[TorchDR] Affinity : convergence reached "
                            f"at iter {k}."
                        )
                    break

                if k == self.max_iter - 1 and self.verbose:
                    warnings.warn(
                        "[TorchDR] WARNING Affinity: max iter attained, "
                        "algorithm stops but may not have converged."
                    )

            self.n_iter_ = k
            log_affinity_matrix = log_P

        log_affinity_matrix -= math.log(n_samples_in)

        return log_affinity_matrix
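

# Illustrative check (not part of the library): the SEA constraints from the
# docstring, namely unit row sums and row entropies equal to log(perplexity) + 1
# (the entropic constraint saturates at the optimum), can be inspected on the
# dense output. Assumes the `fit_transform` method of the LogAffinity base class
# (defined outside this module); names and toy data are hypothetical.
def _example_symmetric_entropic_affinity_check():
    X = torch.randn(200, 3)
    xi = 20
    aff = SymmetricEntropicAffinity(perplexity=xi, max_iter=500)
    P = aff.fit_transform(X) * X.shape[0]  # undo the final 1/n rescaling above
    row_sums = P.sum(dim=1)  # should be close to 1
    # row-wise h(p) = -sum_j p_j (log p_j - 1), should be close to log(xi) + 1
    H = -torch.xlogy(P, P).sum(dim=1) + P.sum(dim=1)
    return row_sums, H

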
class SinkhornAffinity(LogAffinity):
    r"""Compute the symmetric doubly stochastic affinity matrix.

    The algorithm computes the doubly stochastic matrix
    :math:`\mathbf{P}^{\mathrm{ds}}` with controlled global entropy using the
    symmetric Sinkhorn algorithm [5]_.

    The algorithm computes the optimal dual variable
    :math:`\mathbf{f}^\star \in \mathbb{R}^n` such that

    .. math::
        \mathbf{P}^{\mathrm{ds}} \mathbf{1} = \mathbf{1} \quad \text{where} \quad
        \forall (i,j), \: P^{\mathrm{ds}}_{ij} =
        \exp(f^\star_i + f^\star_j - C_{ij} / \varepsilon) \:.

    where :

    - :math:`\mathbf{C}`: symmetric pairwise distance matrix between the samples.
    - :math:`\varepsilon`: entropic regularization parameter.
    - :math:`\mathbf{1} := (1,...,1)^\top`: all-ones vector.

    :math:`\mathbf{f}^\star` is computed by performing dual ascent via the Sinkhorn
    fixed-point iteration (eq. 25 in [7]_).

    **Convex problem.** Consists in solving the following symmetric entropic
    optimal transport problem [6]_:

    .. math::
        \mathbf{P}^{\mathrm{ds}} \in \mathop{\arg\min}_{\mathbf{P} \in \mathcal{DS}} \:
        \langle \mathbf{C}, \mathbf{P} \rangle - \varepsilon \mathrm{H}(\mathbf{P})

    where :

    - :math:`\mathcal{DS} := \left\{ \mathbf{P} \in \mathbb{R}_+^{n \times n}: \:
      \mathbf{P} = \mathbf{P}^\top \:,\: \mathbf{P} \mathbf{1} = \mathbf{1} \right\}`:
      set of symmetric doubly stochastic matrices.
    - :math:`\mathrm{H}`: (global) Shannon entropy such that
      :math:`\mathrm{H}(\mathbf{P}) := - \sum_{ij} P_{ij} (\log P_{ij} - 1)`.

    **Bregman projection.** Another way to write this problem is to consider the
    KL projection of the Gaussian kernel
    :math:`\mathbf{K}_\varepsilon = \exp(- \mathbf{C} / \varepsilon)` onto the set
    of doubly stochastic matrices:

    .. math::
        \mathbf{P}^{\mathrm{ds}} = \mathrm{Proj}_{\mathcal{DS}}^{\mathrm{KL}}(\mathbf{K}_\varepsilon)
        := \mathop{\arg\min}_{\mathbf{P} \in \mathcal{DS}} \:
        \mathrm{KL}(\mathbf{P} \| \mathbf{K}_\varepsilon)

    where
    :math:`\mathrm{KL}(\mathbf{P} \| \mathbf{Q}) := \sum_{ij} P_{ij} (\log (P_{ij} / Q_{ij}) - 1) + Q_{ij}`
    is the Kullback-Leibler divergence between :math:`\mathbf{P}` and :math:`\mathbf{Q}`.

    Parameters
    ----------
    eps : float, optional
        Regularization parameter for the Sinkhorn algorithm.
    tol : float, optional
        Precision threshold at which the algorithm stops.
    max_iter : int, optional
        Maximum number of iterations for the algorithm.
    base_kernel : {"gaussian", "student"}, optional
        Which base kernel to normalize as doubly stochastic.
    tolog : bool, optional
        Whether to store intermediate results in a dictionary.
    metric : str, optional
        Metric to use for computing distances (default "sqeuclidean").
    zero_diag : bool, optional
        Whether to set the diagonal of the distance matrix to 0.
    device : str, optional
        Device to use for computation.
    keops : bool, optional
        Whether to use KeOps for computation.
    verbose : bool, optional
        Verbosity. Default is False.
    with_grad : bool, optional (default=False)
        If True, the Sinkhorn iterations are done with gradient tracking.
        If False, torch.no_grad() is used for the iterations.

    References
    ----------
    .. [5] Richard Sinkhorn, Paul Knopp (1967).
        Concerning nonnegative matrices and doubly stochastic matrices.
        Pacific Journal of Mathematics, 21(2), 343-348.

    .. [6] Marco Cuturi (2013).
        Sinkhorn Distances: Lightspeed Computation of Optimal Transport.
        Advances in Neural Information Processing Systems 26 (NeurIPS).

    .. [7] Jean Feydy, Thibault Séjourné, François-Xavier Vialard, Shun-ichi Amari,
        Alain Trouvé, Gabriel Peyré (2019).
        Interpolating between Optimal Transport and MMD using Sinkhorn Divergences.
        International Conference on Artificial Intelligence and Statistics (AISTATS).
    """  # noqa: E501

    def __init__(
        self,
        eps: float = 1.0,
        tol: float = 1e-5,
        max_iter: int = 1000,
        base_kernel: str = "gaussian",
        tolog: bool = False,
        metric: str = "sqeuclidean",
        zero_diag: bool = True,
        device: str = "auto",
        keops: bool = False,
        verbose: bool = False,
        with_grad: bool = False,
    ):
        super().__init__(
            metric=metric,
            zero_diag=zero_diag,
            device=device,
            keops=keops,
            verbose=verbose,
        )
        self.eps = eps
        self.tol = tol
        self.max_iter = max_iter
        self.base_kernel = base_kernel
        self.tolog = tolog
        self.with_grad = with_grad

    def _compute_log_affinity(self, X: torch.Tensor, init_dual: torch.Tensor = None):
        r"""Compute the entropic doubly stochastic affinity matrix.

        Parameters
        ----------
        X : torch.Tensor of shape (n_samples, n_features)
            Data on which affinity is computed.
        init_dual : torch.Tensor of shape (n_samples), optional
            Initialization for the dual variable of the Sinkhorn algorithm.

        Returns
        -------
        log_affinity_matrix : torch.Tensor or pykeops.torch.LazyTensor
            of shape (n_samples, n_samples)
            Log of the doubly stochastic affinity matrix.
        """
        C = self._distance_matrix(X)

        if self.base_kernel == "student":
            C = (1 + C).log()

        if self.verbose:
            print(
                "[TorchDR] Affinity : computing the (KL) Doubly Stochastic "
                "Affinity matrix (Sinkhorn affinity)."
            )

        n_samples_in = C.shape[0]
        log_K = -C / self.eps

        # Perform a warm start if a dual variable f is provided
        self.dual_ = (
            torch.zeros(n_samples_in, dtype=X.dtype, device=X.device)
            if init_dual is None
            else init_dual
        )

        if self.tolog:
            self.log_ = {"dual": [self.dual_.clone().detach().cpu()]}

        context_manager = (
            contextlib.nullcontext() if self.with_grad else torch.no_grad()
        )
        with context_manager:
            # Sinkhorn iterations
            for k in range(self.max_iter):
                # well conditioned symmetric Sinkhorn iteration (Feydy et al. 2019)
                reduction = -sum_matrix_vector(log_K, self.dual_).logsumexp(0).squeeze()
                self.dual_ = 0.5 * (self.dual_ + reduction)

                if self.tolog:
                    self.log_["dual"].append(self.dual_.clone().detach().cpu())

                check_NaNs(
                    self.dual_, msg=f"[TorchDR] ERROR Affinity: NaN at iter {k}."
                )

                if torch.norm(self.dual_ - reduction) < self.tol:
                    if self.verbose:
                        print(f"[TorchDR] Affinity : convergence reached at iter {k}.")
                    break

                if k == self.max_iter - 1 and self.verbose:
                    warnings.warn(
                        "[TorchDR] WARNING Affinity: max iter attained, algorithm "
                        "stops but may not have converged."
                    )

        self.n_iter_ = k
        log_affinity_matrix = _log_Pds(log_K, self.dual_)

        log_affinity_matrix -= math.log(n_samples_in)

        return log_affinity_matrix
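

# Illustrative check (not part of the library): the Sinkhorn output should be
# symmetric and doubly stochastic up to the final 1/n rescaling applied above.
# Assumes the `fit_transform` method of the LogAffinity base class (defined
# outside this module); names and toy data are hypothetical.
def _example_sinkhorn_affinity_check():
    X = torch.randn(150, 4)
    aff = SinkhornAffinity(eps=1.0, tol=1e-6)
    P = aff.fit_transform(X) * X.shape[0]  # undo the final 1/n rescaling
    assert torch.allclose(P, P.T, atol=1e-5)  # symmetry
    assert torch.allclose(P.sum(dim=1), torch.ones(X.shape[0]), atol=1e-3)  # row sums
    return P

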
class NormalizedGaussianAffinity(LogAffinity):
    r"""Compute the Gaussian affinity matrix which can be normalized along a dimension.

    The algorithm computes :math:`\exp( - \mathbf{C} / \sigma)` where
    :math:`\mathbf{C}` is the pairwise distance matrix and :math:`\sigma` is the
    bandwidth parameter. The affinity can be normalized according to the specified
    normalization dimension.

    Parameters
    ----------
    sigma : float, optional
        Bandwidth parameter.
    metric : str, optional
        Metric to use for pairwise distances computation.
    zero_diag : bool, optional
        Whether to set the diagonal of the affinity matrix to zero.
    device : str, optional
        Device to use for computations.
    keops : bool, optional
        Whether to use KeOps for computations.
    verbose : bool, optional
        Verbosity.
    normalization_dim : int or Tuple[int], optional
        Dimension along which to normalize the affinity matrix. Default is (0, 1).
    """

    def __init__(
        self,
        sigma: float = 1.0,
        metric: str = "sqeuclidean",
        zero_diag: bool = True,
        device: str = "auto",
        keops: bool = False,
        verbose: bool = False,
        normalization_dim: int | Tuple[int] = (0, 1),
    ):
        super().__init__(
            metric=metric,
            zero_diag=zero_diag,
            device=device,
            keops=keops,
            verbose=verbose,
        )
        self.sigma = sigma
        self.normalization_dim = normalization_dim

    def _compute_log_affinity(self, X: torch.Tensor):
        r"""Fit the normalized Gaussian affinity model to the provided data.

        Parameters
        ----------
        X : torch.Tensor of shape (n_samples, n_features)
            Input data.

        Returns
        -------
        log_affinity_matrix : torch.Tensor or pykeops.torch.LazyTensor
            of shape (n_samples, n_samples)
            Log of the normalized Gaussian affinity matrix.
        """
        C = self._distance_matrix(X)

        log_affinity_matrix = -C / self.sigma

        if self.normalization_dim is not None:
            self.log_normalization_ = logsumexp_red(
                log_affinity_matrix, self.normalization_dim
            )
            log_affinity_matrix = log_affinity_matrix - self.log_normalization_

        if isinstance(self.normalization_dim, int):
            n_samples_in = X.shape[0]
            log_affinity_matrix -= math.log(n_samples_in)

        return log_affinity_matrix
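

# Usage sketch (illustrative, not part of the library): the effect of
# `normalization_dim`. With (0, 1) the whole matrix sums to 1; with 1 each row is
# normalized and then rescaled by 1/n as in `_compute_log_affinity` above.
# Assumes the `fit_transform` method of the LogAffinity base class (defined
# outside this module); names and toy data are hypothetical.
def _example_normalized_gaussian_affinity_usage():
    X = torch.randn(50, 2)
    P_global = NormalizedGaussianAffinity(
        sigma=1.0, normalization_dim=(0, 1)
    ).fit_transform(X)
    P_rows = NormalizedGaussianAffinity(
        sigma=1.0, normalization_dim=1
    ).fit_transform(X)
    # P_global.sum() is close to 1; P_rows.sum(dim=1) is close to 1 / X.shape[0]
    return P_global, P_rows

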
class NormalizedStudentAffinity(LogAffinity):
    r"""Compute the Student affinity matrix which can be normalized along a dimension.

    Its expression is given by:

    .. math::
        \left(1 + \frac{\mathbf{C}}{\nu}\right)^{-\frac{\nu + 1}{2}}

    where :math:`\nu > 0` is the degrees of freedom parameter. The affinity
    can be normalized according to the specified normalization dimension.

    Parameters
    ----------
    degrees_of_freedom : float, optional
        Degrees of freedom for the Student-t distribution.
    metric : str, optional
        Metric to use for pairwise distances computation.
    zero_diag : bool, optional
        Whether to set the diagonal of the affinity matrix to zero.
    device : str, optional
        Device to use for computations.
    keops : bool, optional
        Whether to use KeOps for computations.
    verbose : bool, optional
        Verbosity.
    normalization_dim : int or Tuple[int], optional
        Dimension along which to normalize the affinity matrix. Default is (0, 1).
    """

    def __init__(
        self,
        degrees_of_freedom: float = 1.0,
        metric: str = "sqeuclidean",
        zero_diag: bool = True,
        device: str = "auto",
        keops: bool = False,
        verbose: bool = False,
        normalization_dim: int | Tuple[int] = (0, 1),
    ):
        super().__init__(
            metric=metric,
            zero_diag=zero_diag,
            device=device,
            keops=keops,
            verbose=verbose,
        )
        self.degrees_of_freedom = degrees_of_freedom
        self.normalization_dim = normalization_dim

    def _compute_log_affinity(self, X: torch.Tensor):
        r"""Fit the normalized Student affinity model to the provided data.

        Parameters
        ----------
        X : torch.Tensor of shape (n_samples, n_features)
            Input data.

        Returns
        -------
        log_affinity_matrix : torch.Tensor or pykeops.torch.LazyTensor
            of shape (n_samples, n_samples)
            Log of the normalized Student affinity matrix.
        """
        C = self._distance_matrix(X)

        log_affinity_matrix = (
            -0.5
            * (self.degrees_of_freedom + 1)
            * (C / self.degrees_of_freedom + 1).log()
        )

        if self.normalization_dim is not None:
            self.log_normalization_ = logsumexp_red(
                log_affinity_matrix, self.normalization_dim
            )
            log_affinity_matrix = log_affinity_matrix - self.log_normalization_

        if isinstance(self.normalization_dim, int):
            n_samples_in = X.shape[0]
            log_affinity_matrix -= math.log(n_samples_in)

        return log_affinity_matrix
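

# Illustrative comparison (not part of the library): the Student kernel decays
# polynomially, so after normalization it typically keeps more mass on distant
# pairs than the Gaussian kernel of NormalizedGaussianAffinity. Assumes the
# `fit_transform` method of the LogAffinity base class (defined outside this
# module); names and toy data are hypothetical.
def _example_student_vs_gaussian_tails():
    X = torch.randn(100, 10)
    P_student = NormalizedStudentAffinity(degrees_of_freedom=1.0).fit_transform(X)
    P_gauss = NormalizedGaussianAffinity(sigma=1.0).fit_transform(X)
    # smallest affinity value under each kernel; typically larger for the
    # Student kernel because of its heavy tail
    return P_student.min(), P_gauss.min()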