Source code for geometric_kernels.spaces.product

"""
This module provides the :class:`ProductDiscreteSpectrumSpace` space and the
respective :class:`~.eigenfunctions.Eigenfunctions` subclass
:class:`ProductEigenfunctions`.

See :doc:`this page </theory/product_spaces>` for a brief account on
theory behind product spaces and the :doc:`Torus.ipynb </examples/Torus>`
notebook for a tutorial on how to use them.
"""

import itertools
import math

import lab as B
import numpy as np
from beartype.typing import List, Optional, Tuple

from geometric_kernels.lab_extras import from_numpy, int_like, take_along_axis
from geometric_kernels.spaces.base import DiscreteSpectrumSpace
from geometric_kernels.spaces.eigenfunctions import Eigenfunctions
from geometric_kernels.utils.product import make_product, project_product
from geometric_kernels.utils.utils import chain


def _find_lowest_sum_combinations(arr: B.Numeric, k: int) -> B.Numeric:
    """
    For an [N, D] array `arr`, with `arr[i, :]` assumed sorted, find `k`
    smallest sums of one element per each row, i.e. sums of form
    `sum_i arr[i, j_i]`, return the array of indices of the summands.

    :param arr:
        An [N, D]-shaped array.
    :param k:
        Number of smallest sums to find.

    :return:
        A [k, N]-shaped array of the same backend as `arr`.
    """
    N, D = arr.shape

    index_array = B.stack(*[B.zeros(int_like(arr), N) for i in range(k)], axis=0)
    # prebuild array for indexing the first index of the eigenvalue array
    first_index = B.linspace(0, N - 1, N).astype(int)
    i = 0

    # first eigenvalue is the sum of the first eigenvalues of the individual spaces
    curr_idx = B.zeros(int, N)[None, :]
    while i < k:
        # compute eigenvalues of the proposals
        sum_values = arr[first_index, curr_idx].sum(axis=1)

        # Compute tied smallest new eigenvalues
        lowest_sum_index = int(sum_values.argmin())
        tied_sums = sum_values == sum_values[lowest_sum_index]
        tied_sums_indexes = B.linspace(0, len(tied_sums) - 1, len(tied_sums)).astype(
            int
        )[tied_sums]

        # Add new eigenvalues to indexing array
        for index in tied_sums_indexes:
            index_array[i, :] = curr_idx[index]
            i += 1
            if i >= k:
                break

        # keep unaccepted ones around
        old_indices = curr_idx[~tied_sums]
        # mutate just accepted ones by adding one to each eigenindex
        new_indices = curr_idx[tied_sums][..., None, :] + B.eye(int, curr_idx.shape[-1])
        new_indices = new_indices.reshape((-1, new_indices.shape[-1]))
        new_indices = B.minimum(new_indices, D - 1)
        curr_idx = B.concat(old_indices, new_indices, axis=0)
        curr_idx = np.unique(
            B.to_numpy(curr_idx.reshape((-1, curr_idx.shape[-1]))),
            axis=0,
        )
        # Filter out already searched combinations.
        # See https://stackoverflow.com/a/40056251.
        dims = np.maximum(curr_idx.max(0), index_array.max(0)) + 1
        curr_idx = curr_idx[
            ~np.in1d(
                np.ravel_multi_index(curr_idx.T, dims),
                np.ravel_multi_index(index_array.T, dims),
            )
        ]

    return index_array


def _num_per_level_to_mapping(num_per_level: List[int]) -> List[List[int]]:
    """
    Given a list of numbers of eigenfunctions per level, return a list `mapping`
    such that `mapping[i][j]` is the index of the `j`-th eigenfunction of the
    `i`-th level.

    :param num_per_level:
        A `num_eigenfunctions_per_level` list of some space.
    """
    mapping = []
    i = 0
    for num in num_per_level:
        mapping.append([i + j for j in range(num)])
        i += num
    return mapping


