Merge pull request #54 from nico-franco-gomez/dev

v0.3.5

nico-franco-gomez authored Apr 17, 2024
2 parents df92413 + 8ed5c01 commit 2e4d417
Showing 14 changed files with 421 additions and 132 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,26 @@ adheres to `Semantic Versioning <http://semver.org/spec/v2.0.0.html>`_.
- Validation for results from tests in every module (so far many tests are
only regarding functionality)

`0.3.5 <https://pypi.org/project/dsptoolbox/0.3.5>`_ -
---------------------

Added
~~~~~~~
- `harmonic_distortion_analysis` in ``transfer_functions``
- possibility of scaling the spectrogram
- calibration using any dB SPL value

Bugfix
~~~~~~~
- `reverb_time` now uses the indices of the peaks instead of a -20 dBFS
  threshold, since this delivers more accurate results
- scaling the spectrum of a windowed signal is now done correctly (the window
  is taken into account)

Misc
~~~~~~
- general documentation and small performance improvements

`0.3.4 <https://pypi.org/project/dsptoolbox/0.3.4>`_ -
---------------------

2 changes: 1 addition & 1 deletion dsptoolbox/__init__.py
@@ -73,4 +73,4 @@
"effects",
]

__version__ = "0.3.4"
__version__ = "0.3.5"
80 changes: 48 additions & 32 deletions dsptoolbox/_general_helpers.py
@@ -106,7 +106,7 @@ def _calculate_window(
def _get_normalized_spectrum(
f,
spectra: np.ndarray,
mode="standard",
scaling: str = "amplitude",
f_range_hz=[20, 20000],
normalize: str | None = None,
smoothe: int = 0,
@@ -123,9 +123,9 @@
Frequency vector.
spectra : `np.ndarray`
Spectrum matrix.
mode : str, optional
Mode of spectrum, needed for factor in dB representation.
Choose from `'standard'` or `'welch'`. Default: `'standard'`.
scaling : str, optional
Information about whether the spectrum is scaled as an amplitude or
power. Choose from `'amplitude'` or `'power'`. Default: `'amplitude'`.
f_range_hz : array-like with length 2
Range of frequencies to get the normalized spectrum back.
Default: [20, 20e3].
@@ -138,7 +138,8 @@
1/smoothe-fractional octave band smoothing for magnitude spectra. Pass
`0` for no smoothing. Default: 0.
phase : bool, optional
When `True`, phase spectra are also returned. Default: `False`.
When `True`, phase spectra are also returned. Smoothing is also
applied to the unwrapped phase. Default: `False`.
calibrated_data : bool, optional
When `True`, it is assumed that the time data has been calibrated
to be in Pascal so that it is scaled by p0=20e-6 Pa. Default: `False`.
@@ -176,15 +177,16 @@
+ "possible since the spectra are not complex"
)
# Factor
if mode == "standard":
if scaling == "amplitude":
scale_factor = 20e-6 if calibrated_data and normalize is None else 1
factor = 20
elif mode == "welch":
elif scaling == "power":
scale_factor = 4e-10 if calibrated_data and normalize is None else 1
factor = 10
else:
raise ValueError(
f"{mode} is not supported. Please select standard " "or welch"
f"{scaling} is not supported. Please select amplitude or "
+ "power scaling"
)

if f_range_hz is not None:
@@ -204,24 +206,23 @@

mag_spectra = np.abs(spectra)

if smoothe != 0 and mode == "standard":
if mode == "standard":
if smoothe != 0:
if scaling == "amplitude":
mag_spectra = _fractional_octave_smoothing(mag_spectra, smoothe)
else: # Welch
else: # Smoothing always in amplitude representation
mag_spectra = (
_fractional_octave_smoothing(mag_spectra**0.5, smoothe) ** 2
)

epsilon = 10 ** (-400 / 10)
epsilon = 10 ** (-800 / 10)
mag_spectra = factor * np.log10(
np.clip(mag_spectra, a_min=epsilon, a_max=None) / scale_factor
)

if normalize is not None:
for i in range(spectra.shape[1]):
if normalize == "1k":
gain = _get_exact_gain_1khz(f, mag_spectra[:, i])
mag_spectra[:, i] -= gain
mag_spectra[:, i] -= _get_exact_gain_1khz(f, mag_spectra[:, i])
else:
mag_spectra[:, i] -= np.max(mag_spectra[:, i])
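As a rough standalone illustration of the amplitude/power distinction handled in this hunk (plain NumPy; the helper name is only for the sketch, and the 20e-6 Pa / 4e-10 Pa² references follow the docstring above):

import numpy as np

def spectrum_to_db(mag, scaling="amplitude", calibrated=False):
    # Amplitude quantities use 20*log10, power quantities 10*log10.
    factor = 20 if scaling == "amplitude" else 10
    # Reference: 20e-6 Pa for calibrated amplitude data, its square (4e-10) for power.
    ref = (20e-6 if scaling == "amplitude" else 4e-10) if calibrated else 1.0
    eps = 10 ** (-800 / 10)  # floor to avoid log10(0)
    return factor * np.log10(np.clip(mag, eps, None) / ref)

x = np.random.default_rng(0).normal(size=2048)
mag = np.abs(np.fft.rfft(x)) / len(x)
print(spectrum_to_db(mag, scaling="amplitude")[:3])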

@@ -1072,6 +1073,7 @@ def _scale_spectrum(
mode: str | None,
time_length_samples: int,
sampling_rate_hz: int,
window: np.ndarray | None = None,
) -> np.ndarray:
"""Scale the spectrum directly from the (unscaled) FFT. It is assumed that
the time data was not windowed.
@@ -1095,6 +1097,13 @@
`np.ndarray`
Scaled spectrum
Notes
-----
- The amplitude spectrum shows the RMS value of each frequency in the
signal.
- Integrating the power spectral density over frequency delivers the total
  energy contained in the signal (Parseval's theorem).
"""
assert time_length_samples in (
(spectrum.shape[0] - 1) * 2,
@@ -1113,16 +1122,23 @@
), f"{mode} is not a supported mode"

if "spectral density" in mode:
factor = (1 / time_length_samples / sampling_rate_hz) ** 0.5
if window is None:
factor = (2 / time_length_samples / sampling_rate_hz) ** 0.5
else:
factor = (
2 / np.sum(window**2, axis=0, keepdims=True) / sampling_rate_hz
) ** 0.5
elif "spectrum" in mode:
factor = 1 / time_length_samples
if window is None:
factor = 2**0.5 / time_length_samples
else:
factor = 2**0.5 / np.sum(window, axis=0, keepdims=True)

spectrum *= factor

spectrum[0] /= 2**0.5
if time_length_samples % 2 == 0:
spectrum[1:-1] *= 2**0.5
else:
spectrum[1:] *= 2**0.5
spectrum[-1] /= 2**0.5

if "power" in mode:
spectrum = np.abs(spectrum) ** 2
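The spectral-density factors above can be sanity-checked with Parseval's relation; a minimal standalone sketch for the one-sided, unwindowed case (not the library routine):

import numpy as np

rng = np.random.default_rng(1)
fs = 48000
n = 4800
x = rng.normal(size=n)

spec = np.fft.rfft(x)
# One-sided power spectral density: 2 * |X|^2 / (n * fs), without doubling
# the DC bin (and the Nyquist bin for even n).
psd = np.abs(spec) ** 2 / (n * fs)
if n % 2 == 0:
    psd[1:-1] *= 2
else:
    psd[1:] *= 2

# Integrating the PSD over frequency recovers the mean square (average power)
# of the time signal, close to 1 here for unit-variance noise.
df = fs / n
print(np.sum(psd) * df, np.mean(x**2))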
@@ -1154,24 +1170,24 @@ def _get_fractional_impulse_peak_index(
n_channels = time_data.shape[1]
delay_samples = np.argmax(np.abs(time_data), axis=0).astype(int)

# Take only the part of the time vector with the impulses and some safety
# samples
# Take only the part of the time vector with the peaks and some safety
# samples (±200)
time_data = time_data[: np.max(delay_samples) + 200, :]
start_offset = max(np.min(delay_samples) - 200, 0)
time_data = time_data[start_offset:, :]
delay_samples -= start_offset

h = hilbert(time_data, axis=0)
h = hilbert(time_data, axis=0).imag
x = np.arange(-polynomial_points + 1, polynomial_points + 1)

latency_samples = np.zeros(n_channels)

for ch in range(n_channels):
# ===== Ensure that delay_samples is before the peak in each channel
selection = h[delay_samples[ch] : delay_samples[ch] + 2, ch].imag
selection = h[delay_samples[ch] : delay_samples[ch] + 2, ch]
move_back_one_sample = selection[0] * selection[1] > 0
delay_samples[ch] -= int(move_back_one_sample)
if (
h[delay_samples[ch], ch].imag * h[delay_samples[ch] + 1, ch].imag
> 0
):
if h[delay_samples[ch], ch] * h[delay_samples[ch] + 1, ch] > 0:
latency_samples[ch] = delay_samples[ch] + int(move_back_one_sample)
warn(
f"Fractional latency detection failed for channel {ch}. "
@@ -1191,16 +1207,16 @@
+ polynomial_points
+ 1,
ch,
].imag,
],
deg=2 * polynomial_points - 1,
)

# Find root and check it is less than one
roots = np.roots(pol).squeeze()
# Find roots
roots = np.roots(pol)
# Get only root between 0 and 1
roots = roots[
(roots == roots.real) # Real roots
& (roots <= 1 + 1e-10) # Range
& (roots <= 1) # Range
& (roots >= 0)
].real
try:
Expand All @@ -1216,7 +1232,7 @@ def _get_fractional_impulse_peak_index(
continue

latency_samples[ch] = delay_samples[ch] + fractional_delay_samples
return latency_samples
return latency_samples + start_offset
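The idea behind the fractional peak search can be sketched briefly: the imaginary part of the analytic (Hilbert) signal changes sign at the envelope peak, so locating that zero crossing between two samples yields the sub-sample peak position. A simplified standalone version that uses linear interpolation instead of the polynomial fit (function name and test signal are only for illustration):

import numpy as np
from scipy.signal import hilbert

def fractional_peak_index(ir):
    h = hilbert(ir).imag  # imaginary part of the analytic signal
    k = int(np.argmax(np.abs(ir)))
    if h[k] * h[k + 1] > 0:
        k -= 1  # ensure the zero crossing lies between samples k and k + 1
    # Linear interpolation of the zero crossing (the library fits a polynomial)
    return k + h[k] / (h[k] - h[k + 1])

fs = 48000
t = np.arange(512) / fs
ir = np.sinc((t - 100.4 / fs) * fs)  # band-limited impulse peaking at 100.4 samples
print(fractional_peak_index(ir))  # roughly 100.4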


def _remove_impulse_delay_from_phase(
66 changes: 49 additions & 17 deletions dsptoolbox/_standard.py
@@ -9,6 +9,7 @@
_pad_trim,
_compute_number_frames,
_get_fractional_impulse_peak_index,
_wrap_phase,
)
from warnings import warn

@@ -22,16 +23,16 @@ def _latency(
"""
if in2 is None:
in2 = np.repeat(in1[:, 0][..., None], in1.shape[1] - 1, axis=1)
in2 = in1[:, 0][..., None]
in1 = np.atleast_2d(in1[:, 1:])

latency_per_channel_samples = np.zeros(in1.shape[1], dtype=int)
for i in range(in1.shape[1]):
xcorr = correlate(in2[:, i].flatten(), in1[:, i].flatten())
latency_per_channel_samples[i] = int(
in1.shape[0] - np.argmax(np.abs(xcorr)) - 1
)
return latency_per_channel_samples
xcorr = correlate(in2, in1, mode="full")
peak_inds = np.argmax(np.abs(xcorr), axis=0)
else:
peak_inds = np.zeros(in1.shape[1], dtype=int)
for i in range(in1.shape[1]):
xcorr = correlate(in2[:, i].flatten(), in1[:, i].flatten())
peak_inds[i] = int(np.argmax(np.abs(xcorr)))
return in1.shape[0] - peak_inds - 1
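The latency estimate reduces to locating the peak of the cross-correlation between a reference and a delayed channel; a standalone sketch of the same relation for two 1-D arrays (not the vectorized code above):

import numpy as np
from scipy.signal import correlate

def latency_in_samples(reference, delayed):
    # The peak index of the full cross-correlation encodes the lag.
    xcorr = correlate(reference, delayed, mode="full")
    return len(delayed) - int(np.argmax(np.abs(xcorr))) - 1

rng = np.random.default_rng(0)
x = rng.normal(size=1000)
y = np.concatenate([np.zeros(30), x])[:1000]  # x delayed by 30 samples
print(latency_in_samples(x, y))  # 30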


def _fractional_latency(
@@ -355,7 +356,7 @@ def _minimum_phase(
)[:original_length]

if not unwrapped:
minimum_phase = np.angle(np.exp(1j * minimum_phase))
minimum_phase = _wrap_phase(minimum_phase)
return minimum_phase


@@ -368,7 +369,7 @@ def _stft(
fft_length_samples: int | None = None,
detrend: bool = True,
padding: bool = False,
scaling: bool = False,
scaling: str | None = None,
):
"""Computes the STFT of a signal. Output matrix has (freqs_hz, seconds_s).
@@ -396,9 +397,10 @@
When `True`, the original signal is padded at the beginning and end so
that no energy is lost due to windowing when the COLA constraint is met.
Default: `False`.
scaling : bool, optional
When `True`, the output is scaled as an amplitude spectrum, otherwise
no scaling is applied. See references for details. Default: `False`.
scaling : str, optional
Scale as `"amplitude spectrum"`, `"amplitude spectral density"`,
`"power spectrum"` or `"power spectral density"`. Pass `None`
to avoid any scaling. See references for details. Default: `None`.
Returns
-------
@@ -424,12 +426,30 @@
assert overlap_percent >= 0 and overlap_percent < 100, (
"overlap_percent" + " should be between 0 and 100"
)
valid_scaling = [
"power spectrum",
"power spectral density",
"amplitude spectrum",
"amplitude spectral density",
None,
]
assert scaling in valid_scaling, (
f"{scaling} is not valid. Use "
+ "power spectrum, power spectral density, amplitude spectrum, "
+ "amplitude spectral density or None"
)

if scaling is None:
scaling = ""

if fft_length_samples is None:
fft_length_samples = window_length_samples

# Window and step
window = windows.get_window(
window_type, window_length_samples, fftbins=True
)
overlap_samples = int(overlap_percent / 100 * window_length_samples)
overlap_samples = int(overlap_percent / 100 * window_length_samples + 0.5)
step = window_length_samples - overlap_samples

# Check COLA
@@ -452,12 +472,24 @@
# Spectra
stft = np.fft.rfft(time_x, axis=0, n=fft_length_samples)
# Scaling
if scaling:
factor = np.sqrt(2 / np.sum(window) ** 2)
if scaling in ("power spectrum", "amplitude spectrum"):
factor = 2**0.5 / np.sum(window)
elif scaling in ("power spectral density", "amplitude spectral density"):
factor = (2 / (window @ window) / fs_hz) ** 0.5
else:
factor = 1

stft *= factor

if scaling:
stft[0, ...] /= 2**0.5

if scaling and fft_length_samples % 2 == 0:
stft[-1, ...] /= 2**0.5

if "power" in scaling:
stft = np.abs(stft) ** 2

time_s = np.linspace(0, len(x) / fs_hz, stft.shape[1])
freqs_hz = np.fft.rfftfreq(len(window), 1 / fs_hz)
return time_s, freqs_hz, stft
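The "amplitude spectrum" convention used above can be checked on a single frame that contains a sine whose frequency falls exactly on a bin: after multiplying the rfft by sqrt(2)/sum(window), the peak bin reads the RMS value of the sine. A standalone sketch (single frame, not the library's _stft):

import numpy as np

fs = 48000
n = 4800  # frame length chosen so that 1 kHz falls exactly on a bin
t = np.arange(n) / fs
amplitude = 0.5
x = amplitude * np.sin(2 * np.pi * 1000 * t)

window = np.hanning(n)
spec = np.fft.rfft(x * window) * 2**0.5 / np.sum(window)
print(np.max(np.abs(spec)))  # close to 0.5 / sqrt(2) ≈ 0.354, the RMS of the sine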