Skip to content

Commit

Permalink
Merge pull request #62 from nico-franco-gomez/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
nico-franco-gomez authored Aug 18, 2024
2 parents 572abdf + dcbc8e9 commit 313d4e1
Show file tree
Hide file tree
Showing 26 changed files with 731 additions and 578 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ adheres to `Semantic Versioning <http://semver.org/spec/v2.0.0.html>`_.
- Validation for results from tests in every module (so far many tests are
only regarding functionality)

`0.4.2 <https://pypi.org/project/dsptoolbox/0.4.2>`_ -
---------------------
Added
~~~~~~~
- `apply_gain` utility function in ``standard``
- beta parameter for arbitrary noise generation
- `GroupDelayDesigner` in ``filterbanks``
- nomalization of signals now accepts rms values

Misc
~~~~~
- frequency response interpolation with more interpolation modes
- refactored `PhaseLinearizer`

Bugfix
~~~~~~
- corrected a case where scaling of spectrum while plotting was wrong


`0.4.1 <https://pypi.org/project/dsptoolbox/0.4.1>`_ -
---------------------

Expand Down
4 changes: 3 additions & 1 deletion dsptoolbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CalibrationData,
envelope,
dither,
apply_gain,
)
from .classes import (
Filter,
Expand Down Expand Up @@ -59,6 +60,7 @@
"CalibrationData",
"envelope",
"dither",
"apply_gain",
# Modules
"transfer_functions",
"distances",
Expand All @@ -73,4 +75,4 @@
"tools",
]

__version__ = "0.4.1"
__version__ = "0.4.2"
212 changes: 146 additions & 66 deletions dsptoolbox/_general_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,72 @@
from scipy.fft import next_fast_len


def to_db(
x: NDArray[np.float64],
amplitude_input: bool,
dynamic_range_db: float | None = None,
min_value: float | None = float(np.finfo(np.float64).smallest_normal),
) -> NDArray[np.float64]:
"""Convert to dB from amplitude or power representation. Clipping small
values can be activated in order to avoid -inf dB outcomes.
Parameters
----------
x : NDArray[np.float64]
Array to convert to dB.
amplitude_input : bool
Set to True if the values in x are in their linear form. False means
they have been already squared, i.e., they are in their power form.
dynamic_range_db : float, None, optional
If specified, a dynamic range in dB for the vector is applied by
finding its largest value and clipping to `max - dynamic_range_db`.
This will always overwrite `min_value` if specified. Pass None to
ignore. Default: None.
min_value : float, None, optional
Minimum value to clip `x` before converting into dB in order to avoid
`np.nan` or `-np.inf` in the output. Pass None to ignore. Default:
`np.finfo(np.float64).smallest_normal`.
Returns
-------
NDArray[np.float64]
New array or float in dB.
"""
factor = 20.0 if amplitude_input else 10.0

if min_value is None and dynamic_range_db is None:
return factor * np.log10(np.abs(x))

x_abs = np.abs(x)

if dynamic_range_db is not None:
min_value = np.max(x_abs) * 10.0 ** (-abs(dynamic_range_db) / factor)

return factor * np.log10(np.clip(x_abs, a_min=min_value, a_max=None))


def from_db(x: float | NDArray[np.float64], amplitude_output: bool):
"""Get the values in their amplitude or power form from dB.
Parameters
----------
x : float, NDArray[np.float64]
Values in dB.
amplitude_output : bool
When True, the values are returned in their linear form. Otherwise,
the squared (power) form is returned.
Returns
-------
float NDArray[np.float64]
Converted values
"""
factor = 20.0 if amplitude_output else 10.0
return 10 ** (x / factor)


