From c1b95c9e925e30ca196f642a6a39cd785b2da501 Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Thu, 11 Apr 2024 09:04:49 +0200 Subject: [PATCH 1/6] included euler_maruyama function --- docs/modules/datasets.rst | 1 + skfda/datasets/__init__.py | 2 + skfda/datasets/_samples_generators.py | 198 +++++++- skfda/tests/test_euler_maruyama.py | 645 ++++++++++++++++++++++++++ 4 files changed, 844 insertions(+), 2 deletions(-) create mode 100644 skfda/tests/test_euler_maruyama.py diff --git a/docs/modules/datasets.rst b/docs/modules/datasets.rst index f5ed9f8da..b795df85b 100644 --- a/docs/modules/datasets.rst +++ b/docs/modules/datasets.rst @@ -47,6 +47,7 @@ The following functions are used to make synthetic functional datasets: .. autosummary:: :toctree: autosummary + skfda.datasets.euler_maruyama skfda.datasets.make_gaussian skfda.datasets.make_gaussian_process skfda.datasets.make_sinusoidal_process diff --git a/skfda/datasets/__init__.py b/skfda/datasets/__init__.py index 8abd0dfda..01f53197e 100644 --- a/skfda/datasets/__init__.py +++ b/skfda/datasets/__init__.py @@ -23,6 +23,7 @@ "fetch_bone_density", ], "_samples_generators": [ + "euler_maruyama", "make_gaussian", "make_gaussian_process", "make_multimodal_landmarks", @@ -51,6 +52,7 @@ fetch_weather as fetch_weather, ) from ._samples_generators import ( + euler_maruyama as euler_maruyama, make_gaussian as make_gaussian, make_gaussian_process as make_gaussian_process, make_multimodal_landmarks as make_multimodal_landmarks, diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index 65c13e6c1..f22739b2b 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -1,11 +1,12 @@ from __future__ import annotations import itertools -from typing import Callable, Sequence, Union +from typing import Callable, Sequence, Union, Any import numpy as np import scipy.integrate from scipy.stats import multivariate_normal +from typing_extensions import Protocol from .._utils import _cartesian_product, _to_grid_points, normalize_warping from ..misc.covariances import Brownian, CovarianceLike, _execute_covariance @@ -13,7 +14,7 @@ from ..representation import FDataGrid from ..representation.interpolation import SplineInterpolation from ..typing._base import DomainRangeLike, GridPointsLike, RandomStateLike -from ..typing._numpy import NDArrayFloat +from ..typing._numpy import NDArrayFloat, ArrayLike MeanCallable = Callable[[np.ndarray], np.ndarray] CovarianceCallable = Callable[[np.ndarray, np.ndarray], np.ndarray] @@ -21,6 +22,199 @@ MeanLike = Union[float, NDArrayFloat, MeanCallable] +class InitialValueGenerator(Protocol): + def __call__( + self, + size: int, + random_state: RandomStateLike + ) -> np.typing.NDArray[np.floating[Any]]: + ... + + +def euler_maruyama( + initial_condition: ArrayLike | InitialValueGenerator, + n_grid_points: int = 100, + drift: Callable[[float, np.typing.NDArray[np.floating[Any]]], + np.typing.NDArray[np.floating[Any]]] | None = None, + diffusion: Callable[[float, np.typing.NDArray[np.floating[Any]]], + np.typing.NDArray[np.floating[Any]]] | None = None, + n_samples: int | None = None, + start: float = 0.0, + stop: float = 1.0, + dim_noise: int | None = None, + random_state: RandomStateLike = None, +) -> FDataGrid: + r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. + + .. math:: + + d X(t) = a(t, X(t)) d t + b(t,X(t)) d W(t). + + Args: + initial_condition: Initial condition of the SDE. It can have one of + three formats: + + - An starting initial value from which to + calculate *n_samples* trajectories. + + - An array of initial values. For each starting + point a trajectory will be calculated. + + - A function that generates random numbers or + vectors. It should have two parameters called + size and random_state and it should return an array. + + n_grid_points: The total number of points of evaluation. + drift: drift coefficient (:math:`a(t,X_t)` in the equation). + diffusion: diffusion coefficient (:math:`b(t,X_t)` in the equation). + n_samples: Number of trajectories integrated. + start: Starting time of the trajectories. + stop: Ending time of the trajectories. + dim_noise: dimension of the noise factor. By default is the data + dimension. + random_state: Random state. + + + Returns: + :class:`FDataGrid` object comprising all the trajectories. + + See also: + :func:`make_gaussian_process`: Simpler function for generating + Gaussian processes. + + Examples: + Example of the use of euler_maruyama for an SDE with constant + drift and diffusion and an initial condition which follows a + standard normal distribution. + + >>> from scipy.stats import norm + >>> def example_drift(t: float, x: np.ndarray) -> np.ndarray: + ... return 2 + >>> def example_diffusion(t: float, x: np.ndarray) -> np.ndarray: + ... return 0.5 + >>> initial_condition = norm().rvs + >>> + >>> trajectories = euler_maruyama( + ... initial_condition=initial_condition, + ... n_samples=10, + ... drift=example_drift, + ... diffusion=example_diffusion, + ... ) + >>> trajectories.show() + + """ + random_state = validate_random_state(random_state) + + if n_samples is None: + if ( + not isinstance(initial_condition, np.ndarray) + and not isinstance(initial_condition, list) + and not isinstance(initial_condition, float) + and not isinstance(initial_condition, int) + ): + raise ValueError( + "Invalid initial conditions. If a function is given, the \ + n_samples argument must be included." + ) + + initial_values = np.atleast_1d(initial_condition) + n_samples = len(initial_values) + else: + if callable(initial_condition): + initial_values = initial_condition( + size=n_samples, + random_state=random_state + ) + else: + initial_condition = np.atleast_1d(initial_condition) + dim_codomain = len(initial_condition) + initial_values = initial_condition * \ + np.ones((n_samples, dim_codomain)) + + if initial_values.ndim == 1: + initial_values = initial_values[:, np.newaxis] + elif initial_values.ndim > 2: + raise ValueError( + "Invalid initial conditions. Each of the starting points\ + must be a flat array." + ) + (n_samples, dim_codomain) = initial_values.shape + + if dim_noise is None: + dim_noise = dim_codomain + + def default_drift( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array(0) + + def default_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.eye(dim_codomain, dim_noise) + + if drift is None: + drift = default_drift + + if diffusion is None: + diffusion = default_diffusion + + test_diffusion_dim = np.atleast_1d(diffusion(start, initial_values)) + + if test_diffusion_dim.ndim == 1: + if (len(test_diffusion_dim) != dim_codomain + and len(test_diffusion_dim) != 1): + raise ValueError( + f"The diffusion function returns the wrong dimensions. \ + The expected dimension is {dim_codomain} or 1." + ) + formatting_matrix = np.eye(dim_codomain, dim_noise) + + elif test_diffusion_dim.shape[-2:] != (dim_codomain, dim_noise): + raise ValueError( + f"The diffusion function returns the wrong dimensions. \ + The expected dimensiones are {dim_codomain} x {dim_noise}." + ) + + else: + formatting_matrix = np.ones((dim_codomain, dim_noise)) + + def formatted_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return diffusion(t, x) * formatting_matrix + + data_matrix = np.zeros((n_samples, n_grid_points, dim_codomain)) + times = np.linspace(start, stop, n_grid_points) + step_size = times[1] - times[0] + noise = random_state.standard_normal( + size=(n_samples, n_grid_points - 1, dim_noise) + ) + data_matrix[:, 0] = initial_values + + for n in range(n_grid_points - 1): + t_n = times[n] + x_n = data_matrix[:, n] + + data_matrix[:, n + 1] = ( + x_n + + step_size * drift(t_n, x_n) + + np.einsum( + '...dj, ...j -> ...d', + formatted_diffusion(t_n, x_n), + noise[:, n] + ) * np.sqrt(step_size) + ) + + return FDataGrid( + grid_points=times, + data_matrix=data_matrix, + ) + + def make_gaussian( n_samples: int = 100, *, diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py new file mode 100644 index 000000000..9b673ffcf --- /dev/null +++ b/skfda/tests/test_euler_maruyama.py @@ -0,0 +1,645 @@ +"""Tests of Euler Maruyama""" + +import numpy as np + +from skfda.datasets import euler_maruyama +from typing import Any +from scipy.stats import norm, multivariate_normal + + +def test_one_initial_point() -> None: + """Case 1 -> One initial point + n_samples > 0 + Tests: + - initial_condition = 0, n_samples = 2 + - initial_condition = [0], n_samples = 2. + # The result should be the same as the first one + - initial_condition = np.array([0, 0]), n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + # Case 1 Starting point = float + n_samples + initial_float = 0 + + expected_result = np.array([[[0.], [0.81217268], + [0.50629448], [0.2422086], + [-0.29427571]], + [[0.], [0.43270381], + [-0.71806553], [0.15434035], + [-0.2262631]]]) + + fd = euler_maruyama( + initial_float, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 2 Starting point = float in an array format + n_samples + # The result must be the same as in Case 1 + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_float_in_list = [0] + + expected_result = np.array([[[0.], [0.81217268], + [0.50629448], [0.2422086], + [-0.29427571]], + [[0.], [0.43270381], + [-0.71806553], [0.15434035], + [-0.2262631]]]) + + fd = euler_maruyama( + initial_float_in_list, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 3 Starting point = array + n_samples + # random_state = np.random.RandomState(1) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_array = np.array([0, 0]) + + expected_result = np.array([[[0., 0.], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532]], + [[0., 0.], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835]]]) + + fd = euler_maruyama( + initial_array, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_initial_data_generator() -> None: + """Case 2 -> A function that generates random variables or vectors + + n_samples > 0 + Tests: + - initial_condition = random_state.standard_normal, + n_samples = 2. + - initial_condition = random_state.standard_normal_2d, + n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + + # Case 1 Starting generator of floats + n_samples + # standard_normal_generator = random_state.standard_normal + random_state = np.random.RandomState(1) + + def standard_normal_generator( + size: int, + random_state: np.random.RandomState + ) -> np.typing.NDArray[np.floating[Any]]: + return ( + random_state.standard_normal(size) + ) + + expected_result = np.array([[[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964]], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962]]]) + + fd = euler_maruyama( + standard_normal_generator, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 2 Starting generator of vecotrs + n_samples + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def standard_normal_generator_2d( + size: int, + random_state: np.random.RandomState + ) -> np.typing.NDArray[np.floating[Any]]: + return random_state.standard_normal((size, 2)) + + expected_result = np.array([[[1.62434536, -0.61175641], + [2.05704918, -1.76252576], + [2.92945506, -2.14312921], + [3.08897461, -2.2678144], + [3.82002858, -3.29788476]], + [[-0.52817175, -1.07296862], + [-0.68938035, -1.2649958], + [-0.12249563, -1.81494143], + [-0.20870974, -2.25387064], + [-0.18760286, -1.96246304]]]) + + fd = euler_maruyama( + standard_normal_generator_2d, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_initial_distribution() -> None: + """Case 3 -> A scipy.stats distribution that generates data with the + rvs method + Tests: + - initial_condition = scipy.stats.norm, n_samples = 2 + - initial_condition = scipy.stats.multivariate_norm, + n_samples = 2. + """ + # random_state = np.random.RandomState(1) + n_samples = 2 + n_grid_points = 5 + + # Case 1 Starting 1-d distribution + n_samples + # The result should be the same as with the generator function + random_state = np.random.RandomState(1) + normal_distribution = norm().rvs + + expected_result = np.array([[[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964]], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962]]]) + + fd = euler_maruyama( + normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 2 Starting n-d distribution + n_samples + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + mean = np.array([0, 1]) + covarianze_matrix = np.array([[1, 0], [0, 1]]) + + multivariate_normal_distribution = ( + multivariate_normal(mean, covarianze_matrix).rvs + ) + + expected_result = np.array([[[1.62434536, 0.38824359], + [2.05704918, -0.76252576], + [2.92945506, -1.14312921], + [3.08897461, -1.2678144], + [3.82002858, -2.29788476]], + [[-0.52817175, -0.07296862], + [-0.68938035, -0.2649958], + [-0.12249563, -0.81494143], + [-0.20870974, -1.25387064], + [-0.18760286, -0.96246304]]]) + + fd = euler_maruyama( + multivariate_normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_vector_initial_points() -> None: + """Caso 4 -> A vector of initial points + Tests: + - initial_condition = 0 + - initial_condition = np.array([0, 1]) + - initial_condition = np.array([[0, 1], + [2, 0]]) + """ + n_grid_points = 5 + + # Case 1. One Starting 1d point = float + initial_float = 0 + random_state = np.random.RandomState(1) + expected_result = np.array([[[0.], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571]]]) + + fd = euler_maruyama( + initial_float, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 2. Two Starting 1d points + n_grid_points = 5 + initial_array = np.array([0, 1]) + random_state = np.random.RandomState(1) + expected_result = np.array([[[0.], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571]], + [[1.], + [1.43270381], + [0.28193447], + [1.15434035], + [0.7737369]]]) + + fd = euler_maruyama( + initial_array, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # Case 3. Starting vector + n_grid_points = 5 + initial_array = np.array([[0, 1], [2, 0]]) + random_state = np.random.RandomState(1) + expected_result = np.array([[[0., 1.], + [0.81217268, 0.69412179], + [0.54808681, 0.15763748], + [0.98079062, -0.99313187], + [1.8531965, -1.37373532]], + [[2., 0.], + [2.15951955, -0.12468519], + [2.89057352, -1.15475554], + [2.72936491, -1.34678272], + [3.29624964, -1.89672835]]]) + + fd = euler_maruyama( + initial_array, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_drift_cases() -> None: + """Test for different drift inputs""" + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def base_drift( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> float: + return 0 + + def base_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> float: + return 1 + + expected_result = np.array([[[0., 0.], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532]], + [[0., 0.], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835]]]) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=base_drift, + diffusion=base_diffusion, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def vector_drift( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array([0, 0]) + + expected_result = np.array([[[0., 0.], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532]], + [[0., 0.], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835]]]) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=vector_drift, + diffusion=base_diffusion, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_diffusion_cases() -> None: + """Test for different diffusion inputs""" + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def base_drift( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> float: + return 0 + + def vector_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array([1, 2]) + + # The result should be the same as the previous test, but with + # the second column doubled + expected_result = np.array([[[0., 0.], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532]], + [[0., 0.], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835]]]) + expected_result[:, :, 1] *= 2 + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=base_drift, + diffusion=vector_diffusion, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + # The expected result should be the same than in test 2 of case 1 of + # initial_conditions tests but adding a second column that is the + # first doubled + + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def matrix_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array([[1], [2]]) + + expected_result = np.array([[[0.], [0.81217268], + [0.50629448], [0.2422086], + [-0.29427571]], + [[0.], [0.43270381], + [-0.71806553], [0.15434035], + [-0.2262631]]]) + expected_result = np.concatenate( + (expected_result, 2 * expected_result), + axis=2 + ) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=base_drift, + diffusion=matrix_diffusion, + random_state=random_state, + dim_noise=1 + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) + + +def test_grid_points() -> None: + """Test for checking that the grid_points are correct""" + random_state = np.random.RandomState(1) + initial_condition = np.array([0, 1]) + start = 0 + stop = 10 + n_grid_points = 105 + expected_grid_points = np.atleast_2d( + np.linspace(start, stop, n_grid_points) + ) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=10, + start=start, + stop=stop, + random_state=random_state + ) + + np.testing.assert_array_equal( + fd.grid_points, + expected_grid_points + ) + + +def test_initial_condition_negative_cases() -> None: + """Test for checking initial_condition related error cases""" + random_state = np.random.RandomState(1) + + # Case 1 Starting vector of points and n_samples + initial_condition = np.zeros((2, 2)) + n_samples = 3 + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + n_samples=n_samples, + random_state=random_state, + ) + + # Case 2: Inital condition is an array of more than 2d + initial_condition = np.zeros((1, 1, 1)) + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + random_state=random_state, + ) + + # Case 3: generator function without n_samples + initial_generator = random_state.standard_normal + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_generator, + random_state=random_state, + ) + + # Case 4: n_samples not greater than 0 + + with np.testing.assert_raises(ValueError): + euler_maruyama( + 0, + n_samples=-1, + random_state=random_state, + ) + + +def test_diffusion_negative_cases() -> None: + """Test for checking diffusion related error cases""" + initial_condition = np.array([0, 0]) + n_samples = 2 + random_state = np.random.RandomState(1) + + def bad_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array([[1, 2]]) + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + n_samples=n_samples, + diffusion=bad_diffusion, + random_state=random_state, + ) + + def vector_diffusion( + t: float, + x: np.typing.NDArray[np.floating[Any]] + ) -> np.typing.NDArray[np.floating[Any]]: + return np.array([1, 2]) + + initial_condition = np.array([0, 0, 0]) + # random_state = np.random.RandomState(1) + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + n_samples=n_samples, + diffusion=vector_diffusion, + random_state=random_state, + ) + + +def test_random_generator() -> None: + n_samples = 2 + n_grid_points = 5 + random_state = np.random.default_rng(seed=1) + normal_distribution = norm().rvs + + expected_result = np.array([[[0.34558419], + [0.51080273], + [-0.14077589], + [0.31190205], + [0.53508933]], + + [[0.82161814], + [0.55314153], + [0.84370058], + [1.02598678], + [1.17305302]]]) + + fd = euler_maruyama( + normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result + ) From 492a7756cd18d41685196b0ca31ff70a8577b935 Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Fri, 12 Apr 2024 13:52:03 +0200 Subject: [PATCH 2/6] Fixed style errors --- skfda/datasets/_samples_generators.py | 94 +++-- skfda/tests/test_euler_maruyama.py | 472 +++++++++++++++----------- 2 files changed, 312 insertions(+), 254 deletions(-) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index f22739b2b..a1f74c055 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import Callable, Sequence, Union, Any +from typing import Any, Callable, Sequence, Union import numpy as np import scipy.integrate @@ -14,7 +14,7 @@ from ..representation import FDataGrid from ..representation.interpolation import SplineInterpolation from ..typing._base import DomainRangeLike, GridPointsLike, RandomStateLike -from ..typing._numpy import NDArrayFloat, ArrayLike +from ..typing._numpy import ArrayLike, NDArrayFloat MeanCallable = Callable[[np.ndarray], np.ndarray] CovarianceCallable = Callable[[np.ndarray, np.ndarray], np.ndarray] @@ -23,11 +23,14 @@ class InitialValueGenerator(Protocol): + """Class to represent SDE initial value generators.""" + def __call__( self, size: int, - random_state: RandomStateLike + random_state: RandomStateLike, ) -> np.typing.NDArray[np.floating[Any]]: + """Interface of initial value generator.""" ... @@ -39,34 +42,28 @@ def euler_maruyama( diffusion: Callable[[float, np.typing.NDArray[np.floating[Any]]], np.typing.NDArray[np.floating[Any]]] | None = None, n_samples: int | None = None, - start: float = 0.0, + start: float = 0, stop: float = 1.0, dim_noise: int | None = None, random_state: RandomStateLike = None, ) -> FDataGrid: - r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. + """Numerical integration of an Itô SDE using the Euler-Maruyana scheme. .. math:: - d X(t) = a(t, X(t)) d t + b(t,X(t)) d W(t). + d X(t) = f(t, X(t)) d t + g(t,X(t)) d W(t). Args: initial_condition: Initial condition of the SDE. It can have one of - three formats: - - - An starting initial value from which to - calculate *n_samples* trajectories. - - - An array of initial values. For each starting - point a trajectory will be calculated. - - - A function that generates random numbers or - vectors. It should have two parameters called - size and random_state and it should return an array. - + three formats: An starting initial value from which to + calculate *n_samples* trajectories. An array of initial values. + For each starting point a trajectory will be calculated. A + function that generates random numbers or vectors. It should + have two parameters called size and random_state and it should + return an array. n_grid_points: The total number of points of evaluation. - drift: drift coefficient (:math:`a(t,X_t)` in the equation). - diffusion: diffusion coefficient (:math:`b(t,X_t)` in the equation). + drift: drift coefficient (:math:`f(t,X_t)` in the equation). + diffusion: diffusion coefficient (:math:`g(t,X_t)` in the equation). n_samples: Number of trajectories integrated. start: Starting time of the trajectories. stop: Ending time of the trajectories. @@ -100,21 +97,15 @@ def euler_maruyama( ... drift=example_drift, ... diffusion=example_diffusion, ... ) - >>> trajectories.show() """ random_state = validate_random_state(random_state) if n_samples is None: - if ( - not isinstance(initial_condition, np.ndarray) - and not isinstance(initial_condition, list) - and not isinstance(initial_condition, float) - and not isinstance(initial_condition, int) - ): + if callable(initial_condition): raise ValueError( - "Invalid initial conditions. If a function is given, the \ - n_samples argument must be included." + "Invalid initial conditions. If a function is given, the " + "n_samples argument must be included.", ) initial_values = np.atleast_1d(initial_condition) @@ -123,20 +114,22 @@ def euler_maruyama( if callable(initial_condition): initial_values = initial_condition( size=n_samples, - random_state=random_state + random_state=random_state, ) else: initial_condition = np.atleast_1d(initial_condition) dim_codomain = len(initial_condition) - initial_values = initial_condition * \ - np.ones((n_samples, dim_codomain)) + initial_values = ( + initial_condition + * np.ones((n_samples, dim_codomain)) + ) if initial_values.ndim == 1: initial_values = initial_values[:, np.newaxis] elif initial_values.ndim > 2: raise ValueError( - "Invalid initial conditions. Each of the starting points\ - must be a flat array." + "Invalid initial conditions. Each of the starting points " + "must be a flat array.", ) (n_samples, dim_codomain) = initial_values.shape @@ -145,13 +138,13 @@ def euler_maruyama( def default_drift( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array(0) def default_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.eye(dim_codomain, dim_noise) @@ -164,26 +157,25 @@ def default_diffusion( test_diffusion_dim = np.atleast_1d(diffusion(start, initial_values)) if test_diffusion_dim.ndim == 1: - if (len(test_diffusion_dim) != dim_codomain - and len(test_diffusion_dim) != 1): + if len(test_diffusion_dim) not in {dim_codomain, 1}: raise ValueError( - f"The diffusion function returns the wrong dimensions. \ - The expected dimension is {dim_codomain} or 1." + f"The diffusion function returns the wrong dimensions." + f"The expected dimension is {dim_codomain} or 1.", ) formatting_matrix = np.eye(dim_codomain, dim_noise) - elif test_diffusion_dim.shape[-2:] != (dim_codomain, dim_noise): - raise ValueError( - f"The diffusion function returns the wrong dimensions. \ - The expected dimensiones are {dim_codomain} x {dim_noise}." - ) + elif test_diffusion_dim.shape[-2:] == (dim_codomain, dim_noise): + formatting_matrix = np.ones((dim_codomain, dim_noise)) else: - formatting_matrix = np.ones((dim_codomain, dim_noise)) + raise ValueError( + f"The diffusion function returns the wrong dimensions. " + f"The expected dimensiones are {dim_codomain} x {dim_noise}.", + ) def formatted_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return diffusion(t, x) * formatting_matrix @@ -191,7 +183,7 @@ def formatted_diffusion( times = np.linspace(start, stop, n_grid_points) step_size = times[1] - times[0] noise = random_state.standard_normal( - size=(n_samples, n_grid_points - 1, dim_noise) + size=(n_samples, n_grid_points - 1, dim_noise), ) data_matrix[:, 0] = initial_values @@ -205,8 +197,8 @@ def formatted_diffusion( + np.einsum( '...dj, ...j -> ...d', formatted_diffusion(t_n, x_n), - noise[:, n] - ) * np.sqrt(step_size) + noise[:, n], + ) * np.sqrt(step_size), ) return FDataGrid( @@ -675,7 +667,7 @@ def make_random_warping( np.square(v, out=v) # Creation of FDataGrid in the corresponding domain - data_matrix = scipy.integrate.cumulative_trapezoid( + data_matrix = scipy.integrate.cumtrapz( v, dx=1 / n_features, initial=0, diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py index 9b673ffcf..439554e41 100644 --- a/skfda/tests/test_euler_maruyama.py +++ b/skfda/tests/test_euler_maruyama.py @@ -1,19 +1,22 @@ -"""Tests of Euler Maruyama""" +"""Tests of Euler Maruyama.""" + +from typing import Any import numpy as np +from scipy.stats import multivariate_normal, norm from skfda.datasets import euler_maruyama -from typing import Any -from scipy.stats import norm, multivariate_normal def test_one_initial_point() -> None: - """Case 1 -> One initial point + n_samples > 0 - Tests: - - initial_condition = 0, n_samples = 2 - - initial_condition = [0], n_samples = 2. - # The result should be the same as the first one - - initial_condition = np.array([0, 0]), n_samples = 2. + """Case 1 -> One initial point + n_samples > 0. + + Tests: + - initial_condition = 0, n_samples = 2 + - initial_condition = [0], n_samples = 2. + # The result should be the same as the first one + - initial_condition = np.array([0, 0]), n_samples = 2. + """ n_samples = 2 n_grid_points = 5 @@ -21,12 +24,20 @@ def test_one_initial_point() -> None: # Case 1 Starting point = float + n_samples initial_float = 0 - expected_result = np.array([[[0.], [0.81217268], - [0.50629448], [0.2422086], - [-0.29427571]], - [[0.], [0.43270381], - [-0.71806553], [0.15434035], - [-0.2262631]]]) + expected_result = np.array([[ + [0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) fd = euler_maruyama( initial_float, @@ -37,7 +48,7 @@ def test_one_initial_point() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 2 Starting point = float in an array format + n_samples @@ -47,13 +58,6 @@ def test_one_initial_point() -> None: random_state = np.random.RandomState(1) initial_float_in_list = [0] - expected_result = np.array([[[0.], [0.81217268], - [0.50629448], [0.2422086], - [-0.29427571]], - [[0.], [0.43270381], - [-0.71806553], [0.15434035], - [-0.2262631]]]) - fd = euler_maruyama( initial_float_in_list, n_samples=n_samples, @@ -63,26 +67,30 @@ def test_one_initial_point() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 3 Starting point = array + n_samples - # random_state = np.random.RandomState(1) + n_samples = 2 n_grid_points = 5 random_state = np.random.RandomState(1) initial_array = np.array([0, 0]) - expected_result = np.array([[[0., 0.], - [0.81217268, -0.30587821], - [0.54808681, -0.84236252], - [0.98079062, -1.99313187], - [1.8531965, -2.37373532]], - [[0., 0.], - [0.15951955, -0.12468519], - [0.89057352, -1.15475554], - [0.72936491, -1.34678272], - [1.29624964, -1.89672835]]]) + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) fd = euler_maruyama( initial_array, @@ -93,44 +101,50 @@ def test_one_initial_point() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_initial_data_generator() -> None: - """Case 2 -> A function that generates random variables or vectors + - n_samples > 0 - Tests: - - initial_condition = random_state.standard_normal, - n_samples = 2. - - initial_condition = random_state.standard_normal_2d, - n_samples = 2. + """. + + Case 2 -> A function that generates random variables or vectors + + n_samples > 0. + + Tests: + - initial_condition = random_state.standard_normal, + n_samples = 2. + - initial_condition = random_state.standard_normal_2d, + n_samples = 2. """ n_samples = 2 n_grid_points = 5 # Case 1 Starting generator of floats + n_samples - # standard_normal_generator = random_state.standard_normal random_state = np.random.RandomState(1) def standard_normal_generator( size: int, - random_state: np.random.RandomState + random_state: np.random.RandomState, ) -> np.typing.NDArray[np.floating[Any]]: return ( random_state.standard_normal(size) ) - expected_result = np.array([[[1.62434536], - [1.36025949], - [0.82377518], - [1.25647899], - [0.10570964]], - [[-0.61175641], - [0.26064947], - [-0.11995398], - [0.03956557], - [-0.08511962]]]) + expected_result = np.array([ + [[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964], + ], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962], + ], + ]) fd = euler_maruyama( standard_normal_generator, @@ -141,7 +155,7 @@ def standard_normal_generator( np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 2 Starting generator of vecotrs + n_samples @@ -151,20 +165,24 @@ def standard_normal_generator( def standard_normal_generator_2d( size: int, - random_state: np.random.RandomState + random_state: np.random.RandomState, ) -> np.typing.NDArray[np.floating[Any]]: return random_state.standard_normal((size, 2)) - expected_result = np.array([[[1.62434536, -0.61175641], - [2.05704918, -1.76252576], - [2.92945506, -2.14312921], - [3.08897461, -2.2678144], - [3.82002858, -3.29788476]], - [[-0.52817175, -1.07296862], - [-0.68938035, -1.2649958], - [-0.12249563, -1.81494143], - [-0.20870974, -2.25387064], - [-0.18760286, -1.96246304]]]) + expected_result = np.array([ + [[1.62434536, -0.61175641], + [2.05704918, -1.76252576], + [2.92945506, -2.14312921], + [3.08897461, -2.2678144], + [3.82002858, -3.29788476], + ], + [[-0.52817175, -1.07296862], + [-0.68938035, -1.2649958], + [-0.12249563, -1.81494143], + [-0.20870974, -2.25387064], + [-0.18760286, -1.96246304], + ], + ]) fd = euler_maruyama( standard_normal_generator_2d, @@ -175,19 +193,20 @@ def standard_normal_generator_2d( np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_initial_distribution() -> None: - """Case 3 -> A scipy.stats distribution that generates data with the - rvs method - Tests: - - initial_condition = scipy.stats.norm, n_samples = 2 - - initial_condition = scipy.stats.multivariate_norm, - n_samples = 2. + """. + + Case 3 -> A scipy.stats distribution that generates data with the + rvs method. + Tests: + - initial_condition = scipy.stats.norm().rvs, n_samples = 2 + - initial_condition = scipy.stats.multivariate_norm().rvs, + n_samples = 2. """ - # random_state = np.random.RandomState(1) n_samples = 2 n_grid_points = 5 @@ -196,16 +215,20 @@ def test_initial_distribution() -> None: random_state = np.random.RandomState(1) normal_distribution = norm().rvs - expected_result = np.array([[[1.62434536], - [1.36025949], - [0.82377518], - [1.25647899], - [0.10570964]], - [[-0.61175641], - [0.26064947], - [-0.11995398], - [0.03956557], - [-0.08511962]]]) + expected_result = np.array([ + [[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964], + ], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962], + ], + ]) fd = euler_maruyama( normal_distribution, @@ -216,7 +239,7 @@ def test_initial_distribution() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 2 Starting n-d distribution + n_samples @@ -230,16 +253,20 @@ def test_initial_distribution() -> None: multivariate_normal(mean, covarianze_matrix).rvs ) - expected_result = np.array([[[1.62434536, 0.38824359], - [2.05704918, -0.76252576], - [2.92945506, -1.14312921], - [3.08897461, -1.2678144], - [3.82002858, -2.29788476]], - [[-0.52817175, -0.07296862], - [-0.68938035, -0.2649958], - [-0.12249563, -0.81494143], - [-0.20870974, -1.25387064], - [-0.18760286, -0.96246304]]]) + expected_result = np.array([ + [[1.62434536, 0.38824359], + [2.05704918, -0.76252576], + [2.92945506, -1.14312921], + [3.08897461, -1.2678144], + [3.82002858, -2.29788476], + ], + [[-0.52817175, -0.07296862], + [-0.68938035, -0.2649958], + [-0.12249563, -0.81494143], + [-0.20870974, -1.25387064], + [-0.18760286, -0.96246304], + ], + ]) fd = euler_maruyama( multivariate_normal_distribution, @@ -250,28 +277,32 @@ def test_initial_distribution() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_vector_initial_points() -> None: - """Caso 4 -> A vector of initial points - Tests: - - initial_condition = 0 - - initial_condition = np.array([0, 1]) - - initial_condition = np.array([[0, 1], - [2, 0]]) + """Caso 4 -> A vector of initial points. + + Tests: + - initial_condition = 0 + - initial_condition = np.array([0, 1]) + - initial_condition = np.array([[0, 1], + [2, 0]]) """ n_grid_points = 5 # Case 1. One Starting 1d point = float initial_float = 0 random_state = np.random.RandomState(1) - expected_result = np.array([[[0.], - [0.81217268], - [0.50629448], - [0.2422086], - [-0.29427571]]]) + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + ]) fd = euler_maruyama( initial_float, @@ -281,23 +312,27 @@ def test_vector_initial_points() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 2. Two Starting 1d points n_grid_points = 5 initial_array = np.array([0, 1]) random_state = np.random.RandomState(1) - expected_result = np.array([[[0.], - [0.81217268], - [0.50629448], - [0.2422086], - [-0.29427571]], - [[1.], - [1.43270381], - [0.28193447], - [1.15434035], - [0.7737369]]]) + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[1], + [1.43270381], + [0.28193447], + [1.15434035], + [0.7737369], + ], + ]) fd = euler_maruyama( initial_array, @@ -307,23 +342,27 @@ def test_vector_initial_points() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # Case 3. Starting vector n_grid_points = 5 initial_array = np.array([[0, 1], [2, 0]]) random_state = np.random.RandomState(1) - expected_result = np.array([[[0., 1.], - [0.81217268, 0.69412179], - [0.54808681, 0.15763748], - [0.98079062, -0.99313187], - [1.8531965, -1.37373532]], - [[2., 0.], - [2.15951955, -0.12468519], - [2.89057352, -1.15475554], - [2.72936491, -1.34678272], - [3.29624964, -1.89672835]]]) + expected_result = np.array([ + [[0, 1], + [0.81217268, 0.69412179], + [0.54808681, 0.15763748], + [0.98079062, -0.99313187], + [1.8531965, -1.37373532], + ], + [[2, 0], + [2.15951955, -0.12468519], + [2.89057352, -1.15475554], + [2.72936491, -1.34678272], + [3.29624964, -1.89672835], + ], + ]) fd = euler_maruyama( initial_array, @@ -333,12 +372,12 @@ def test_vector_initial_points() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_drift_cases() -> None: - """Test for different drift inputs""" + """Test for different drift inputs.""" initial_condition = np.array([0, 0]) n_samples = 2 n_grid_points = 5 @@ -346,26 +385,30 @@ def test_drift_cases() -> None: def base_drift( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> float: return 0 def base_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> float: return 1 - expected_result = np.array([[[0., 0.], - [0.81217268, -0.30587821], - [0.54808681, -0.84236252], - [0.98079062, -1.99313187], - [1.8531965, -2.37373532]], - [[0., 0.], - [0.15951955, -0.12468519], - [0.89057352, -1.15475554], - [0.72936491, -1.34678272], - [1.29624964, -1.89672835]]]) + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) fd = euler_maruyama( initial_condition, @@ -378,7 +421,7 @@ def base_diffusion( np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) initial_condition = np.array([0, 0]) @@ -388,20 +431,24 @@ def base_diffusion( def vector_drift( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array([0, 0]) - expected_result = np.array([[[0., 0.], - [0.81217268, -0.30587821], - [0.54808681, -0.84236252], - [0.98079062, -1.99313187], - [1.8531965, -2.37373532]], - [[0., 0.], - [0.15951955, -0.12468519], - [0.89057352, -1.15475554], - [0.72936491, -1.34678272], - [1.29624964, -1.89672835]]]) + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) fd = euler_maruyama( initial_condition, @@ -414,12 +461,12 @@ def vector_drift( np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_diffusion_cases() -> None: - """Test for different diffusion inputs""" + """Test for different diffusion inputs.""" initial_condition = np.array([0, 0]) n_samples = 2 n_grid_points = 5 @@ -427,28 +474,32 @@ def test_diffusion_cases() -> None: def base_drift( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> float: return 0 def vector_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array([1, 2]) # The result should be the same as the previous test, but with # the second column doubled - expected_result = np.array([[[0., 0.], - [0.81217268, -0.30587821], - [0.54808681, -0.84236252], - [0.98079062, -1.99313187], - [1.8531965, -2.37373532]], - [[0., 0.], - [0.15951955, -0.12468519], - [0.89057352, -1.15475554], - [0.72936491, -1.34678272], - [1.29624964, -1.89672835]]]) + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) expected_result[:, :, 1] *= 2 fd = euler_maruyama( @@ -462,7 +513,7 @@ def vector_diffusion( np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) # The expected result should be the same than in test 2 of case 1 of @@ -476,19 +527,30 @@ def vector_diffusion( def matrix_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array([[1], [2]]) - expected_result = np.array([[[0.], [0.81217268], - [0.50629448], [0.2422086], - [-0.29427571]], - [[0.], [0.43270381], - [-0.71806553], [0.15434035], - [-0.2262631]]]) + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) expected_result = np.concatenate( - (expected_result, 2 * expected_result), - axis=2 + ( + expected_result, + 2 * expected_result, + ), + axis=2, ) fd = euler_maruyama( @@ -498,24 +560,24 @@ def matrix_diffusion( drift=base_drift, diffusion=matrix_diffusion, random_state=random_state, - dim_noise=1 + dim_noise=1, ) np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) def test_grid_points() -> None: - """Test for checking that the grid_points are correct""" + """Test for checking that the grid_points are correct.""" random_state = np.random.RandomState(1) initial_condition = np.array([0, 1]) start = 0 stop = 10 n_grid_points = 105 expected_grid_points = np.atleast_2d( - np.linspace(start, stop, n_grid_points) + np.linspace(start, stop, n_grid_points), ) fd = euler_maruyama( @@ -524,17 +586,17 @@ def test_grid_points() -> None: n_samples=10, start=start, stop=stop, - random_state=random_state + random_state=random_state, ) np.testing.assert_array_equal( fd.grid_points, - expected_grid_points + expected_grid_points, ) def test_initial_condition_negative_cases() -> None: - """Test for checking initial_condition related error cases""" + """Test for checking initial_condition related error cases.""" random_state = np.random.RandomState(1) # Case 1 Starting vector of points and n_samples @@ -577,14 +639,14 @@ def test_initial_condition_negative_cases() -> None: def test_diffusion_negative_cases() -> None: - """Test for checking diffusion related error cases""" + """Test for checking diffusion related error cases.""" initial_condition = np.array([0, 0]) n_samples = 2 random_state = np.random.RandomState(1) def bad_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array([[1, 2]]) @@ -598,12 +660,11 @@ def bad_diffusion( def vector_diffusion( t: float, - x: np.typing.NDArray[np.floating[Any]] + x: np.typing.NDArray[np.floating[Any]], ) -> np.typing.NDArray[np.floating[Any]]: return np.array([1, 2]) initial_condition = np.array([0, 0, 0]) - # random_state = np.random.RandomState(1) with np.testing.assert_raises(ValueError): euler_maruyama( @@ -615,22 +676,27 @@ def vector_diffusion( def test_random_generator() -> None: + """Test using Generator instead of RandomState.""" n_samples = 2 n_grid_points = 5 random_state = np.random.default_rng(seed=1) normal_distribution = norm().rvs - expected_result = np.array([[[0.34558419], - [0.51080273], - [-0.14077589], - [0.31190205], - [0.53508933]], - - [[0.82161814], - [0.55314153], - [0.84370058], - [1.02598678], - [1.17305302]]]) + expected_result = np.array([ + [[0.34558419], + [0.51080273], + [-0.14077589], + [0.31190205], + [0.53508933], + ], + + [[0.82161814], + [0.55314153], + [0.84370058], + [1.02598678], + [1.17305302], + ], + ]) fd = euler_maruyama( normal_distribution, @@ -641,5 +707,5 @@ def test_random_generator() -> None: np.testing.assert_almost_equal( fd.data_matrix, - expected_result + expected_result, ) From 65421b50b2ca63bb3a190d10a59ef12437723457 Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Wed, 17 Apr 2024 16:20:02 +0200 Subject: [PATCH 3/6] Fix error when diffusion depended on x. Fix some errors from comments of pull_request. --- skfda/datasets/_samples_generators.py | 151 ++++++++++++++------------ skfda/tests/test_euler_maruyama.py | 89 ++++++--------- 2 files changed, 111 insertions(+), 129 deletions(-) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index a1f74c055..a8cef282b 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -20,56 +20,60 @@ CovarianceCallable = Callable[[np.ndarray, np.ndarray], np.ndarray] MeanLike = Union[float, NDArrayFloat, MeanCallable] +SDETerm = Callable[[float, NDArrayFloat], NDArrayFloat] class InitialValueGenerator(Protocol): - """Class to represent SDE initial value generators.""" + """Class to represent SDE initial value generators. + + This is intented to be an interface compatible with the rvs method of + SciPy distributions. + """ def __call__( self, size: int, random_state: RandomStateLike, - ) -> np.typing.NDArray[np.floating[Any]]: + ) -> NDArrayFloat: """Interface of initial value generator.""" - ... def euler_maruyama( initial_condition: ArrayLike | InitialValueGenerator, n_grid_points: int = 100, - drift: Callable[[float, np.typing.NDArray[np.floating[Any]]], - np.typing.NDArray[np.floating[Any]]] | None = None, - diffusion: Callable[[float, np.typing.NDArray[np.floating[Any]]], - np.typing.NDArray[np.floating[Any]]] | None = None, + drift: SDETerm | ArrayLike = 0, + diffusion: SDETerm | ArrayLike = 1, n_samples: int | None = None, start: float = 0, stop: float = 1.0, + is_diffusion_matrix: bool = False, dim_noise: int | None = None, random_state: RandomStateLike = None, ) -> FDataGrid: - """Numerical integration of an Itô SDE using the Euler-Maruyana scheme. + r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. .. math:: - d X(t) = f(t, X(t)) d t + g(t,X(t)) d W(t). + d X(t) = f(t, X(t)) d t + g(t, X(t)) d W(t). Args: - initial_condition: Initial condition of the SDE. It can have one of + initial_condition: initial condition of the SDE. It can have one of three formats: An starting initial value from which to calculate *n_samples* trajectories. An array of initial values. For each starting point a trajectory will be calculated. A function that generates random numbers or vectors. It should have two parameters called size and random_state and it should return an array. - n_grid_points: The total number of points of evaluation. + n_grid_points: the total number of points of evaluation. drift: drift coefficient (:math:`f(t,X_t)` in the equation). diffusion: diffusion coefficient (:math:`g(t,X_t)` in the equation). - n_samples: Number of trajectories integrated. - start: Starting time of the trajectories. - stop: Ending time of the trajectories. + n_samples: number of trajectories integrated. + start: starting time of the trajectories. + stop: ending time of the trajectories. + is_diffusion_matrix: True if the diffusion coefficient is a matrix. dim_noise: dimension of the noise factor. By default is the data dimension. - random_state: Random state. + random_state: random state. Returns: @@ -80,22 +84,26 @@ def euler_maruyama( Gaussian processes. Examples: - Example of the use of euler_maruyama for an SDE with constant - drift and diffusion and an initial condition which follows a - standard normal distribution. + Example of the use of euler_maruyama for an Ornstein-Uhlenbeck process + that has the equation: + + .. math: + + dX(t) = -A(X(t) - \mu)dt + BdW(t) >>> from scipy.stats import norm - >>> def example_drift(t: float, x: np.ndarray) -> np.ndarray: - ... return 2 - >>> def example_diffusion(t: float, x: np.ndarray) -> np.ndarray: - ... return 0.5 + >>> A = 1 + >>> mu = 3 + >>> B = 0.5 + >>> def ou_drift(t: float, x: np.ndarray) -> np.ndarray: + ... return -A * (x - mu) >>> initial_condition = norm().rvs >>> >>> trajectories = euler_maruyama( ... initial_condition=initial_condition, ... n_samples=10, - ... drift=example_drift, - ... diffusion=example_diffusion, + ... drift=ou_drift, + ... diffusion=B, ... ) """ @@ -133,55 +141,57 @@ def euler_maruyama( ) (n_samples, dim_codomain) = initial_values.shape - if dim_noise is None: - dim_noise = dim_codomain - - def default_drift( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.array(0) - - def default_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.eye(dim_codomain, dim_noise) - - if drift is None: - drift = default_drift - - if diffusion is None: - diffusion = default_diffusion - - test_diffusion_dim = np.atleast_1d(diffusion(start, initial_values)) - - if test_diffusion_dim.ndim == 1: - if len(test_diffusion_dim) not in {dim_codomain, 1}: - raise ValueError( - f"The diffusion function returns the wrong dimensions." - f"The expected dimension is {dim_codomain} or 1.", - ) - formatting_matrix = np.eye(dim_codomain, dim_noise) + if callable(drift): + formatted_drift = drift + else: + def constant_drift( + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(drift) - elif test_diffusion_dim.shape[-2:] == (dim_codomain, dim_noise): - formatting_matrix = np.ones((dim_codomain, dim_noise)) + formatted_drift = constant_drift + if callable(diffusion): + formatted_diffusion = diffusion else: - raise ValueError( - f"The diffusion function returns the wrong dimensions. " - f"The expected dimensiones are {dim_codomain} x {dim_noise}.", + def constant_diffusion( + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(diffusion) + + formatted_diffusion = constant_diffusion + + def vector_diffusion_times_noise( + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> NDArrayFloat: + return formatted_diffusion(t_n, x_n) * noise + + def matrix_diffusion_times_noise( + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> Any: + return np.einsum( + '...dj, ...j -> ...d', + formatted_diffusion(t_n, x_n), + noise, ) - def formatted_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return diffusion(t, x) * formatting_matrix + if dim_noise is None: + dim_noise = dim_codomain + + if is_diffusion_matrix: + diffusion_times_noise = matrix_diffusion_times_noise + else: + diffusion_times_noise = vector_diffusion_times_noise data_matrix = np.zeros((n_samples, n_grid_points, dim_codomain)) times = np.linspace(start, stop, n_grid_points) - step_size = times[1] - times[0] + delta_t = times[1:] - times[:-1] noise = random_state.standard_normal( size=(n_samples, n_grid_points - 1, dim_noise), ) @@ -193,12 +203,9 @@ def formatted_diffusion( data_matrix[:, n + 1] = ( x_n - + step_size * drift(t_n, x_n) - + np.einsum( - '...dj, ...j -> ...d', - formatted_diffusion(t_n, x_n), - noise[:, n], - ) * np.sqrt(step_size), + + delta_t[n] * formatted_drift(t_n, x_n) + + diffusion_times_noise(t_n, x_n, noise[:, n]) + * np.sqrt(delta_t[n]) ) return FDataGrid( @@ -667,7 +674,7 @@ def make_random_warping( np.square(v, out=v) # Creation of FDataGrid in the corresponding domain - data_matrix = scipy.integrate.cumtrapz( + data_matrix = scipy.integrate.cumulative_trapezoid( v, dx=1 / n_features, initial=0, diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py index 439554e41..c23f8a854 100644 --- a/skfda/tests/test_euler_maruyama.py +++ b/skfda/tests/test_euler_maruyama.py @@ -1,11 +1,10 @@ """Tests of Euler Maruyama.""" -from typing import Any - import numpy as np from scipy.stats import multivariate_normal, norm from skfda.datasets import euler_maruyama +from skfda.typing._numpy import NDArrayFloat def test_one_initial_point() -> None: @@ -15,7 +14,6 @@ def test_one_initial_point() -> None: - initial_condition = 0, n_samples = 2 - initial_condition = [0], n_samples = 2. # The result should be the same as the first one - - initial_condition = np.array([0, 0]), n_samples = 2. """ n_samples = 2 @@ -70,25 +68,29 @@ def test_one_initial_point() -> None: expected_result, ) - # Case 3 Starting point = array + n_samples +def test_one_initial_point_multidimensional() -> None: + """Case 1.3 Starting point = array + n_samples. + + initial_condition = np.array([1, 0]), n_samples = 2. + """ n_samples = 2 n_grid_points = 5 random_state = np.random.RandomState(1) - initial_array = np.array([0, 0]) + initial_array = np.array([1, 0]) expected_result = np.array([ - [[0, 0], - [0.81217268, -0.30587821], - [0.54808681, -0.84236252], - [0.98079062, -1.99313187], - [1.8531965, -2.37373532], + [[1, 0], + [1.81217268, -0.30587821], + [1.54808681, -0.84236252], + [1.98079062, -1.99313187], + [2.8531965, -2.37373532], ], - [[0, 0], - [0.15951955, -0.12468519], - [0.89057352, -1.15475554], - [0.72936491, -1.34678272], - [1.29624964, -1.89672835], + [[1, 0], + [1.15951955, -0.12468519], + [1.89057352, -1.15475554], + [1.72936491, -1.34678272], + [2.29624964, -1.89672835], ], ]) @@ -126,7 +128,7 @@ def test_initial_data_generator() -> None: def standard_normal_generator( size: int, random_state: np.random.RandomState, - ) -> np.typing.NDArray[np.floating[Any]]: + ) -> NDArrayFloat: return ( random_state.standard_normal(size) ) @@ -166,7 +168,7 @@ def standard_normal_generator( def standard_normal_generator_2d( size: int, random_state: np.random.RandomState, - ) -> np.typing.NDArray[np.floating[Any]]: + ) -> NDArrayFloat: return random_state.standard_normal((size, 2)) expected_result = np.array([ @@ -385,16 +387,10 @@ def test_drift_cases() -> None: def base_drift( t: float, - x: np.typing.NDArray[np.floating[Any]], + x: NDArrayFloat, ) -> float: return 0 - def base_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> float: - return 1 - expected_result = np.array([ [[0, 0], [0.81217268, -0.30587821], @@ -415,7 +411,7 @@ def base_diffusion( n_grid_points=n_grid_points, n_samples=n_samples, drift=base_drift, - diffusion=base_diffusion, + diffusion=1, random_state=random_state, ) @@ -431,8 +427,8 @@ def base_diffusion( def vector_drift( t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: + x: NDArrayFloat, + ) -> NDArrayFloat: return np.array([0, 0]) expected_result = np.array([ @@ -455,7 +451,7 @@ def vector_drift( n_grid_points=n_grid_points, n_samples=n_samples, drift=vector_drift, - diffusion=base_diffusion, + diffusion=1, random_state=random_state, ) @@ -472,17 +468,7 @@ def test_diffusion_cases() -> None: n_grid_points = 5 random_state = np.random.RandomState(1) - def base_drift( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> float: - return 0 - - def vector_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.array([1, 2]) + vector_diffusion = np.array([1, 2]) # The result should be the same as the previous test, but with # the second column doubled @@ -506,7 +492,7 @@ def vector_diffusion( initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, - drift=base_drift, + drift=0, diffusion=vector_diffusion, random_state=random_state, ) @@ -525,11 +511,7 @@ def vector_diffusion( n_grid_points = 5 random_state = np.random.RandomState(1) - def matrix_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.array([[1], [2]]) + matrix_diffusion = np.array([[1], [2]]) expected_result = np.array([ [[0], @@ -557,9 +539,10 @@ def matrix_diffusion( initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, - drift=base_drift, + drift=0, diffusion=matrix_diffusion, random_state=random_state, + is_diffusion_matrix=True, dim_noise=1, ) @@ -620,7 +603,7 @@ def test_initial_condition_negative_cases() -> None: ) # Case 3: generator function without n_samples - initial_generator = random_state.standard_normal + initial_generator = norm().rvs with np.testing.assert_raises(ValueError): euler_maruyama( @@ -644,11 +627,7 @@ def test_diffusion_negative_cases() -> None: n_samples = 2 random_state = np.random.RandomState(1) - def bad_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.array([[1, 2]]) + bad_diffusion = np.array([[1, 2, 3]]) with np.testing.assert_raises(ValueError): euler_maruyama( @@ -658,11 +637,7 @@ def bad_diffusion( random_state=random_state, ) - def vector_diffusion( - t: float, - x: np.typing.NDArray[np.floating[Any]], - ) -> np.typing.NDArray[np.floating[Any]]: - return np.array([1, 2]) + vector_diffusion = np.array([1, 2]) initial_condition = np.array([0, 0, 0]) From 528cd0dd2af03cfa22ad73e66cad943bd928c72f Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Sun, 5 May 2024 00:03:03 +0200 Subject: [PATCH 4/6] more documentation, and finishing to fix issues with diffusion term. Included diffusion_matricial_term and minor errors --- skfda/datasets/_samples_generators.py | 103 ++++++++++++++++++-------- skfda/tests/test_euler_maruyama.py | 54 ++++++++------ 2 files changed, 105 insertions(+), 52 deletions(-) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index a8cef282b..a262c9aea 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -38,42 +38,71 @@ def __call__( """Interface of initial value generator.""" -def euler_maruyama( +def euler_maruyama( # noqa: WPS210 initial_condition: ArrayLike | InitialValueGenerator, n_grid_points: int = 100, - drift: SDETerm | ArrayLike = 0, - diffusion: SDETerm | ArrayLike = 1, + drift: SDETerm | ArrayLike | None = None, + diffusion: SDETerm | ArrayLike | None = None, n_samples: int | None = None, - start: float = 0, + start: float = 0.0, # noqa: WPS358 -- Distinguish float from integer stop: float = 1.0, - is_diffusion_matrix: bool = False, + diffusion_matricial_term: bool = True, dim_noise: int | None = None, random_state: RandomStateLike = None, ) -> FDataGrid: r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. + An SDE can be expressed with the following formula + + .. math:: + + d\mathbf{X}(t) = \mathbf{F}(t, \mathbf{X}(t))dt + \mathbf{G}(t, + \mathbf{X}(t))d\mathbf{W}(t). + + In this equation, :math:`\mathbf{X} = (X^{(1)}, X^{(2)}, ... , X^{(n)}) + \in \mathbb{R}^q` is a vector that represents the state of the stochastic + process. The function :math:`\mathbf{F}(t, \mathbf{X}) = (F^{(1)}(t, + \mathbf{X}), ..., F^{(q)}(t, \mathbf{X}))` is called drift and refers + to the deterministic component of the equation. The function + :math:`\mathbf{G} (t, \mathbf{X}) = (G^{i, j}(t, \mathbf{X}))_{i=1, j=1} + ^{q, m}` is denoted as the diffusion term and refers to the stochastic + component of the evolution. :math:`\mathbf{W}(t)` refers to a Wiener + process (Standard Brownian motion) of dimension :math:`m`. Finally, + :math:`q` refers to the dimension of the variable :math:`\mathbf{X}` + (dimension of the codomain) and :math:`m` to the dimension of the noise. + + Euler-Maruyama's method computes the approximated solution using the + Markov chain + .. math:: - d X(t) = f(t, X(t)) d t + g(t, X(t)) d W(t). + X_{n + 1}^{(i)} = X_n^{(i)} + F^{(i)}(t_n, \mathbf{X}_n)\Delta t_n + + \sum_{j=1}^m G^{i,j}(t_n, \mathbf{X}_n)\sqrt{\Delta t_n}\Delta Z_n^j, + + where :math:`X_n^{(i)}` is the approximated value of :math:`X^{(i)}(t_n)` + and the :math:`\mathbf{Z}_m` are independent, identically distributed + :math:`m`-dimensional standard normal random variables. Args: - initial_condition: initial condition of the SDE. It can have one of + initial_condition: Initial condition of the SDE. It can have one of three formats: An starting initial value from which to calculate *n_samples* trajectories. An array of initial values. For each starting point a trajectory will be calculated. A function that generates random numbers or vectors. It should have two parameters called size and random_state and it should return an array. - n_grid_points: the total number of points of evaluation. - drift: drift coefficient (:math:`f(t,X_t)` in the equation). - diffusion: diffusion coefficient (:math:`g(t,X_t)` in the equation). - n_samples: number of trajectories integrated. - start: starting time of the trajectories. - stop: ending time of the trajectories. - is_diffusion_matrix: True if the diffusion coefficient is a matrix. - dim_noise: dimension of the noise factor. By default is the data + n_grid_points: The total number of points of evaluation. + drift: Drift coefficient (:math:`F(t,\mathbf{X})` in the equation). + diffusion: Diffusion coefficient (:math:`G(t,\mathbf{X})` in the + equation). + n_samples: Number of trajectories integrated. + start: Starting time of the trajectories. + stop: Ending time of the trajectories. + diffusion_matricial_term: True if the diffusion coefficient is a + matrix. + dim_noise: Dimension of the noise factor. By default is the data dimension. - random_state: random state. + random_state: Random state. Returns: @@ -141,50 +170,62 @@ def euler_maruyama( ) (n_samples, dim_codomain) = initial_values.shape + if dim_codomain == 1: + diffusion_matricial_term = False + + if drift is None: + drift = 0.0 # noqa: WPS358 -- Distinguish float from integer + if callable(drift): - formatted_drift = drift + drift_function = drift else: - def constant_drift( + def constant_drift( # noqa: WPS430 -- We need internal functions t: float, x: NDArrayFloat, ) -> NDArrayFloat: return np.atleast_1d(drift) - formatted_drift = constant_drift + drift_function = constant_drift + + if diffusion is None: + if diffusion_matricial_term: + diffusion = np.eye(dim_codomain) + else: + diffusion = 1.0 if callable(diffusion): - formatted_diffusion = diffusion + diffusion_function = diffusion else: - def constant_diffusion( + def constant_diffusion( # noqa: WPS430 -- We need internal functions t: float, x: NDArrayFloat, ) -> NDArrayFloat: return np.atleast_1d(diffusion) - formatted_diffusion = constant_diffusion + diffusion_function = constant_diffusion - def vector_diffusion_times_noise( + def vector_diffusion_times_noise( # noqa: WPS430 We need internal functons t_n: float, x_n: NDArrayFloat, noise: NDArrayFloat, ) -> NDArrayFloat: - return formatted_diffusion(t_n, x_n) * noise + return diffusion_function(t_n, x_n) * noise - def matrix_diffusion_times_noise( + def matrix_diffusion_times_noise( # noqa: WPS430 We need internal functons t_n: float, x_n: NDArrayFloat, noise: NDArrayFloat, ) -> Any: return np.einsum( '...dj, ...j -> ...d', - formatted_diffusion(t_n, x_n), + diffusion_function(t_n, x_n), noise, ) if dim_noise is None: dim_noise = dim_codomain - if is_diffusion_matrix: + if diffusion_matricial_term: diffusion_times_noise = matrix_diffusion_times_noise else: diffusion_times_noise = vector_diffusion_times_noise @@ -203,7 +244,7 @@ def matrix_diffusion_times_noise( data_matrix[:, n + 1] = ( x_n - + delta_t[n] * formatted_drift(t_n, x_n) + + delta_t[n] * drift_function(t_n, x_n) + diffusion_times_noise(t_n, x_n, noise[:, n]) * np.sqrt(delta_t[n]) ) @@ -341,7 +382,7 @@ def make_gaussian_process( ) -def make_sinusoidal_process( +def make_sinusoidal_process( # noqa: WPS211 n_samples: int = 15, n_features: int = 100, *, @@ -466,7 +507,7 @@ def make_multimodal_landmarks( return modes_location + variation -def make_multimodal_samples( +def make_multimodal_samples( # noqa: WPS211 n_samples: int = 15, *, n_modes: int = 1, @@ -564,7 +605,7 @@ def make_multimodal_samples( # Covariance matrix of the samples cov = mode_std * np.eye(dim_domain) - for i, j, k in itertools.product( + for i, j, k in itertools.product( # noqa: WPS440 range(n_samples), range(dim_codomain), range(n_modes), diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py index c23f8a854..42f341204 100644 --- a/skfda/tests/test_euler_maruyama.py +++ b/skfda/tests/test_euler_maruyama.py @@ -10,16 +10,12 @@ def test_one_initial_point() -> None: """Case 1 -> One initial point + n_samples > 0. - Tests: - - initial_condition = 0, n_samples = 2 - - initial_condition = [0], n_samples = 2. - # The result should be the same as the first one + 1.1: initial_condition = 0, n_samples = 2 """ n_samples = 2 n_grid_points = 5 - random_state = np.random.RandomState(1) - # Case 1 Starting point = float + n_samples + random_state = np.random.RandomState(1) # noqa: WPS204 -- reproducibility initial_float = 0 expected_result = np.array([[ @@ -49,13 +45,33 @@ def test_one_initial_point() -> None: expected_result, ) - # Case 2 Starting point = float in an array format + n_samples - # The result must be the same as in Case 1 + +def test_one_initial_point_monodimensional() -> None: + """Case 1.2 Starting point = float in an array format + n_samples. + + initial_condition = [0], n_samples = 2. + The result should be the same as 1.1 + """ n_samples = 2 n_grid_points = 5 random_state = np.random.RandomState(1) initial_float_in_list = [0] + expected_result = np.array([[ + [0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) + fd = euler_maruyama( initial_float_in_list, n_samples=n_samples, @@ -125,7 +141,7 @@ def test_initial_data_generator() -> None: # Case 1 Starting generator of floats + n_samples random_state = np.random.RandomState(1) - def standard_normal_generator( + def standard_normal_generator( # noqa: WPS430 size: int, random_state: np.random.RandomState, ) -> NDArrayFloat: @@ -165,7 +181,7 @@ def standard_normal_generator( n_grid_points = 5 random_state = np.random.RandomState(1) - def standard_normal_generator_2d( + def standard_normal_generator_2d( # noqa: WPS430 size: int, random_state: np.random.RandomState, ) -> NDArrayFloat: @@ -380,12 +396,12 @@ def test_vector_initial_points() -> None: def test_drift_cases() -> None: """Test for different drift inputs.""" - initial_condition = np.array([0, 0]) + initial_condition = np.array([0, 0]) # noqa: WPS204 n_samples = 2 n_grid_points = 5 random_state = np.random.RandomState(1) - def base_drift( + def base_drift( # noqa: WPS430 t: float, x: NDArrayFloat, ) -> float: @@ -411,7 +427,6 @@ def base_drift( n_grid_points=n_grid_points, n_samples=n_samples, drift=base_drift, - diffusion=1, random_state=random_state, ) @@ -425,7 +440,7 @@ def base_drift( n_grid_points = 5 random_state = np.random.RandomState(1) - def vector_drift( + def vector_drift( # noqa: WPS430 t: float, x: NDArrayFloat, ) -> NDArrayFloat: @@ -451,7 +466,6 @@ def vector_drift( n_grid_points=n_grid_points, n_samples=n_samples, drift=vector_drift, - diffusion=1, random_state=random_state, ) @@ -494,6 +508,7 @@ def test_diffusion_cases() -> None: n_samples=n_samples, drift=0, diffusion=vector_diffusion, + diffusion_matricial_term=False, random_state=random_state, ) @@ -527,11 +542,9 @@ def test_diffusion_cases() -> None: [-0.2262631], ], ]) - expected_result = np.concatenate( - ( - expected_result, - 2 * expected_result, - ), + + expected_result = np.concatenate( # noqa: WPS317 + (expected_result, 2 * expected_result), axis=2, ) @@ -542,7 +555,6 @@ def test_diffusion_cases() -> None: drift=0, diffusion=matrix_diffusion, random_state=random_state, - is_diffusion_matrix=True, dim_noise=1, ) From 3ca5085ac539d335c09c31ee108cc85738ca0844 Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Sun, 9 Jun 2024 20:19:16 +0200 Subject: [PATCH 5/6] eliminate dim_noise parameter --- skfda/datasets/_samples_generators.py | 5 ++--- skfda/tests/test_euler_maruyama.py | 11 ----------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index a262c9aea..8bdb6913a 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -47,7 +47,6 @@ def euler_maruyama( # noqa: WPS210 start: float = 0.0, # noqa: WPS358 -- Distinguish float from integer stop: float = 1.0, diffusion_matricial_term: bool = True, - dim_noise: int | None = None, random_state: RandomStateLike = None, ) -> FDataGrid: r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. @@ -222,11 +221,11 @@ def matrix_diffusion_times_noise( # noqa: WPS430 We need internal functons noise, ) - if dim_noise is None: - dim_noise = dim_codomain + dim_noise = dim_codomain if diffusion_matricial_term: diffusion_times_noise = matrix_diffusion_times_noise + dim_noise = diffusion_function(start, initial_values).shape[-1] else: diffusion_times_noise = vector_diffusion_times_noise diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py index 42f341204..e6684add2 100644 --- a/skfda/tests/test_euler_maruyama.py +++ b/skfda/tests/test_euler_maruyama.py @@ -555,7 +555,6 @@ def test_diffusion_cases() -> None: drift=0, diffusion=matrix_diffusion, random_state=random_state, - dim_noise=1, ) np.testing.assert_almost_equal( @@ -639,16 +638,6 @@ def test_diffusion_negative_cases() -> None: n_samples = 2 random_state = np.random.RandomState(1) - bad_diffusion = np.array([[1, 2, 3]]) - - with np.testing.assert_raises(ValueError): - euler_maruyama( - initial_condition, - n_samples=n_samples, - diffusion=bad_diffusion, - random_state=random_state, - ) - vector_diffusion = np.array([1, 2]) initial_condition = np.array([0, 0, 0]) From d8a3dc0b1b0f6a64ed8003fb3edd82acd9fae35e Mon Sep 17 00:00:00 2001 From: Pablo Soto Date: Sun, 9 Jun 2024 20:23:18 +0200 Subject: [PATCH 6/6] fixed warning dim_noise --- skfda/datasets/_samples_generators.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index 8bdb6913a..0ae0e6490 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -99,8 +99,6 @@ def euler_maruyama( # noqa: WPS210 stop: Ending time of the trajectories. diffusion_matricial_term: True if the diffusion coefficient is a matrix. - dim_noise: Dimension of the noise factor. By default is the data - dimension. random_state: Random state.