Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate rp calcs/exploratory implementation code #2

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions exploration/03_check_return_period_funcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# ---
# jupyter:
# jupytext:
# cell_metadata_filter: -all
# custom_cell_magics: kql
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.11.2
# kernelspec:
# display_name: ds-raster-stats
# language: python
# name: ds-raster-stats
# ---

# %% [markdown]
# This script implements the src.utils return_period functions for some test
# data we have already created for Somalia in the hdx-ingest repo.
# It is basically just some sanity checks to make sure the functions work
# and qualitatively look as expected.

# %%
# %matplotlib inline
# %load_ext autoreload
# %autoreload 2

import re
from io import BytesIO

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.ticker import FuncFormatter

from src.utils import cloud_utils
from src.utils import return_periods as rp


# %%
def to_snake_case(name):
    """Convert a CamelCase / mixed-case identifier to snake_case."""
    # First insert an underscore before each capitalized word, then before
    # any remaining upper-case letter that follows a lower-case letter/digit.
    with_word_breaks = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", with_word_breaks).lower()


sample_admin = "Awdal"  # just used for plotting checking
# %% [markdown]
# Load data

# %%
# Download the pre-computed zonal-stats parquet from the "projects"
# container (mode="dev") and read it into a dataframe.
pc = cloud_utils.get_container_client(mode="dev", container_name="projects")
blob_name = "ds-floodscan-ingest/df_aer_sfed_som_adm1_zstats.parquet"
blob_client = pc.get_blob_client(blob_name)
blob_data = blob_client.download_blob().readall()
df = pd.read_parquet(BytesIO(blob_data))
# Normalize column names (e.g. "ADM1_EN" -> "adm1_en")
df.columns = [to_snake_case(col) for col in df.columns]

# %% [markdown]
# ## Pre Process Data
#
# So just take yearly max values

# %%
# Yearly maxima per admin-1 area. Note .max() applies to every non-group
# column, not only "value" — only "value" is used downstream.
df["year"] = pd.to_datetime(df["date"]).dt.year
df_max = df.groupby(["adm1_en", "adm1_pcode", "year"]).max().reset_index()

# %% [markdown]
# ## Calculate LP3 Params
#
# Show the implementation by first just doing each method one-by-one on a
# single sample admin

# %%
# Fit LP3 parameters to the sample admin's yearly maxima, one estimation
# method at a time.
rp.lp3_params(
    df_max[df_max["adm1_en"] == sample_admin].value, est_method="usgs"
)


# %%
rp.lp3_params(
    df_max[df_max["adm1_en"] == sample_admin].value, est_method="lmoments"
)

# %%
rp.lp3_params(
    df_max[df_max["adm1_en"] == sample_admin].value, est_method="scipy"
)

# %% [markdown]
# We can also run them all at once with this wrapper

# %%
# All three methods in one call (a Series keyed by method name)
rp.lp3_params_all(df_max[df_max["adm1_en"] == sample_admin].value)

# %% [markdown]
# Use wrapper function with `groupby` to run for each admin

# %%
# Long-format frame of fitted parameters: one row per (adm1_en, method).
# "level_1" is the method key unstacked by the groupby/apply, renamed here.
df_params = (
    df_max.groupby("adm1_en")["value"]
    .apply(rp.lp3_params_all)
    .reset_index()
    .rename(columns={"level_1": "method"})
)

# %% [markdown]
# ## Calculate RPs
#
# Now we can loop through each admin and calculate RPs associated with
# each value

# %%
# Loop through each admin and calculate the RPs associated with each value,
# using each admin's own fitted parameters for every estimation method.
unique_adm1s = df_params["adm1_en"].unique()
df_list = []

# Loop-invariant: define once rather than rebuilding per admin
est_methods = ["lmoments", "scipy", "usgs"]

for adm in unique_adm1s:
    df_params_adm_filt = df_params[df_params["adm1_en"] == adm]
    # .copy() so the rp_* column assignments below write to an independent
    # frame rather than a filtered view of `df` (avoids pandas
    # SettingWithCopyWarning and potentially silent lost writes).
    df_adm_filt = df[df["adm1_en"] == adm].copy()

    for method in est_methods:
        df_adm_filt[f"rp_{method}"] = rp.lp3_rp(
            x=df_adm_filt["value"],
            params=df_params_adm_filt.loc[
                df_params_adm_filt["method"] == method, "value"
            ].iloc[0],
            est_method=method,
        )

    df_list.append(df_adm_filt)

# Concatenate all the dataframes in the list
df_combined = pd.concat(df_list, ignore_index=True)


# %% [markdown]
# Plot all RPs from each method - can see that they are generally similar,
# but plot is not very useful.

# %%
df_combined_sample = df_combined[df_combined["adm1_en"] == sample_admin]

# Scatter each method's RP estimate against the raw value, all on one
# log-log figure so the three methods can be compared directly.
plt.figure(figsize=(10, 6))