def _eigenlevelindices_to_eigenfunctionindices(
    eigenlevelindices: B.Numeric, nums_per_level: List[List[int]]
) -> B.Numeric:
    """
    Given an `eigenlevelindices` which maps the product space's level indices to
    the indices of the factor spaces' levels, return an `eigenfunctionindices`
    array that maps the product space's eigenfunction index to the indices of
    the factor spaces' individual eigenfunctions.

    :param eigenlevelindices:
        A [L, S]-shaped array, where `S` is the number of factor spaces and `L`
        is the number of levels, such that `eigenindicies[i, :]` are the indices
        of the levels of the factor spaces that correspond to the `i`'th level
        of the product space.
    :param nums_per_level:
        Each `nums_per_level[j]` represents the `num_eigenfunctions_per_level`
        list of the `j`-th factor space.

    :return:
        A [J, S]-shaped array, where `J` is the total number of eigenfunctions.
    """
    L = eigenlevelindices.shape[0]
    S = eigenlevelindices.shape[1]
    levels_to_eigenfunction_indices: List[List[List[int]]] = [
        _num_per_level_to_mapping(npl) for npl in nums_per_level
    ]  # S lists of lists of length L

    eigenfunctionindices = []
    for cur_level in range(L):
        cur_factor_eigenfunction_indices: List[List[int]] = [
            levels_to_eigenfunction_indices[s][eigenlevelindices[cur_level, s]]
            for s in range(S)
        ]
        new_indices: List[Tuple[int, ...]] = list(
            itertools.product(*cur_factor_eigenfunction_indices)
        )  # part of the resulting eigenfunctionindices for the cur_level level.
        eigenfunctionindices += new_indices

    out = from_numpy(eigenlevelindices, np.array(eigenfunctionindices))
    return out


