Skip to content

Commit

Permalink
Merge pull request #58 from nico-franco-gomez/dev
Browse files Browse the repository at this point in the history
v0.3.8
  • Loading branch information
nico-franco-gomez authored May 26, 2024
2 parents 65308a9 + 0b836f2 commit 4d487d5
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 165 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ adheres to `Semantic Versioning <http://semver.org/spec/v2.0.0.html>`_.
- Validation for results from tests in every module (so far many tests are
only regarding functionality)

`0.3.8 <https://pypi.org/project/dsptoolbox/0.3.8>`_ -
---------------------

Misc
~~~~~~~
- renamed paramater `remove_impulse_delay` to `remove_ir_latency`
- changed default values in `PhaseLinearizer`
- general documentation improvements

Bugfix
~~~~~~
- `find_ir_latency` now searches for the latency in comparison to the minimum
phase ir
- `harmonic_distortion_analysis` was fixed so that it can succesfully trim
the fundamental ir

`0.3.7 <https://pypi.org/project/dsptoolbox/0.3.7>`_ -
---------------------

Expand Down
2 changes: 1 addition & 1 deletion dsptoolbox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@
"effects",
]

__version__ = "0.3.7"
__version__ = "0.3.8"
118 changes: 115 additions & 3 deletions dsptoolbox/_general_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"""

import numpy as np
from scipy.signal import windows, convolve as scipy_convolve, hilbert
from scipy.signal import (
windows,
convolve as scipy_convolve,
hilbert,
correlate,
)
from scipy.interpolate import interp1d
from scipy.linalg import toeplitz as toeplitz_scipy
from os import sep
Expand Down Expand Up @@ -1235,7 +1240,7 @@ def _get_fractional_impulse_peak_index(
return latency_samples + start_offset


def _remove_impulse_delay_from_phase(
def _remove_ir_latency_from_phase(
freqs: np.ndarray,
phase: np.ndarray,
time_data: np.ndarray,
Expand All @@ -1261,5 +1266,112 @@ def _remove_impulse_delay_from_phase(
New phase response without impulse delay.
"""
delays_s = _get_fractional_impulse_peak_index(time_data) / sampling_rate_hz
min_ir = _min_phase_ir_from_real_cepstrum(time_data)
delays_s = _fractional_latency(time_data, min_ir) / sampling_rate_hz
return _wrap_phase(phase + 2 * np.pi * freqs[:, None] * delays_s[None, :])


def _min_phase_ir_from_real_cepstrum(time_data: np.ndarray):
"""Returns minimum-phase version of a time series using the real cepstrum
method.
Parameters
----------
time_data : `np.ndarray`
Time series to compute the minimum phase version from. It is assumed
to have shape (time samples, channels).
Returns
-------
min_phase_time_data : `np.ndarray`
New time series.
"""
return np.real(
np.fft.ifft(
_get_minimum_phase_spectrum_from_real_cepstrum(time_data), axis=0
)
)


def _get_minimum_phase_spectrum_from_real_cepstrum(time_data: np.ndarray):
"""Returns minimum-phase version of a time series using the real cepstrum
method.
Parameters
----------
time_data : `np.ndarray`
Time series to compute the minimum phase version from. It is assumed
to have shape (time samples, channels).
Returns
-------
`np.ndarray`
New spectrum with minimum phase.
"""
# Real cepstrum
y = np.real(
np.fft.ifft(np.log(np.abs(np.fft.fft(time_data, axis=0))), axis=0)
)