def _find_nearest(points, vector) -> NDArray[np.int_]:
"""Gives back the indexes with the nearest points in vector
Expand Down Expand Up @@ -193,10 +259,10 @@ def _get_normalized_spectrum(
# Factor
if scaling == "amplitude":
scale_factor = 20e-6 if calibrated_data and normalize is None else 1
factor = 20
amplitude_scaling = True
elif scaling == "power":
scale_factor = 4e-10 if calibrated_data and normalize is None else 1
factor = 10
amplitude_scaling = False
else:
raise ValueError(
f"{scaling} is not supported. Please select amplitude or "
Expand Down Expand Up @@ -228,10 +294,7 @@ def _get_normalized_spectrum(
_fractional_octave_smoothing(mag_spectra**0.5, smoothe) ** 2
)

epsilon = 10 ** (-800 / 10)
mag_spectra = factor * np.log10(
np.clip(mag_spectra, a_min=epsilon, a_max=None) / scale_factor
)
mag_spectra = to_db(mag_spectra / scale_factor, amplitude_scaling, 500)

if normalize is not None:
for i in range(spectra.shape[1]):
Expand Down Expand Up @@ -263,8 +326,9 @@ def _get_normalized_spectrum(
def _find_frequencies_above_threshold(
spec, f, threshold_db, normalize=True
) -> list:
"""Finds frequencies above a certain threshold in a given spectrum."""
denum_db = 20 * np.log10(np.abs(spec))
"""Finds frequencies above a certain threshold in a given (amplitude)
spectrum."""
denum_db = to_db(spec, True)
if normalize:
denum_db -= np.max(denum_db)
freqs = f[denum_db > threshold_db]
Expand Down Expand Up @@ -352,14 +416,15 @@ def _compute_number_frames(


def _normalize(
s: NDArray[np.float64], dbfs: float, mode="peak"
s: NDArray[np.float64], dbfs: float, mode: str, per_channel: bool
) -> NDArray[np.float64]:
"""Normalizes a signal.
Parameters
----------
s: NDArray[np.float64]
Signal to normalize.
Signal to normalize. It can be 1 or 2D. Time samples are assumed to
be in the outer axis.
dbfs: float
dbfs value to normalize to.
mode: str, optional
Expand All @@ -372,22 +437,44 @@ def _normalize(
Normalized signal.
"""
s = s.copy()
assert mode in ("peak", "rms"), (
"Mode of normalization is not "
+ "available. Select either peak or rms"
)

onedim = s.ndim == 1
if onedim:
s = s[..., None]

factor = from_db(dbfs, True)
if mode == "peak":
s /= np.max(np.abs(s))
s *= 10 ** (dbfs / 20)
if mode == "rms":
s *= 10 ** (dbfs / 20) / _rms(s)
return s
factor /= np.max(np.abs(s), axis=0 if per_channel else None)
elif mode == "rms":
factor /= _rms(s if per_channel else s.flatten())
s_norm = s * factor

return s_norm[..., 0] if onedim else s_norm


def _rms(x: NDArray[np.float64]) -> NDArray[np.float64]:
"""Root mean square computation."""
return np.sqrt(np.sum(x**2) / len(x))
def _rms(x: NDArray[np.float64]) -> float | NDArray[np.float64]:
"""Root mean squared value of a discrete time series.
Parameters
----------
x : NDArray[np.float64]
Time series.
Returns
-------
rms : float or NDArray[np.float64]
Root mean squared of a signal. Float or NDArray[np.float64] depending
on input.
"""
single_dim = x.ndim == 1
x = x[..., None] if single_dim else x
rms_vals = np.std(x, axis=0)
return rms_vals[..., 0] if single_dim else rms_vals


def _amplify_db(s: NDArray[np.float64], db: float) -> NDArray[np.float64]:
Expand Down Expand Up @@ -645,7 +732,7 @@ def _frequency_weightning(
weights = 12194**2 * f**2 / ((f**2 + 20.6**2) * (f**2 + 12194**2))
weights /= weights[ind1k]
if db_output:
weights = 20 * np.log10(weights)
weights = to_db(weights, True)
return weights


Expand Down Expand Up @@ -1077,16 +1164,15 @@ def _get_chirp_rate(range_hz: list, length_seconds: float) -> float:


def _correct_for_real_phase_spectrum(phase_spectrum: NDArray[np.float64]):
"""This function takes in a wrapped phase spectrum and corrects it to
be for a real signal (assuming the last frequency bin corresponds to
nyquist, i.e., time data had an even length). This effectively adds a
small linear phase offset so that the phase at nyquist is either 0 or
np.pi.
"""This function takes in a phase spectrum and corrects it to be for a real
signal (assuming the last frequency bin corresponds to nyquist, i.e., time
data had an even length). This effectively adds a small linear phase offset
so that the phase at nyquist is either 0 or np.pi.
Parameters
----------
phase_spectrum : NDArray[np.float64]
Wrapped phase to be corrected. It is assumed that its last element
Phase to be corrected. It is assumed that its last element
corresponds to the nyquist frequency.
Returns
Expand All @@ -1095,11 +1181,7 @@ def _correct_for_real_phase_spectrum(phase_spectrum: NDArray[np.float64]):
Phase spectrum that can correspond to a real signal.
"""
factor = (
phase_spectrum[-1]
if phase_spectrum[-1] >= 0
else np.pi + phase_spectrum[-1]
)
factor = phase_spectrum[-1] % np.pi
return (
phase_spectrum
- np.linspace(0, 1, len(phase_spectrum), endpoint=True) * factor
Expand Down Expand Up @@ -1456,16 +1538,18 @@ def _interpolate_fr(
Frequency response to be interpolated.
f_target : NDArray[np.float64]
Target frequency vector.
mode : str, optional
Convert to amplitude or power representation from dB during
interpolation (or the other way around) using the modes `"db2power"`
(input in dB, interpolation in power spectrum, output in dB),
`"db2amplitude"`, `"amplitude2db"`, `"power2db"`. Pass `None` to avoid
any conversion. Default: `None`.
interpolation_scheme : str, optional
mode : str, None, {"db2amplitude", "amplitude2db", "power2db",\
"power2amplitude", "amplitude2power"}, optional
Convert between amplitude, power or dB representation during the
interpolation step. For instance, using the modes "db2power" means
input in dB, interpolation in power spectrum, output in dB. Available
modes are "db2amplitude", "amplitude2db", "power2db",
"power2amplitude", "amplitude2power". Pass None to avoid any
conversion. Default: None.
interpolation_scheme : str, {"linear", "quadratic", "cubic"}, optional
Type of interpolation to use. See `scipy.interpolation.interp1d` for
details. Choose from `"quadratic"` or `"cubic"` splines, or `"linear"`.
Default: `"linear"`.
details. Choose from "quadratic" or "cubic" splines, or "linear".
Default: "linear".
Returns
-------
Expand All @@ -1476,11 +1560,11 @@ def _interpolate_fr(
-----
- The input is always assumed to be already sorted.
- In case `f_target` has values outside the boundaries of `f_interp`,
the first and last values of `fr_interp` are used for extrapolation. This
also applies if interpolation is done in dB. If done in amplitude or
power units, the fill value outside the boundaries is 0.
0 is used as the fill value. For interpolation in dB, fill values are
the vector's edges.
- The interpolation is always done along the first (outer) axis or the
vector.
- When converting to dB, the default clipping value of `to_db` is used.
- Theoretical thoughts on interpolating an amplitude or power
frequency response:
- Using complex and dB values during interpolation are not very precise
Expand Down Expand Up @@ -1519,31 +1603,29 @@ def _interpolate_fr(
"""

fill_value = (fr_interp[0], fr_interp[-1])
fill_value = (0.0, 0.0)
y = fr_interp.copy()

# Conversion if necessary
if mode is not None:
mode = mode.lower()
factor = 20 if "amplitude" in mode else 10
if mode[:3] == "db2":
fr_interp = 10 ** (fr_interp / factor)
fill_value = (0.0, 0.0)
if mode == "power2amplitude":
y **= 0.5
elif mode == "amplitude2power":
y **= 2.0
elif mode[:3] == "db2":
y = from_db(y, "amplitude" in mode)
elif mode[-3:] == "2db":
fr_interp = factor * np.log10(
np.clip(
np.abs(fr_interp),
a_min=np.finfo(np.float64).smallest_normal,
a_max=None,
)
)
fill_value = (fr_interp[0], fr_interp[-1])
y = to_db(y, "amplitude" in mode)
fill_value = (y[0], y[-1])
else:
raise ValueError(f"Unsupported interpolation mode: {mode}")

interpolated = interp1d(
f_interp,
fr_interp,
y,
kind=interpolation_scheme,
copy=False,
bounds_error=False,
assume_sorted=True,
fill_value=fill_value,
Expand All @@ -1552,16 +1634,14 @@ def _interpolate_fr(

# Back conversion if activated
if mode is not None:
if mode[:3] == "db2":
interpolated = factor * np.log10(
np.clip(
np.abs(interpolated),
a_min=np.finfo(np.float64).smallest_normal,
a_max=None,
)
)
if mode == "power2amplitude":
interpolated **= 2.0
elif mode == "amplitude2power":
interpolated **= 0.5
elif mode[:3] == "db2":
interpolated = to_db(interpolated, "amplitude" in mode)
elif mode[-3:] == "2db":
interpolated = 10 ** (interpolated / factor)
interpolated = from_db(interpolated, "amplitude" in mode)

return interpolated

Expand Down
Loading

0 comments on commit 313d4e1

Please sign in to comment.