diff --git a/docs/refs.bib b/docs/refs.bib index 005bc0112..a40d1a108 100644 --- a/docs/refs.bib +++ b/docs/refs.bib @@ -31,6 +31,14 @@ @article{baillo+grane_2009_local keywords = {62G08,62G30,Cross-validation,Fourier expansion,Functional data,Kernel regression,Local linear regression,Nonparametric smoothing} } +@book{berlinet+thomas-agnan_2011_reproducing, + title={Reproducing kernel Hilbert spaces in probability and statistics}, + author={Berlinet, Alain and Thomas-Agnan, Christine}, + year={2011}, + publisher={Springer Science \& Business Media} +} + + @article{berrendero++_2016_mrmr, title = {The {{mRMR}} Variable Selection Method: A Comparative Study for Functional Data}, shorttitle = {The {{mRMR}} Variable Selection Method}, @@ -308,6 +316,16 @@ @article{ghosh+chaudhuri_2005_maximum keywords = {Bayes risk,cross-validation,data depth,elliptic symmetry,kernel density estimation,location shift model,Mahalanobis distance,misclassification rate,Vapnik Chervonenkis dimension} } +@article{gutierrez++_1992_numerical, + title={On the numerical expansion of a second order stochastic process}, + author={Guti{\'e}rrez, Ram{\'o}n and Ruiz, Juan Carlos and Valderrama, Mariano J}, + journal={Applied stochastic models and data analysis}, + volume={8}, + number={2}, + pages={67--77}, + year={1992}, + publisher={Wiley Online Library} + @article{hastie++_1995_penalized, title = {Penalized Discriminant Analysis}, author = {Hastie, Trevor and Buja, Andreas and Tibshirani, Robert}, diff --git a/examples/plot_rkhs_inner_product.py b/examples/plot_rkhs_inner_product.py new file mode 100644 index 000000000..310d2d622 --- /dev/null +++ b/examples/plot_rkhs_inner_product.py @@ -0,0 +1,194 @@ +""" +Reproducing Kernel Hilbert Space Inner Product for the Brownian Bridge +======================================================================= + +This example shows how to compute the inner product of two functions in the +reproducing kernel Hilbert space (RKHS) of the Brownian Bridge. +""" + +# Author: Martín Sánchez Signorini +# License: MIT + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +from skfda.misc.rkhs_product import rkhs_inner_product +from skfda.representation import FDataGrid +from skfda.typing._numpy import NDArrayFloat + +############################################################################### +# The kernel corresponding to a Brownian Bridge process +# :footcite:p:`gutierrez++_1992_numerical` in the interval :math:`[0, 1]` is +# +# .. math:: +# k(s, t) = \min(s, t) - st +# +# The RKHS inner product method +# :func:`~skfda.misc.rkhs_product.rkhs_inner_product` requires the kernel to +# be defined as a function of two vector arguments that returns the matrix of +# values of the kernel in the corresponding grid. +# The following function defines the kernel as such a function. + + +def brownian_bridge_covariance( + t: NDArrayFloat, + s: NDArrayFloat, +) -> NDArrayFloat: + """ + Covariance function of the Brownian Bridge process. + + This function must receive two vectors of points while returning the + matrix of values of the covariance function in the corresponding grid. + """ + t_col = t[:, None] + s_row = s[None, :] + return np.minimum(t_col, s_row) - t_col * s_row + + +############################################################################### +# The RKHS of this kernel :footcite:p:`berlinet+thomas-agnan_2011_reproducing` +# is the set of functions +# +# .. math:: +# f: [0, 1] \to \mathbb{R} \quad \text{ such that } +# f \text{ is absolutely continuous, } +# f(0) = f(1) = 0 \text{ and } +# f' \in L^2([0, 1]). +# +# For this example we will be using the following functions in this RKHS: +# +# .. math:: +# \begin{align} +# f(t) &= 1 - (2t - 1)^2 \\ +# g(t) &= \sin(\pi t) +# \end{align} +# +# The following code defines a method to compute the inner product of these +# two functions in the RKHS of the Brownian Bridge, using a variable number of +# points of discretization of the functions. + +def brownian_bridge_rkhs_inner_product( + num_points: int, +) -> float: + """Inner product of two functions in the RKHS of the Brownian Bridge.""" + # Define the functions + # Remove first and last points to avoid a singular covariance matrix + grid_points = np.linspace(0, 1, num_points + 2)[1:-1] + f = FDataGrid( + [1 - (2 * grid_points - 1)**2], + grid_points, + ) + g = FDataGrid( + [np.sin(np.pi * grid_points)], + grid_points, + ) + + # Compute the inner product + return rkhs_inner_product( # type: ignore[no-any-return] + fdata1=f, + fdata2=g, + cov_function=brownian_bridge_covariance, + )[0] + + +# Plot the functions f and g in the same figure +FDataGrid( + np.concatenate( + [ + 1 - (2 * np.linspace(0, 1, 100) - 1)**2, + np.sin(np.pi * np.linspace(0, 1, 100)), + ], + axis=0, + ).reshape(2, 100), + np.linspace(0, 1, 100), +).plot() +plt.show() + +############################################################################### +# The inner product of two functions :math:`f, g` in this RKHS +# :footcite:p:`berlinet+thomas-agnan_2011_reproducing` is +# +# .. math:: +# \langle f, g \rangle = \int_0^1 f'(t) g'(t) dt. +# +# Therefore, the exact value of the product of these two functions in the RKHS +# of the Brownian Bridge can be explicitly calculated. +# First, we have that their derivatives are +# +# .. math:: +# \begin{align} +# f'(t) &= 4(1 - 2t) \\ +# g'(t) &= \pi \cos(\pi t) +# \end{align} +# +# Then, the inner product in :math:`L^2([0, 1])` of these derivatives is +# +# .. math:: +# \begin{align} +# \langle f', g' \rangle &= \int_0^1 f'(t) g'(t) dt \\ +# &= \int_0^1 4(1 - 2t) \pi \cos(\pi t) dt \\ +# &= \frac{16}{\pi}. +# \end{align} +# +# Which is the exact value of their inner product in the RKHS of the Brownian +# Bridge. +# Thus, we measure the difference between the exact value and the value +# computed by the method :func:`~skfda.misc.rkhs_product.rkhs_inner_product` +# for increasing numbers of discretization points. +# In particular, we will be using from 500 to 10000 points with a step of 500. +# +# The following code computes the inner product for each number of points and +# plots the difference between the exact value and the computed value. + +num_points_list = np.arange( + start=500, + stop=10001, + step=500, +) +expected_value = 16 / np.pi + +errors_df = pd.DataFrame( + columns=["Number of points of discretization", "Absolute error"], +) + +for num_points in num_points_list: + computed_value = brownian_bridge_rkhs_inner_product(num_points) + error = np.abs(computed_value - expected_value) + + # Add new row to the dataframe + errors_df.loc[len(errors_df)] = [num_points, error] + +# Plot the errors +errors_df.plot( + x="Number of points of discretization", + y="Absolute error", + title="Absolute error of the inner product", + xlabel="Number of points of discretization", + ylabel="Absolute error", +) +plt.show() + + +############################################################################### +# The following code plots the errors using a logarithmic scale in the y-axis. + +errors_df.plot( + x="Number of points of discretization", + y="Absolute error", + title="Absolute error of the inner product", + xlabel="Number of points of discretization", + ylabel="Absolute error", + logy=True, +) +plt.show() + +############################################################################### +# This example shows the convergence of the method +# :func:`~skfda.misc.rkhs_product.rkhs_inner_product` for the Brownian Bridge +# kernel, while also showing how to apply this method using a custom covariance +# function. + +############################################################################### +# **References:** +# .. footbibliography:: diff --git a/skfda/misc/covariances.py b/skfda/misc/covariances.py index 43635eb02..64aeb44de 100644 --- a/skfda/misc/covariances.py +++ b/skfda/misc/covariances.py @@ -808,6 +808,8 @@ def __call__(self, x: ArrayLike, y: ArrayLike) -> NDArrayFloat: Returns: Covariance function evaluated at the grid formed by x and y. """ + x = _transform_to_2d(x) + y = _transform_to_2d(y) return self.cov_fdata([x, y], grid=True)[0, ..., 0] diff --git a/skfda/misc/rkhs_product.py b/skfda/misc/rkhs_product.py new file mode 100644 index 000000000..ac0074473 --- /dev/null +++ b/skfda/misc/rkhs_product.py @@ -0,0 +1,256 @@ +"""RKHS inner product of two FData objects.""" +from __future__ import annotations + +from typing import Callable, Optional + +import multimethod +import numpy as np +import scipy.linalg + +from ..misc.covariances import EmpiricalBasis +from ..representation import FData, FDataBasis, FDataGrid +from ..representation.basis import Basis, TensorBasis +from ..typing._numpy import NDArrayFloat +from ._math import inner_product +from .validation import check_fdata_dimensions + + +def _broadcast_samples( + fdata1: FData, + fdata2: FData, +) -> tuple[FData, FData]: + """Broadcast samples of two FData objects. + + Args: + fdata1: First FData object. + fdata2: Second FData object. + + Returns: + Tuple of FData objects with the same number of samples. + + Raises: + ValueError: If the number of samples is not the same or if the number + """ + if fdata1.n_samples == 1: + fdata1 = fdata1.repeat(fdata2.n_samples) + elif fdata2.n_samples == 1: + fdata2 = fdata2.repeat(fdata1.n_samples) + elif fdata1.n_samples != fdata2.n_samples: + raise ValueError( + "Invalid number of samples for functional data objects:" + f"{fdata1.n_samples} != {fdata2.n_samples}.", + ) + return fdata1, fdata2 + + +def _get_coeff_matrix( + cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat], + basis1: Basis, + basis2: Basis, +) -> NDArrayFloat: + """Return the matrix of coefficients of a function in a tensor basis. + + Args: + cov_function: Covariance function as a callable. It is expected to + receive two arrays, s and t, and return the corresponding + covariance matrix. + basis1: First basis. + basis2: Second basis. + + Returns: + Matrix of coefficients of the covariance function in the tensor + basis formed by the two given bases. + """ + # In order to use inner_product, the callable must follow the + # same convention as the evaluation on FDataGrids, that is, it + # is expected to receive a single bidimensional point as input + def cov_function_pointwise( # noqa: WPS430 + x: NDArrayFloat, + ) -> NDArrayFloat: + t = np.array([x[0]]) + s = np.array([x[1]]) + return cov_function(t, s)[..., np.newaxis] + + tensor_basis = TensorBasis([basis1, basis2]) + + # Solving the system yields the coefficients of the covariance + # as a vector that is reshaped to form a matrix + return np.linalg.solve( + tensor_basis.gram_matrix(), + inner_product( + cov_function_pointwise, + tensor_basis, + _domain_range=tensor_basis.domain_range, + ).T, + ).reshape(basis1.n_basis, basis2.n_basis) + + +@multimethod.multidispatch +def rkhs_inner_product( + fdata1: FData, + fdata2: FData, + *, + cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat], +) -> NDArrayFloat: + """RKHS inner product of two FData objects. + + Args: + fdata1: First FData object. + fdata2: Second FData object. + cov_function: Covariance function as a callable. + + Returns: + Matrix of the RKHS inner product between paired samples of + fdata1 and fdata2. + """ + raise NotImplementedError( + "RKHS inner product not implemented for the given types.", + ) + + +@rkhs_inner_product.register(FDataGrid, FDataGrid) +def rkhs_inner_product_fdatagrid( + fdata1: FDataGrid, + fdata2: FDataGrid, + *, + cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat], + cond: Optional[float] = None, +) -> NDArrayFloat: + r"""RKHS inner product of two univariate FDataGrid objects. + + This is the most general method for calculating the RKHS inner product + using a discretization of the domain. The RKHS inner product is calculated + as the inner product between the square root inverse of the covariance + operator applied to the functions. + Assuming the existence of such inverse, using the self-adjointness of the + covariance operator, the RKHS inner product can be calculated as: + \langle f, \mathcal{K}^{-1} g \rangle + When discretizing a common domain, this is equivalent to doing the matrix + product between the discretized functions and the inverse of the + covariance matrix. + In case of having different discretization grids, the left inverse of the + transposed covariace matrix is used instead. + + If one of the FDataGrid terms consists of only one sample, it is repeated + to match the number of samples of the other term. + + Args: + fdata1: First FDataGrid object. + fdata2: Second FDataGrid object. + cov_function: Covariance function as a callable. It is expected to + receive two arrays, s and t, and return the corresponding + covariance matrix. + cond: Cutoff for small singular values of the covariance matrix. + Default uses scipy default. + + Returns: + Matrix of the RKHS inner product between paired samples of + fdata1 and fdata2. + """ + # Check univariate and apply broadcasting + check_fdata_dimensions(fdata1, dim_domain=1, dim_codomain=1) + check_fdata_dimensions(fdata2, dim_domain=1, dim_codomain=1) + fdata1, fdata2 = _broadcast_samples(fdata1, fdata2) + + data_matrix_1 = fdata1.data_matrix + data_matrix_2 = fdata2.data_matrix + grid_points_1 = fdata1.grid_points[0] + grid_points_2 = fdata2.grid_points[0] + cov_matrix_1_2 = cov_function(grid_points_1, grid_points_2) + + # Calculate the inverse operator applied to fdata2 + if np.array_equal(grid_points_1, grid_points_2): + inv_fd2_matrix = np.linalg.solve( + cov_matrix_1_2, + data_matrix_2, + ) + else: + inv_fd2_matrix = np.asarray( + scipy.linalg.lstsq( + cov_matrix_1_2.T, + data_matrix_2[..., 0].T, + cond=cond, + )[0], + ).T[..., np.newaxis] + + products = ( + np.transpose(data_matrix_1, axes=(0, 2, 1)) + @ inv_fd2_matrix + ) + # Remove redundant dimensions + return products.reshape(products.shape[0]) + + +@rkhs_inner_product.register(FDataBasis, FDataBasis) +def rkhs_inner_product_fdatabasis( + fdata1: FDataBasis, + fdata2: FDataBasis, + *, + cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat], +) -> NDArrayFloat: + """RKHS inner product of two FDataBasis objects. + + In case of using a basis expression, the RKHS inner product can be + computed obtaining first a basis representation of the covariance + function in the tensor basis. Then, the inverse operator applied to + the second term is calculated. + In case of using a common basis, that step is done by solving the + system given by the inverse of the matrix of coefficients of the + covariance function in the tensor basis formed by the two given bases. + In case of using different bases, the left inverse of the transposed + matrix of coefficients of the covariance function is used instead. + Finally, the inner product between each pair of samples is calculated. + + In case of knowing the matrix of coefficients of the covariance function + in the tensor basis formed by the two given bases, it can be passed as + an argument to avoid the calculation of it. + + If one of the FDataBasis terms consists of only one sample, it is repeated + to match the number of samples of the other term. + + Args: + fdata1: First FDataBasis object. + fdata2: Second FDataBasis object. + cov_function: Covariance function as a callable. It is expected to + receive two arrays, s and t, and return the corresponding + covariance matrix. + + Returns: + Matrix of the RKHS inner product between paired samples of + fdata1 and fdata2. + """ + # Check univariate and apply broadcasting + check_fdata_dimensions(fdata1, dim_domain=1, dim_codomain=1) + check_fdata_dimensions(fdata2, dim_domain=1, dim_codomain=1) + fdata1, fdata2 = _broadcast_samples(fdata1, fdata2) + + if isinstance(cov_function, EmpiricalBasis): + cov_coeff_matrix = cov_function.coeff_matrix + else: + # Express the covariance function in the tensor basis + # NOTE: The alternative is to convert to FDatagrid the two FDataBasis + cov_coeff_matrix = _get_coeff_matrix( + cov_function, + fdata1.basis, + fdata2.basis, + ) + + if fdata1.basis == fdata2.basis: + inv_fd2_coefs = np.linalg.solve( + cov_coeff_matrix, + fdata2.coefficients.T, + ) + else: + inv_fd2_coefs = np.linalg.lstsq( + cov_coeff_matrix.T, + fdata2.coefficients.T, + rcond=None, + )[0] + + # Following einsum is equivalent to doing the matrix multiplication + # and then taking the diagonal of the result + return np.einsum( # type: ignore[no-any-return] + "ij,ji->i", + fdata1.coefficients, + inv_fd2_coefs, + ) diff --git a/skfda/tests/test_rkhs_inner_product.py b/skfda/tests/test_rkhs_inner_product.py new file mode 100644 index 000000000..12bf66503 --- /dev/null +++ b/skfda/tests/test_rkhs_inner_product.py @@ -0,0 +1,221 @@ +"""Test the RKHS inner product method.""" +from typing import Any + +import numpy as np +import pytest + +from skfda._utils import constants +from skfda.misc.rkhs_product import rkhs_inner_product +from skfda.representation import FDataBasis, FDataGrid +from skfda.representation.basis import ( + Basis, + BSplineBasis, + FourierBasis, + TensorBasis, +) +from skfda.typing._numpy import NDArrayFloat + +N_BASIS = 5 +DOMAIN_RANGE = (0, 1) + + +@pytest.fixture( + params=[ + FourierBasis, + BSplineBasis, + ], + ids=[ + "FourierBasis", + "BSplineBasis", + ], +) +def basis( + request: Any, +) -> Any: + """Fixture for classes to test.""" + return request.param( + n_basis=N_BASIS, + domain_range=DOMAIN_RANGE, + ) + + +def test_rkhs_inner_product_grid() -> None: + """Test the RKHS product for specific FDataGrid. + + The tested functions are: + x(t) = t + y(t) = t/3 + 1/2 + K(t,s) = t*s + 1 + + The expected result of the RKHS product is: + _K = 1/3 + """ + grid_points = np.linspace( + *DOMAIN_RANGE, + constants.N_POINTS_FINE_MESH, + ) + x = FDataGrid( + [grid_points], + grid_points, + ) + y = (x / 3) + (1 / 2) + + def cov_function( # noqa: WPS430 + t: NDArrayFloat, + s: NDArrayFloat, + ) -> NDArrayFloat: + return np.outer(t, s) + 1 + + result = rkhs_inner_product( + fdata1=x, + fdata2=y, + cov_function=cov_function, + ) + expected_result = [1 / 3] + np.testing.assert_allclose( + result, + expected_result, + rtol=1e-13, + ) + + # Test the product is symmetric + result = rkhs_inner_product( + fdata1=y, + fdata2=x, + cov_function=cov_function, + ) + np.testing.assert_allclose( + result, + expected_result, + rtol=1e-13, + ) + + +def test_rkhs_inner_product_basis( + basis: Basis, +) -> None: + """Test the RKHS product for specific FDataBasis. + + The tested functions are: + x(t) = sin(2*pi*t) + y(t) = phi_0(t) + phi_2(t) + K(t,s) = Phi(t)^T B Phi(s) + Where Phi = {phi_0, phi_1, phi_2} is the Fourier basis and B is a + specific symmetric definite-positive matrix. + The inverse of the operator applied to y is phi_2(t). + + The expected result of the RKHS product is: + _K = 0 + """ + grid_points = np.linspace( + *DOMAIN_RANGE, + constants.N_POINTS_FINE_MESH, + ) + x = FDataGrid( + [np.sin(2 * np.pi * grid_points)], + grid_points, + ) + + # Define y and cov_function in term of basis_1 + basis_1 = FourierBasis( + n_basis=3, + domain_range=DOMAIN_RANGE, + ) + y = FDataBasis( + basis=basis_1, + coefficients=[[1, 0, 1]], + ) + kernel = FDataBasis( + basis=TensorBasis([basis_1, basis_1]), + coefficients=np.array([ + [3, 1, 1], + [1, 2, 0], + [1, 0, 1], + ]).flatten(), + ) + + def cov_function( # noqa: WPS430 + t: NDArrayFloat, + s: NDArrayFloat, + ) -> NDArrayFloat: + return kernel([s, t], grid=True)[0, ..., 0] + + x_basis = x.to_basis(basis_1) + y_basis = y.to_basis(basis) + + result = rkhs_inner_product( + fdata1=x_basis, + fdata2=y_basis, + cov_function=cov_function, + ) + expected_result = [0] + np.testing.assert_allclose( + result, + expected_result, + atol=1e-4, + ) + + # Test the product is symmetric + result = rkhs_inner_product( + fdata1=y_basis, + fdata2=x_basis, + cov_function=cov_function, + ) + np.testing.assert_allclose( + result, + expected_result, + atol=1e-4, + ) + + +def test_rkhs_inner_product_empirical_covariance( + basis: Basis, +) -> None: + """Test the RKHS product using the empirical covariance. + + This test checks that the function rkhs_inner_product works with a given + matrix of coefficients, that is, the covariance function comes already + expressed in the tensor basis via the empirical covariance matrix. + In this test, we use a known set of coefficients from which we have + computed the empirical covariance matrix and its inner product. + + The coefficients for the dataset are: + [1,2,3,4,5], = x + [1,0,1,1,0], = y + [0,1,0,1,1], + [1,2,1,1,1], + [1,1,1,0,1], + [2,1,3,4,5] + The covariance function is given by its empirical covariance. + The terms to compute the inner product are the first and second functions + of the dataset. + The expected result of the RKHS product is: + _K = 236*5/225 + """ + coefficients = np.array([ + [1, 2, 3, 4, 5], + [1, 0, 1, 1, 0], + [0, 1, 0, 1, 1], + [1, 2, 1, 1, 1], + [1, 1, 1, 0, 1], + [2, 1, 3, 4, 5], + ]) + + fdata = FDataBasis( + basis=basis, + coefficients=coefficients, + ) + x = fdata[0] + y = fdata[1] + + result = rkhs_inner_product( + fdata1=x, + fdata2=y, + cov_function=fdata.cov(), + ) + expected_result = [236 * 5 / 225] + np.testing.assert_allclose( + result, + expected_result, + rtol=1e-13, + )