method_styles = [
    ("rp_lmoments", "blue", "RP Lmoments"),
    ("rp_scipy", "green", "RP Scipy"),
    ("rp_usgs", "red", "RP USGS"),
]
for rp_col, point_color, point_label in method_styles:
    plt.scatter(
        df_combined_sample["value"],
        df_combined_sample[rp_col],
        color=point_color,
        label=point_label,
        alpha=0.5,
    )

plt.xlabel("Value")
plt.ylabel("Return Period (RP)")
plt.title("Return Periods (RP) vs Value")
plt.legend()
plt.xscale("log")
plt.yscale("log")
plt.grid(True, which="both", ls="--")
plt.show()


# %% [markdown]
# ## Plot RVs for Specified RPs
#
# For a better comparison we can calculate RVs for specified RPs `1-9999`
# for the sample admin

# %%

return_periods = np.arange(1, 10000)  # RPs 1..9999; RP=1 => exceedance prob 1

# Pull the fitted parameter object ("value" column) for the sample admin,
# one per estimation method.
params_sample_lmom = df_params.loc[
    (df_params["method"] == "lmoments")
    & (df_params["adm1_en"] == sample_admin),
    "value",
].iloc[0]
params_sample_usgs = df_params.loc[
    (df_params["method"] == "usgs") & (df_params["adm1_en"] == sample_admin),
    "value",
].iloc[0]
params_sample_scipy = df_params.loc[
    (df_params["method"] == "scipy") & (df_params["adm1_en"] == sample_admin),
    "value",
].iloc[0]
# Calculate return values for each return period using lmoments method

# import inspect
# inspect.getsourcelines(rp.lp3_rv)
return_values_lmom = rp.lp3_rv(
    rp=return_periods, params=params_sample_lmom, est_method="lmoments"
)
return_values_usgs = rp.lp3_rv(
    rp=return_periods, params=params_sample_usgs, est_method="usgs"
)
return_values_scipy = rp.lp3_rv(
    rp=return_periods, params=params_sample_scipy, est_method="scipy"
)

# Plot the return periods against the return values
# NOTE(review): values are scaled by 100 — presumably fractional flood
# extent converted to percent; confirm the units of "value" upstream.
plt.figure(figsize=(10, 6))
plt.plot(
    return_periods, return_values_lmom * 100, label="LMoments", color="blue"
)
plt.plot(return_periods, return_values_usgs * 100, label="USGS", color="red")
plt.plot(
    return_periods, return_values_scipy * 100, label="SCIPY", color="green"
)
plt.xlabel("Return Period")
plt.ylabel("Return Value (%)")
plt.title("Return Values vs Return Periods")
plt.xscale("log")
plt.yscale("log")
plt.legend()
plt.grid(True, which="both", ls="--")
# Format y-axis to percentage
plt.gca().yaxis.set_major_formatter(
    FuncFormatter(lambda y, _: "{:.0f}%".format(y))
)
plt.show()
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ dask==2024.7.0
scipy==1.14.1
pre-commit==3.8.0
netCDF4==1.7.2
lmoments3==1.0.7
pyarrow==17.0.0
128 changes: 128 additions & 0 deletions src/utils/return_periods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import numpy as np
import pandas as pd
from lmoments3 import distr, lmom_ratios
from scipy.stats import norm, pearson3, skew


def lp3_params(x, est_method="lmoments"):
    """
    Fit log-Pearson Type III distribution parameters to a sample.

    Parameters:
        x (array-like): Positive sample values (e.g. yearly maxima).
            Zeros are replaced with the smallest nonzero value so the
            log10 transform is defined.
        est_method (str, optional): 'lmoments', 'scipy', or 'usgs'.
            Default is 'lmoments'.

    Returns:
        Fitted parameters of the distribution of log10(x):
        - 'lmoments': dict of pe3 parameters from ``distr.pe3.lmom_fit``.
        - 'scipy': tuple (skew, loc, scale) from ``pearson3.fit``.
        - 'usgs': dict with keys 'mu' (mean), 'sd' (sample std), 'g' (skew).

    Raises:
        ValueError: If an invalid estimation method is provided.
    """
    # Work on a float copy: np.asarray returns the caller's own array when
    # it is already an ndarray, and the zero replacement below would then
    # mutate the caller's data in place.
    x = np.array(x, dtype=float)
    # log10(0) is undefined; substitute the smallest nonzero observation
    x[x == 0] = np.min(x[x != 0])
    x_sorted = np.sort(x)
    x_log = np.log10(x_sorted)

    if est_method == "lmoments":
        lmoments = lmom_ratios(x_log, nmom=4)
        params = distr.pe3.lmom_fit(x_log, lmoments)
    elif est_method == "scipy":
        # Method-of-moments fit of scipy's Pearson III to the log data
        params = pearson3.fit(x_log, method="MM")
    elif est_method == "usgs":
        # Plain sample moments of log10(x)
        params = {
            "mu": np.mean(x_log),
            "sd": np.std(x_log, ddof=1),
            "g": skew(x_log),
        }
    else:
        raise ValueError(
            "Invalid est_method specified. Choose 'usgs','lmoments' or 'scipy'."  # noqa: E501
        )
    return params