[docs] class ProductEigenfunctions(Eigenfunctions): r""" Eigenfunctions of the Laplacian on the product space are outer products of the eigenfunctions on factors. Levels correspond to tuples of levels of the factors. :param element_shapes: Shapes of the elements in each factor. Can be obtained as properties `space.element_shape` of any given factor `space`. :param element_dtypes: Abstract lab data types of the elements in each factor. Can be obtained as properties `space.element_dtype` of any given factor `space`. :param eigenindicies: A [L, S]-shaped array, where `S` is the number of factor spaces and `L` is the number of levels, such that `eigenindicies[i, :]` are the indices of the levels of the factor spaces that correspond to the `i`'th level of the product space. This parameter implicitly determines the number of levels. :param ``*eigenfunctions``: The :class:`~.Eigenfunctions` subclasses corresponding to each of the factor spaces. :param dimension_indices: Determines how a vector `x` representing a product space element is to be mapped into the arrays `xi` that represent elements of the factor spaces. `xi` are assumed to be equal to `x[dimension_indices[i]]`, possibly up to a reshape. Such a reshape might be necessary to accommodate the spaces whose elements are matrices rather than vectors, as determined by `element_shapes`. The transformation of `x` into the list of `xi`\ s is performed by :func:`~.project_product`. If None, assumes the each input is layed-out flattened and concatenated, in the same order as the factor spaces. In this case, the inverse to :func:`~.project_product` is :func:`~.make_product`. Defaults to None. """ def __init__( self, element_shapes: List[List[int]], element_dtypes: List[B.DType], eigenindicies: B.Numeric, *eigenfunctions: Eigenfunctions, dimension_indices: B.Numeric = None, ): self._num_levels = eigenindicies.shape[0] self.element_shapes = element_shapes self.element_dtypes = element_dtypes dimensions = [math.prod(element_shape) for element_shape in self.element_shapes] if dimension_indices is None: self.dimension_indices = [] i = 0 for dim in dimensions: self.dimension_indices.append([*range(i, i + dim)]) i += dim self.eigenindicies = eigenindicies self.eigenfunctions = eigenfunctions self.nums_per_level: List[List[int]] = [ eigenfunctions_factor.num_eigenfunctions_per_level for eigenfunctions_factor in self.eigenfunctions ] # [S, LFactor] where `LFactor` is eigenfunctions[0].num_levels self._eigenfunctionindices = _eigenlevelindices_to_eigenfunctionindices( self.eigenindicies, self.nums_per_level ) assert self.eigenindicies.shape[-1] == len(self.eigenfunctions)
[docs] def __call__(self, X: B.Numeric, **kwargs) -> B.Numeric: """ Evaluate the individual eigenfunctions at a batch of input locations. .. warning:: Will `raise NotImplementedError` if some of the factors do not implement their `__call__`. :param X: Points to evaluate the eigenfunctions at, an array of shape [N, D], where `N` is the number of points and `D` is the dimension of the vectors that represent points in the product space. Each point `x` in the product space is a `D`-dimensional vector such that `x[dimension_indices[i]]` is the vector that represents the flattened element of the `i`-th factor space. .. note:: If the instance was created with `dimension_indices=None`, then you can use the :func:`~.make_product` function to convert a list of (batches of) points in factor spaces into a (batch of) points in the product space. :param ``**kwargs``: Any additional parameters. :return: An [N, J]-shaped array, where `J` is the number of eigenfunctions. """ Xs = project_product( X, self.dimension_indices, self.element_shapes, self.element_dtypes ) # List of S arrays, each of shape [N, *element_shape_s] factor_eigenfunction_values = [ eigenfunction(X, **kwargs) # [N, Js], Js different for each factor for eigenfunction, X in zip(self.eigenfunctions, Xs) ] # List of length S, contains arrays of different shapes (not stackable) eigenfunction_values = [] for j in range(self.num_eigenfunctions): m_idx = self._eigenfunctionindices[j] # (S,)-shaped array eigenfunction_values.append( B.prod( B.stack( *( factor_eigenfunction_values[s][ :, self._eigenfunctionindices[j, s] ] # [N,] for s in range(len(m_idx)) ), axis=-1, ), # [N, S] axis=1, ) # [N,] ) eigenfunction_values = B.stack(*eigenfunction_values, axis=-1) # [N, J] return eigenfunction_values
@property def num_eigenfunctions(self) -> int: return self._eigenfunctionindices.shape[0] @property def num_levels(self) -> int: return self._num_levels
[docs] def phi_product( self, X: B.Numeric, X2: Optional[B.Numeric] = None, **kwargs ) -> B.Numeric: if X2 is None: X2 = X Xs = project_product( X, self.dimension_indices, self.element_shapes, self.element_dtypes ) # List of S arrays, each of shape [N, *element_shape_s] Xs2 = project_product( X2, self.dimension_indices, self.element_shapes, self.element_dtypes ) # List of S arrays, each of shape [N2, *element_shape_s] factor_phi_products = [ take_along_axis( eigenfunction.phi_product(X1, X2, **kwargs), # [N, N2, L_s] from_numpy(X1, self.eigenindicies[None, None, :, s]), -1, ) # [N, N2, L] for s, (eigenfunction, X1, X2) in enumerate( zip(self.eigenfunctions, Xs, Xs2) ) ] common_dtype = B.promote_dtypes(*[B.dtype(x) for x in factor_phi_products]) phis = B.stack( *[B.cast(common_dtype, x) for x in factor_phi_products], axis=-1 ) # [N, N2, L, S] prod_phis = B.prod(phis, axis=-1) # [N, N2, L, S] -> [N, N2, L] return prod_phis
[docs] def phi_product_diag(self, X: B.Numeric, **kwargs): Xs = project_product( X, self.dimension_indices, self.element_shapes, self.element_dtypes ) phis = B.stack( *[ take_along_axis( eigenfunction.phi_product_diag(X1, **kwargs), # [N, L_s] from_numpy(X1, self.eigenindicies[None, :, s]), -1, ) # [N, L] for s, (eigenfunction, X1) in enumerate(zip(self.eigenfunctions, Xs)) ], axis=-1, ) # [N, L, S] prod_phis = B.prod(phis, axis=-1) # [N, L] return prod_phis
@property def num_eigenfunctions_per_level(self) -> List[int]: L = self.eigenindicies.shape[0] S = self.eigenindicies.shape[1] totals = [] for cur_level in range(L): totals.append( math.prod( self.nums_per_level[s][self.eigenindicies[cur_level, s]] for s in range(S) ) ) return totals
[docs] class ProductDiscreteSpectrumSpace(DiscreteSpectrumSpace): r""" The GeometricKernels space representing a product .. math:: \mathcal{S} = \mathcal{S}_1 \times \ldots \mathcal{S}_S of :class:`~.spaces.DiscreteSpectrumSpace`-s $\mathcal{S}_i$. Levels are indexed by tuples of levels of the factors. .. admonition:: Precomputing optimal levels The eigenvalue corresponding to a level of the product space represented by a tuple $(j_1, .., j_S)$ is the sum .. math:: \lambda^{(1)}_{j_1} + \ldots + \lambda^{(S)}_{j_S} of the eigenvalues $\lambda^{(s)}_{j_s}$ of the factors. Computing top `num_levels` smallest eigenvalues is thus a combinatorial problem, the solution to which we precompute. To make this precomputation possible you need to provide the `num_levels` parameter when constructing the :class:`ProductDiscreteSpectrumSpace`, unlike for other spaces. What is more, the `num` parameter of the :meth:`get_eigenfunctions` cannot be larger than the `num_levels` value. .. note:: See :doc:`this page </theory/product_spaces>` for a brief account on theory behind product spaces and the :doc:`Torus.ipynb </examples/Torus>` notebook for a tutorial on how to use them. An alternative to using :class:`ProductDiscreteSpectrumSpace` is to use the :class:`~.kernels.ProductGeometricKernel` kernel. The latter is more flexible, allowing more general spaces as factors and automatic relevance determination -like behavior. :param spaces: The factors, subclasses of :class:`~.spaces.DiscreteSpectrumSpace`. :param num_levels: The number of levels to pre-compute for this product space. :param num_levels_per_space: Number of levels to fetch for each of the factor spaces, to compute the product-space levels. This is a single number rather than a list, because we currently only support fetching the same number of levels for all the factor spaces. If not given, `num_levels` levels will be fetched for each factor. .. admonition:: Citation If you use this GeometricKernels space in your research, please consider citing :cite:t:`borovitskiy2020`. """ def __init__( self, *spaces: DiscreteSpectrumSpace, num_levels: int = 25, num_levels_per_space: Optional[int] = None, ): for space in spaces: assert isinstance( space, DiscreteSpectrumSpace ), "One of the spaces is not an instance of DiscreteSpectrumSpace." self.factor_spaces = spaces # List of length S self.num_levels = num_levels # Perform a breadth-first search for the smallest eigenvalues. # Assuming that the eigenvalues come sorted, the next biggest eigenvalue # can be found by taking a one-index step in any direction from the # current edge of the search space. if num_levels_per_space is None: num_levels_per_space = num_levels assert num_levels <= num_levels_per_space ** len( spaces ), "Cannot have more levels than there are possible combinations" # prefetch the eigenvalues of the subspaces factor_space_eigenvalues = B.stack( *[ space.get_eigenvalues(num_levels_per_space)[:, 0] for space in self.factor_spaces ], axis=0, ) # [S, num_levels_per_space] self.factor_space_eigenindices = _find_lowest_sum_combinations( factor_space_eigenvalues, self.num_levels ) # [self.num_levels, S] self.factor_space_eigenvalues = factor_space_eigenvalues self._eigenvalues = factor_space_eigenvalues[ range(len(self.factor_spaces)), self.factor_space_eigenindices, ].sum(axis=1) def __str__(self): return f"ProductDiscreteSpectrumSpace({', '.join(str(space) for space in self.factor_spaces)})" @property def dimension(self) -> int: """ Returns the dimension of the product space, equal to the sum of dimensions of the factors. """ return sum([space.dimension for space in self.factor_spaces])
[docs] def random(self, key, number): """ Sample random points on the product space by concatenating random points on the factor spaces via :func:`~.make_product`. :param key: Either `np.random.RandomState`, `tf.random.Generator`, `torch.Generator` or `jax.tensor` (representing random state). :param number: Number of samples to draw. :return: An array of `number` uniformly random samples on the space. """ random_points = [] for factor in self.factor_spaces: key, factor_random_points = factor.random(key, number) random_points.append(factor_random_points) return key, make_product(random_points)
[docs] def get_eigenfunctions(self, num: int) -> Eigenfunctions: """ Returns the :class:`~.ProductEigenfunctions` object with `num` levels. :param num: Number of levels. Cannot be larger than the `num_levels` parameter of the constructor. """ assert num <= self.num_levels max_level = int(self.factor_space_eigenindices[:num, :].max() + 1) factor_space_eigenfunctions = [ space.get_eigenfunctions(max_level) for space in self.factor_spaces ] return ProductEigenfunctions( [space.element_shape for space in self.factor_spaces], [space.element_dtype for space in self.factor_spaces], self.factor_space_eigenindices[:num], *factor_space_eigenfunctions, )
[docs] def get_eigenvalues(self, num: int) -> B.Numeric: """ Eigenvalues of the Laplacian corresponding to the first `num` levels. :param num: Number of levels. Cannot be larger than the `num_levels` parameter of the constructor. :return: (num, 1)-shaped array containing the eigenvalues. """ assert num <= self.num_levels return self._eigenvalues[:num, None]
[docs] def get_repeated_eigenvalues(self, num: int) -> B.Numeric: """ Eigenvalues of the Laplacian corresponding to the first `num` levels, repeated according to their multiplicity within levels. :param num: Number of levels. Cannot be larger than the `num_levels` parameter of the constructor. :return: (J, 1)-shaped array containing the repeated eigenvalues,`J is the resulting number of the repeated eigenvalues. """ assert num <= self.num_levels eigenfunctions = self.get_eigenfunctions(num) eigenvalues = self._eigenvalues[:num] multiplicities = eigenfunctions.num_eigenfunctions_per_level repeated_eigenvalues = chain(eigenvalues, multiplicities) return B.reshape(repeated_eigenvalues, -1, 1) # [J, 1]
@property def element_shape(self): """ :return: Sum of the products of the element shapes of the factor spaces. """ return sum(math.prod(space.element_shape) for space in self.factor_spaces) @property def element_dtype(self): """ :return: B.Numeric. """ return B.Numeric