From 65ffe273b0e2b3f28cb2574434e420644446cff6 Mon Sep 17 00:00:00 2001 From: Francesco Serafini Date: Tue, 15 Oct 2024 11:15:32 +0100 Subject: [PATCH] Correction to M-test with resampling, and addition of new MLL test --- csep/core/catalog_evaluations.py | 350 ++++++++++++++++++++++++++++++- csep/utils/stats.py | 37 ++++ tests/test_magnitude_tests.py | 38 ++++ 3 files changed, 423 insertions(+), 2 deletions(-) create mode 100644 tests/test_magnitude_tests.py diff --git a/csep/core/catalog_evaluations.py b/csep/core/catalog_evaluations.py index 214ceca0..296cb3d6 100644 --- a/csep/core/catalog_evaluations.py +++ b/csep/core/catalog_evaluations.py @@ -1,11 +1,14 @@ -# Third-Party Imports +# Python imports import time +from typing import Optional, TYPE_CHECKING +# Third-Party Imports import numpy import scipy.stats # PyCSEP imports from csep.core.exceptions import CSEPEvaluationException +from csep.core.catalogs import CSEPCatalog from csep.models import ( CatalogNumberTestResult, CatalogSpatialTestResult, @@ -14,7 +17,10 @@ CalibrationTestResult ) from csep.utils.calc import _compute_likelihood -from csep.utils.stats import get_quantiles, cumulative_square_diff +from csep.utils.stats import get_quantiles, cumulative_square_diff, MLL_score + +if TYPE_CHECKING: + from csep.core.forecasts import CatalogForecast def number_test(forecast, observed_catalog, verbose=True): @@ -55,6 +61,7 @@ def number_test(forecast, observed_catalog, verbose=True): obs_name=observed_catalog.name) return result + def spatial_test(forecast, observed_catalog, verbose=True): """ Performs spatial test for catalog-based forecasts. @@ -140,6 +147,7 @@ def spatial_test(forecast, observed_catalog, verbose=True): return result + def magnitude_test(forecast, observed_catalog, verbose=True): """ Performs magnitude test for catalog-based forecasts """ test_distribution = [] @@ -215,6 +223,7 @@ def magnitude_test(forecast, observed_catalog, verbose=True): return result + def pseudolikelihood_test(forecast, observed_catalog, verbose=True): """ Performs the spatial pseudolikelihood test for catalog forecasts. @@ -310,6 +319,7 @@ def pseudolikelihood_test(forecast, observed_catalog, verbose=True): return result + def calibration_test(evaluation_results, delta_1=False): """ Perform the calibration test by computing a Kilmogorov-Smirnov test of the observed quantiles against a uniform distribution. @@ -345,3 +355,339 @@ def calibration_test(evaluation_results, delta_1=False): return result +def resampled_magnitude_test(forecast: "CatalogForecast", + observed_catalog: CSEPCatalog, + mag_half_bin: float = 0.05, + verbose: bool = False, + seed: Optional[int] = None) -> CatalogMagnitudeTestResult: + """ + Performs the resampled magnitude test for catalog-based forecasts (Serafini et al., 2024), + which corrects the bias from the original M-test implementation to the total N of events. + Calculates the pseudo log-likelihood distribution from the synthetic catalog Λ_j as: + + D_j = Σ_k [log(Λ_U(k) / N_U + 1) - log(Λ_j(k) / N_j + 1)]^2 + + where k are the magnitude bins, Λ_U the union of all synthetic catalogs, N_U the total + number of events in Λ_U, and N_j the number of events in Λ_j + + The pseudo log-likelihood score from the observations is calculated as: + + D_o = Σ_k [log(Λ_U(k) / N_U + 1) - log(Ω(k) / N_o + 1)]^2 + + where Ω is the observed catalog and N_o the total number of observed events. + + Args: + forecast (CatalogForecast): The forecast to be evaluated + observed_catalog (CSEPCatalog): The observation/testing catalog. + mag_half_bin (float): half of the magnitude bin size + verbose (bool): Flag to display debug messages + seed (int): random number seed generator + + Returns: + A CatalogMagnitudeTestResult object containing the statistic distribution and the + observed statistic. + """ + + # set seed + if seed: + numpy.random.seed(seed) + """ """ + test_distribution = [] + + if forecast.region.magnitudes is None: + raise CSEPEvaluationException( + "Forecast must have region.magnitudes member to perform magnitude test.") + + # short-circuit if zero events + if observed_catalog.event_count == 0: + print("Cannot perform magnitude test when observed event count is zero.") + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=None, + quantile=(None, None), + status='not-valid', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result + + # compute expected rates for forecast if needed + if forecast.expected_rates is None: + forecast.get_expected_rates(verbose=verbose) + + # THIS IS NEW - returns the average events in the magnitude bins + union_histogram = numpy.zeros(len(forecast.magnitudes)) + for j, cat in enumerate(forecast): + union_histogram += cat.magnitude_counts() + # end new + n_union_events = numpy.sum(union_histogram) + obs_histogram = observed_catalog.magnitude_counts() + n_obs = numpy.sum(obs_histogram) + union_scale = n_obs / n_union_events + scaled_union_histogram = union_histogram * union_scale + + # this is new - prob to be used for resampling + probs = union_histogram / n_union_events + # end new + + # compute the test statistic for each catalog + t0 = time.time() + for i, catalog in enumerate(forecast): + # THIS IS NEW - sampled from the union forecast histogram + mag_values = numpy.random.choice(forecast.magnitudes + mag_half_bin, p=probs, + size=int(n_obs)) + extended_mag_max = max(forecast.magnitudes) + 10 + mag_counts, tmp = numpy.histogram(mag_values, bins=numpy.append(forecast.magnitudes, + extended_mag_max)) + # end new + n_events = numpy.sum(mag_counts) + if n_events == 0: + # print("Skipping to next because catalog contained zero events.") + continue + scale = n_obs / n_events + catalog_histogram = mag_counts * scale + # compute magnitude test statistic for the catalog + test_distribution.append( + cumulative_square_diff(numpy.log10(catalog_histogram + 1), + numpy.log10(scaled_union_histogram + 1)) + ) + # output status + if verbose: + tens_exp = numpy.floor(numpy.log10(i + 1)) + if (i + 1) % 10 ** tens_exp == 0: + t1 = time.time() + print(f'Processed {i + 1} catalogs in {t1 - t0} seconds', flush=True) + + # compute observed statistic + obs_d_statistic = cumulative_square_diff(numpy.log10(obs_histogram + 1), + numpy.log10(scaled_union_histogram + 1)) + + # score evaluation + delta_1, delta_2 = get_quantiles(test_distribution, obs_d_statistic) + + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=obs_d_statistic, + quantile=(delta_1, delta_2), + status='normal', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result + + +def MLL_magnitude_test(forecast: "CatalogForecast", + observed_catalog: CSEPCatalog, + verbose: bool = False, + seed: Optional[int] = None) -> CatalogMagnitudeTestResult: + """ + + Args: + forecast: + observed_catalog: + verbose: + seed: + + Returns: + + """ + + if seed: + numpy.random.seed(seed) + + test_distribution = [] + + if forecast.region.magnitudes is None: + raise CSEPEvaluationException( + "Forecast must have region.magnitudes member to perform magnitude test.") + + # short-circuit if zero events + if observed_catalog.event_count == 0: + print("Cannot perform magnitude test when observed event count is zero.") + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=None, + quantile=(None, None), + status='not-valid', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result + + # compute expected rates for forecast if needed + if forecast.expected_rates is None: + forecast.get_expected_rates(verbose=verbose) + + # calculate histograms of union forecast and total number of events + Lambda_u_histogram = numpy.zeros(len(forecast.magnitudes)) + Lambda_u = [] + for j, cat in enumerate(forecast): + Lambda_u = numpy.append(Lambda_u, cat.get_magnitudes()) + Lambda_u_histogram += cat.magnitude_counts() + # # calculate histograms of observations and observed number of events + Omega_histogram = observed_catalog.magnitude_counts() + n_obs = numpy.sum(Omega_histogram) + + # compute observed statistic + obs_d_statistic = MLL_score(Lambda_U_counts=Lambda_u_histogram, + Lambda_j_counts=Omega_histogram) + + # compute the test statistic for each catalog + t0 = time.time() + for i, catalog in enumerate(forecast): + # this is new - sampled from the union forecast histogram + mag_values = numpy.random.choice(Lambda_u, size=int(n_obs)) + extended_mag_max = max(forecast.magnitudes) + 10 + Lambda_j_histogram, tmp = numpy.histogram(mag_values, + bins=numpy.append(forecast.magnitudes, + extended_mag_max)) + # end new + n_events = numpy.sum(Lambda_j_histogram) + # if n_events == 0: + # print("Skipping to next because catalog contained zero events.") + # continue + + # compute magnitude test statistic for the catalog + test_distribution.append( + MLL_score(Lambda_U_counts=Lambda_u_histogram, + Lambda_j_counts=Lambda_j_histogram) + ) + # output status + if verbose: + tens_exp = numpy.floor(numpy.log10(i + 1)) + if (i + 1) % 10 ** tens_exp == 0: + t1 = time.time() + print(f'Processed {i + 1} catalogs in {t1 - t0} seconds', flush=True) + + # score evaluation + delta_1, delta_2 = get_quantiles(test_distribution, obs_d_statistic) + + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=obs_d_statistic, + quantile=(delta_1, delta_2), + status='normal', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result + + +def MLL_magnitude_test_light(forecast: "CatalogForecast", + observed_catalog: CSEPCatalog, + mag_half_bin: float = 0.05, + verbose: bool = False, + seed: Optional[int] = None) -> CatalogMagnitudeTestResult: + """ + + Args: + forecast: + observed_catalog: + mag_half_bin: + verbose: + seed: + + Returns: + + """ + + # set seed + if seed: + numpy.random.seed(seed) + """ Performs magnitude test for catalog-based forecasts """ + test_distribution = [] + + if forecast.region.magnitudes is None: + raise CSEPEvaluationException( + "Forecast must have region.magnitudes member to perform magnitude test.") + + # short-circuit if zero events + if observed_catalog.event_count == 0: + print("Cannot perform magnitude test when observed event count is zero.") + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=None, + quantile=(None, None), + status='not-valid', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result + + # compute expected rates for forecast if needed + if forecast.expected_rates is None: + forecast.get_expected_rates(verbose=verbose) + + # calculate histograms of union forecast and total number of events + Lambda_u_histogram = numpy.zeros(len(forecast.magnitudes)) + for j, cat in enumerate(forecast): + Lambda_u_histogram += cat.magnitude_counts() + + # # calculate histograms of observations and observed number of events + Omega_histogram = observed_catalog.magnitude_counts() + n_obs = numpy.sum(Omega_histogram) + + # compute observed statistic + obs_d_statistic = MLL_score(Lambda_U_counts=Lambda_u_histogram, + Lambda_j_counts=Omega_histogram) + + probs = Lambda_u_histogram / numpy.sum(Lambda_u_histogram) + + # compute the test statistic for each catalog + t0 = time.time() + for i, catalog in enumerate(forecast): + # this is new - sampled from the union forecast histogram + mag_values = numpy.random.choice(forecast.magnitudes + mag_half_bin, p=probs, + size=int(n_obs)) + extended_mag_max = max(forecast.magnitudes) + 10 + Lambda_j_histogram, tmp = numpy.histogram(mag_values, + bins=numpy.append(forecast.magnitudes, + extended_mag_max)) + # end new + n_events = numpy.sum(Lambda_j_histogram) + # if n_events == 0: + # print("Skipping to next because catalog contained zero events.") + # continue + + # compute magnitude test statistic for the catalog + test_distribution.append( + MLL_score(Lambda_U_counts=Lambda_u_histogram, + Lambda_j_counts=Lambda_j_histogram) + ) + # output status + if verbose: + tens_exp = numpy.floor(numpy.log10(i + 1)) + if (i + 1) % 10 ** tens_exp == 0: + t1 = time.time() + print(f'Processed {i + 1} catalogs in {t1 - t0} seconds', flush=True) + + # score evaluation + delta_1, delta_2 = get_quantiles(test_distribution, obs_d_statistic) + + # prepare result + result = CatalogMagnitudeTestResult(test_distribution=test_distribution, + name='M-Test', + observed_statistic=obs_d_statistic, + quantile=(delta_1, delta_2), + status='normal', + min_mw=forecast.min_magnitude, + obs_catalog_repr=str(observed_catalog), + obs_name=observed_catalog.name, + sim_name=forecast.name) + + return result \ No newline at end of file diff --git a/csep/utils/stats.py b/csep/utils/stats.py index 3550b0de..ab3d5170 100644 --- a/csep/utils/stats.py +++ b/csep/utils/stats.py @@ -1,3 +1,5 @@ +from typing import Sequence + import numpy import scipy.stats import scipy.special @@ -269,3 +271,38 @@ def get_Kagan_I1_score(forecasts, catalog): I_1[j] = numpy.dot(counts[non_zero_idx], numpy.log2(rate_den[non_zero_idx] / uniform_forecast)) / n_event return I_1 + + +def log_d_multinomial(x: numpy.ndarray, size: int, prob: numpy.ndarray): + + return scipy.special.loggamma(size + 1) + numpy.sum( + x * numpy.log(prob) - scipy.special.loggamma(x + 1)) + + +def MLL_score(Lambda_U_counts: numpy.ndarray, Lambda_j_counts: numpy.ndarray): + """ + + Args: + Lambda_U_counts: + Lambda_j_counts: + + Returns: + + """ + N_u = numpy.sum(Lambda_U_counts) + N_j = numpy.sum(Lambda_j_counts) + coef_ = N_u / N_j + Lambda_U_mod = Lambda_U_counts + coef_ + Lambda_j_mod = Lambda_j_counts + 1 + Lambda_merged = Lambda_U_mod + Lambda_j_mod + + pr_merged = Lambda_merged / numpy.sum(Lambda_merged) + pr_U = Lambda_U_mod / numpy.sum(Lambda_U_mod) + pr_j = Lambda_j_mod / numpy.sum(Lambda_j_mod) + + loglik_merged = log_d_multinomial(x=Lambda_merged, size=numpy.sum(Lambda_merged), + prob=pr_merged) + loglik_U = log_d_multinomial(x=Lambda_U_mod, size=numpy.sum(Lambda_U_mod), prob=pr_U) + loglik_j = log_d_multinomial(x=Lambda_j_mod, size=numpy.sum(Lambda_j_mod), prob=pr_j) + + return 2 * (loglik_merged - loglik_U - loglik_j) diff --git a/tests/test_magnitude_tests.py b/tests/test_magnitude_tests.py new file mode 100644 index 00000000..d072e245 --- /dev/null +++ b/tests/test_magnitude_tests.py @@ -0,0 +1,38 @@ +from unittest import TestCase +from csep.core.catalog_evaluations import (resampled_magnitude_test, MLL_magnitude_test, + MLL_magnitude_test_light) + + +class TestResampledMagnitudeTest(TestCase): + def setUp(self): + pass + + def unit_test1(self): + pass + + def tearDown(self): + pass + + +class TestMLLTest(TestCase): + def setUp(self): + pass + + def unit_test1(self): + pass + + def tearDown(self): + pass + + +class TestMLLTestLight(TestCase): + def setUp(self): + pass + + def unit_test1(self): + pass + + def tearDown(self): + pass + +