# Window in the cepstral domain, like obtaining hilbert transform
w = np.zeros(y.shape[0])
w[0] = 1
w[: len(w) // 2 - 1] = 2
# If length is even, nyquist is exactly in the middle
if len(w) % 2 == 0:
w[len(w) // 2] = 1

# Windowing in cepstral domain and back to spectral domain
return np.exp(np.fft.fft(y * w[..., None], axis=0))


def _fractional_latency(
td1: np.ndarray, td2: np.ndarray | None = None, polynomial_points: int = 1
):
"""This function computes the sub-sample latency between two signals using
Zero-Crossing of the analytic (hilbert transformed) correlation function.
The number of polynomial points taken around the correlation maximum can be
set, although some polynomial orders might fail to compute the root. In
that case, integer latency will be returned for the respective channel.
Parameters
----------
td1 : `np.ndaray`
Delayed version of the signal.
td2 : `np.ndarray`, optional
Original version of the signal. If `None` is passed, the latencies
are computed between the first channel of td1 and every other.
Default: `None`.
polynomial_points : int, optional
This corresponds to the number of points taken around the root in order
to fit a polynomial. Accuracy might improve with higher orders but
it could also lead to ill-conditioned polynomials. In case root finding
is not successful, integer latency values are returned. Default: 1.
Returns
-------
lags : `np.ndarray`
Fractional delays. It has shape (channel). In case td2 was `None`, its
length is `channels - 1`.
References
----------
- N. S. M. Tamim and F. Ghani, "Hilbert transform of FFT pruned cross
correlation function for optimization in time delay estimation," 2009
IEEE 9th Malaysia International Conference on Communications (MICC),
Kuala Lumpur, Malaysia, 2009, pp. 809-814,
doi: 10.1109/MICC.2009.5431382.
"""
if td2 is None:
td2 = td1[:, 0][..., None]
td1 = np.atleast_2d(td1[:, 1:])
xcor = correlate(td2, td1)
else:
xcor = np.zeros((td1.shape[0] + td2.shape[0] - 1, td2.shape[1]))
for i in range(td2.shape[1]):
xcor[:, i] = correlate(td2[:, i], td1[:, i])
inds = _get_fractional_impulse_peak_index(xcor, polynomial_points)
return td1.shape[0] - inds - 1
51 changes: 0 additions & 51 deletions dsptoolbox/_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ._general_helpers import (
_pad_trim,
_compute_number_frames,
_get_fractional_impulse_peak_index,
_wrap_phase,
)
from warnings import warn
Expand All @@ -35,56 +34,6 @@ def _latency(
return in1.shape[0] - peak_inds - 1


def _fractional_latency(
td1: np.ndarray, td2: np.ndarray | None = None, polynomial_points: int = 1
):
"""This function computes the sub-sample latency between two signals using
Zero-Crossing of the analytic (hilbert transformed) correlation function.
The number of polynomial points taken around the correlation maximum can be
set, although some polynomial orders might fail to compute the root. In
that case, integer latency will be returned for the respective channel.
Parameters
----------
td1 : `np.ndaray`
Delayed version of the signal.
td2 : `np.ndarray`, optional
Original version of the signal. If `None` is passed, the latencies
are computed between the first channel of td1 and every other.
Default: `None`.
polynomial_points : int, optional
This corresponds to the number of points taken around the root in order
to fit a polynomial. Accuracy might improve with higher orders but
it could also lead to ill-conditioned polynomials. In case root finding
is not successful, integer latency values are returned. Default: 1.
Returns
-------
lags : `np.ndarray`
Fractional delays. It has shape (channel). In case td2 was `None`, its
length is `channels - 1`.
References
----------
- N. S. M. Tamim and F. Ghani, "Hilbert transform of FFT pruned cross
correlation function for optimization in time delay estimation," 2009
IEEE 9th Malaysia International Conference on Communications (MICC),
Kuala Lumpur, Malaysia, 2009, pp. 809-814,
doi: 10.1109/MICC.2009.5431382.
"""
if td2 is None:
td2 = td1[:, 0][..., None]
td1 = np.atleast_2d(td1[:, 1:])
xcor = correlate(td2, td1)
else:
xcor = np.zeros((td1.shape[0] + td2.shape[0] - 1, td2.shape[1]))
for i in range(td2.shape[1]):
xcor[:, i] = correlate(td2[:, i], td1[:, i])
inds = _get_fractional_impulse_peak_index(xcor, polynomial_points)
return td1.shape[0] - inds - 1


def _welch(
x: np.ndarray,
y: np.ndarray | None,
Expand Down
4 changes: 2 additions & 2 deletions dsptoolbox/classes/_phaseLinearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _set_target_group_delay(self, target_group_delay: np.ndarray):
def set_parameters(
self,
delay_increase_percent: float = 100.0,
total_length_factor: float = 1.5,
total_length_factor: float = 1.0,
):
"""Set parameters for the FIR filter.
Expand All @@ -97,7 +97,7 @@ def set_parameters(
Default: 100.
total_length_factor : float, optional
The total length of the filter is based on the longest group delay.
This factor can augment it. Default: 1.5.
This factor can augment it. Default: 1.
Notes
-----
Expand Down
2 changes: 1 addition & 1 deletion dsptoolbox/classes/filter_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ def get_coefficients(
coefficients : array-like
Array with filter coefficients with shape depending on mode:
- `'ba'`: list(b, a) with b and a of type `np.ndarray`.
- `'sos'`: `np.ndarray`
- `'sos'`: `np.ndarray` with shape (n_sections, 6).
- `'zpk'`: tuple(z, p, k) with z, p, k of type `np.ndarray`
- Return `None` if user decides that ba->sos is too costly. The
threshold is for filters with order > 500.
Expand Down
30 changes: 15 additions & 15 deletions dsptoolbox/classes/signal_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
_check_format_in_path,
_scale_spectrum,
_wrap_phase,
_remove_impulse_delay_from_phase,
_remove_ir_latency_from_phase,
)
from .._standard import _welch, _group_delay_direct, _stft, _csm

Expand Down Expand Up @@ -1018,14 +1018,14 @@ def plot_magnitude(
)
return fig, ax

def plot_time(self) -> tuple[Figure, Axes]:
def plot_time(self) -> tuple[Figure, list[Axes]]:
"""Plots time signals.
Returns
-------
fig : `matplotlib.figure.Figure`
Figure.
ax : `matplotlib.axes.Axes`
ax : list of `matplotlib.axes.Axes`
Axes.
"""
Expand Down Expand Up @@ -1065,7 +1065,7 @@ def plot_spl(
normalize_at_peak: bool = False,
range_db: float | None = 100.0,
window_length_s: float = 0.0,
) -> tuple[Figure, Axes]:
) -> tuple[Figure, list[Axes]]:
"""Plots the momentary sound pressure level (dB or dBFS) of each
channel. If the signal is calibrated and not normalized at peak, the
values correspond to dB, otherwise they are dBFS.
Expand All @@ -1087,7 +1087,7 @@ def plot_spl(
-------
fig : `matplotlib.figure.Figure`
Figure.
ax : `matplotlib.axes.Axes`
ax : list of `matplotlib.axes.Axes`
Axes.
Notes
Expand Down Expand Up @@ -1191,7 +1191,7 @@ def plot_spl(
def plot_group_delay(
self,
range_hz=[20, 20000],
remove_impulse_delay: bool = False,
remove_ir_latency: bool = False,
smoothing: int = 0,
) -> tuple[Figure, Axes]:
"""Plots group delay of each channel.
Expand All @@ -1203,7 +1203,7 @@ def plot_group_delay(
range_hz : array-like with length 2, optional
Range of frequencies for which to show group delay.
Default: [20, 20e3].
remove_impulse_delay : bool, optional
remove_ir_latency : bool, optional
When `True`, the delay of the impulse is removed prior to the
computation of the group delay. Default: `False`.
smoothing : int, optional
Expand Down Expand Up @@ -1241,8 +1241,8 @@ def plot_group_delay(
self._spectrum_parameters = prior_spectrum_parameters

sp = np.angle(sp)
if remove_impulse_delay:
sp = _remove_impulse_delay_from_phase(
if remove_ir_latency:
sp = _remove_ir_latency_from_phase(
f, sp, self.time_data, self.sampling_rate_hz
)

Expand Down Expand Up @@ -1337,14 +1337,14 @@ def plot_spectrogram(
)
return fig, ax

def plot_coherence(self) -> tuple[Figure, Axes]:
def plot_coherence(self) -> tuple[Figure, list[Axes]]:
"""Plots coherence measurements if there are any.
Returns
-------
fig : `matplotlib.figure.Figure`
Figure.
ax : `matplotlib.axes.Axes`
ax : list of `matplotlib.axes.Axes`
Axes.
"""
Expand Down Expand Up @@ -1372,7 +1372,7 @@ def plot_phase(
range_hz=[20, 20e3],
unwrap: bool = False,
smoothing: int = 0,
remove_impulse_delay: bool = False,
remove_ir_latency: bool = False,
) -> tuple[Figure, Axes]:
"""Plots phase of the frequency response, only available if the method
for the spectrum `"standard"`.
Expand All @@ -1388,7 +1388,7 @@ def plot_phase(
When different than 0, the phase response is smoothed across the
1/smoothing-octave band. This only applies smoothing to the plot
data. Default: 0.
remove_impulse_delay : bool, optional
remove_ir_latency : bool, optional
If the signal is of type `"rir"` or `"ir"`, the delay of the
impulse can be removed. Default: `False`.
Expand All @@ -1415,12 +1415,12 @@ def plot_phase(

self._spectrum_parameters["smoothe"] = prior_smoothing

if remove_impulse_delay:
if remove_ir_latency:
assert self.signal_type in (
"rir",
"ir",
), f"{self.signal_type} is not valid, use rir or ir"
ph = _remove_impulse_delay_from_phase(
ph = _remove_ir_latency_from_phase(
f, ph, self.time_data, self.sampling_rate_hz
)

Expand Down
2 changes: 1 addition & 1 deletion dsptoolbox/standard_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
_indices_above_threshold_dbfs,
_detrend,
_rms,
_fractional_latency,
_fractional_delay_filter,
)
from ._general_helpers import (
Expand All @@ -37,6 +36,7 @@
_fade,
_check_format_in_path,
_get_smoothing_factor_ema,
_fractional_latency,
)


Expand Down
Loading

0 comments on commit 4d487d5

Please sign in to comment.