def lp3_params_all(value):
    """Fit LP3 parameters with every supported estimation method.

    Returns a pandas Series keyed by method name ('usgs', 'lmoments',
    'scipy'), each entry holding that method's fitted parameters.
    """
    fitted = {}
    for method in ("usgs", "lmoments", "scipy"):
        fitted[method] = lp3_params(value, method)
    return pd.Series(fitted)


def lp3_rp(x, params, est_method="lmoments"):
    """
    Return period of each value in ``x`` under a fitted log-Pearson
    Type III distribution.

    Parameters:
        x (array-like): Values (in linear space) to compute return
            periods for.
        params: Fitted parameters produced by ``lp3_params`` with the
            matching ``est_method``.
        est_method (str, optional): 'lmoments', 'scipy', or 'usgs'.

    Returns:
        numpy.ndarray: Return periods aligned with the input order of
        ``x``. (The previous implementation sorted ``x`` internally, so
        the output rows did not correspond to the input rows when
        assigned back to a dataframe column.)

    Raises:
        ValueError: If an invalid estimation method is provided.
    """
    # No sorting: the CDF is evaluated elementwise, so preserving the input
    # order lets callers assign the result straight back to their rows.
    x_log = np.log10(np.asarray(x, dtype=float))

    if est_method == "scipy":
        # Non-exceedance probability of the log-transformed data
        p_lte = pearson3.cdf(x_log, *params)

    elif est_method == "lmoments":
        p_lte = distr.pe3.cdf(x_log, **params)

    elif est_method == "usgs":
        g = params["g"]
        mu = params["mu"]
        stdev = params["sd"]

        # Standardize the log values to frequency factors
        k_rp = (x_log - mu) / stdev

        # Invert the skew-adjusted frequency factor back to a
        # standard-normal quantile
        q_rp_norm = (g / 6) + ((k_rp * g / 2 + 1) ** (1 / 3) - 1) * 6 / g

        # Non-exceedance probability from the normal CDF
        p_lte = norm.cdf(q_rp_norm, loc=0, scale=1)

    else:
        raise ValueError(
            "Invalid est_method specified. Choose 'usgs','lmoments' or 'scipy'."  # noqa: E501
        )

    # Return period is the reciprocal of the exceedance probability; values
    # deep in the upper tail can yield very large / infinite return periods.
    p_gte = 1 - p_lte
    rp = 1 / p_gte

    return rp


def lp3_rv(rp, params, est_method="usgs"):
    """
    Calculate return values for given return periods using the
    Log-Pearson Type III distribution.

    Parameters:
        rp (list or array-like): Return periods (>= 1). A return period
            of exactly 1 implies exceedance probability 1 and yields a
            return value of 0.
        params (dict or tuple): Parameters for the distribution.
            - For 'usgs' method, a dictionary with keys 'g' (skewness),
              'mu' (mean), and 'sd' (standard deviation) of log10(x).
            - For 'lmoments' method, a dictionary with parameters for the
              Log Pearson Type III distribution.
            - For 'scipy' method, a tuple with parameters for the
              Log Pearson Type III distribution.
        est_method (str, optional): Method to estimate the return values.
            Options are 'usgs', 'lmoments', or 'scipy' (case-insensitive).
            Default is 'usgs'.

    Returns:
        numpy.ndarray: Return values corresponding to the given return
        periods.

    Raises:
        ValueError: If an invalid estimation method is provided.
    """
    est_method = est_method.lower()
    if est_method not in ["usgs", "lmoments", "scipy"]:
        raise ValueError(
            "Invalid method. Choose 'usgs' or 'lmoments' or 'scipy'."
        )
    # Exceedance probability p = 1/T; quantiles are taken at the
    # non-exceedance probability 1 - p. (Vectorized — the original built a
    # Python list and shadowed the `rp` argument inside its comprehension.)
    nonexceed_p = 1 - 1 / np.asarray(rp, dtype=float)

    if est_method == "usgs":
        g = params["g"]
        mu = params["mu"]
        stdev = params["sd"]

        # Standard-normal quantiles for each non-exceedance probability
        q_rp_norm = norm.ppf(nonexceed_p, loc=0, scale=1)
        # Skew-adjusted frequency factor
        k_rp = (2 / g) * (((q_rp_norm - (g / 6)) * (g / 6) + 1) ** 3 - 1)
        # Fitted log10 values, mapped back to linear space
        y_fit_rp = mu + k_rp * stdev
        ret = 10 ** (y_fit_rp)
    elif est_method == "lmoments":
        value_log = distr.pe3.ppf(nonexceed_p, **params)
        ret = 10 ** (value_log)
    elif est_method == "scipy":
        value_log = pearson3.ppf(nonexceed_p, *params)
        ret = 10 ** (value_log)

    return ret