Skip to content

Commit

Permalink
ENH: Fine calibration support for more MEG systems (mne-tools#12966)
Browse files Browse the repository at this point in the history
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
larsoner and autofix-ci[bot] authored Nov 16, 2024
1 parent ede7f01 commit f45b51b
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 56 deletions.
1 change: 1 addition & 0 deletions doc/changes/devel/12966.newfeature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ability to use :func:`mne.preprocessing.compute_fine_calibration` with non-Neuromag-style systems, as well as options to control the bad-angle and error tolerances, by `Eric Larson`_.
63 changes: 51 additions & 12 deletions mne/preprocessing/_fine_cal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from functools import partial

import numpy as np
from scipy.optimize import fmin_cobyla
from scipy.optimize import minimize

from .._fiff.pick import pick_info, pick_types
from .._fiff.tag import _coil_trans_to_loc, _loc_to_coil_trans
Expand All @@ -16,6 +16,7 @@
from ..utils import (
_check_fname,
_check_option,
_clean_names,
_ensure_int,
_pl,
_reg_pinv,
Expand Down Expand Up @@ -43,6 +44,9 @@ def compute_fine_calibration(
origin=(0.0, 0.0, 0.0),
cross_talk=None,
calibration=None,
*,
angle_limit=5.0,
err_limit=5.0,
verbose=None,
):
"""Compute fine calibration from empty-room data.
Expand All @@ -68,6 +72,17 @@ def compute_fine_calibration(
Dictionary with existing calibration. If provided, the magnetometer
imbalances and adjusted normals will be used and only the gradiometer
imbalances will be estimated (see step 2 in Notes below).
angle_limit : float
The maximum permitted angle in degrees between the original and adjusted
magnetometer normals. If the angle is exceeded, the segment is treated as
an outlier and discarded.
.. versionadded:: 1.9
err_limit : float
The maximum error (in percent) for each channel in order for a segment to
be used.
.. versionadded:: 1.9
%(verbose)s
Returns
Expand Down Expand Up @@ -114,6 +129,12 @@ def compute_fine_calibration(
ext_order = _ensure_int(ext_order, "ext_order")
origin = _check_origin(origin, raw.info, "meg", disp=True)
_check_option("raw.info['bads']", raw.info["bads"], ([],))
_validate_type(err_limit, "numeric", "err_limit")
_validate_type(angle_limit, "numeric", "angle_limit")
for key, val in dict(err_limit=err_limit, angle_limit=angle_limit).items():
if val < 0:
raise ValueError(f"{key} must be greater than or equal to 0, got {val}")
# Fine cal should not include ref channels
picks = pick_types(raw.info, meg=True, ref_meg=False)
if raw.info["dev_head_t"] is not None:
raise ValueError(
Expand Down Expand Up @@ -144,7 +165,7 @@ def compute_fine_calibration(
locs = np.array([ch["loc"] for ch in info["chs"]])
zs = locs[mag_picks, -3:].copy()
if calibration is not None:
_, calibration, _ = _prep_fine_cal(info, calibration)
_, calibration, _ = _prep_fine_cal(info, calibration, ignore_ref=True)
for pi, pick in enumerate(mag_picks):
idx = calibration["ch_names"].index(info["ch_names"][pick])
cals[pick] = calibration["imb_cals"][idx].item()
Expand All @@ -164,7 +185,14 @@ def compute_fine_calibration(
data = raw[picks, start:stop][0]
if ctc is not None:
data = ctc.dot(data)
z, cal, good = _adjust_mag_normals(info, data, origin, ext_order)
z, cal, good = _adjust_mag_normals(
info,
data,
origin,
ext_order,
angle_limit=angle_limit,
err_limit=err_limit,
)
if good:
z_list.append(z)
cal_list.append(cal)
Expand Down Expand Up @@ -213,7 +241,8 @@ def compute_fine_calibration(
cals[ii : ii + 1] if ii in mag_picks else imb[ii]
for ii in range(len(info["ch_names"]))
]
calibration = dict(ch_names=info["ch_names"], locs=locs, imb_cals=imb_cals)
ch_names = _clean_names(info["ch_names"], remove_whitespace=True)
calibration = dict(ch_names=ch_names, locs=locs, imb_cals=imb_cals)
return calibration, count


Expand Down Expand Up @@ -252,7 +281,7 @@ def _vector_angle(x, y):
)


def _adjust_mag_normals(info, data, origin, ext_order):
def _adjust_mag_normals(info, data, origin, ext_order, *, angle_limit, err_limit):
"""Adjust coil normals using magnetometers and empty-room data."""
# in principle we could allow using just mag or mag+grad, but MF uses
# just mag so let's follow suit
Expand Down Expand Up @@ -317,9 +346,16 @@ def _adjust_mag_normals(info, data, origin, ext_order):
)

# Figure out the additive term for z-component
zs[cal_idx] = fmin_cobyla(
objective, old_z, cons=(), rhobeg=1e-3, rhoend=1e-4, disp=False
)
zs[cal_idx] = minimize(
objective,
old_z,
bounds=[(-2, 2)] * 3,
# BFGS is the default for minimize but COBYLA converges faster
method="COBYLA",
# Start with a small relative step because nominal geometry information
# should be fairly accurate to begin with
options=dict(rhobeg=1e-1),
).x

# Do in-place adjustment to all_coils
cals[cal_idx] = 1.0 / np.linalg.norm(zs[cal_idx])
Expand Down Expand Up @@ -347,12 +383,15 @@ def _adjust_mag_normals(info, data, origin, ext_order):
# Chunk is usable if all angles and errors are both small
reason = list()
max_angle = np.max(angles)
if max_angle >= 5.0:
reason.append(f"max angle {max_angle:0.2f} >= 5°")
if max_angle >= angle_limit:
reason.append(f"max angle {max_angle:0.2f} >= {angle_limit:0.1f}°")
each_err = _data_err(data, S_tot, cals, axis=-1)[picks_mag]
n_bad = (each_err > 5.0).sum()
n_bad = (each_err > err_limit).sum()
if n_bad:
reason.append(f"{n_bad} residual{_pl(n_bad)} > 5%")
reason.append(
f"{n_bad} residual{_pl(n_bad)} > {err_limit:0.1f}% "
f"(max: {each_err.max():0.2f}%)"
)
reason = ", ".join(reason)
if reason:
reason = f" ({reason})"
Expand Down
33 changes: 19 additions & 14 deletions mne/preprocessing/maxwell.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

from collections import Counter, OrderedDict
from collections import Counter
from functools import partial
from math import factorial
from os import path as op
Expand Down Expand Up @@ -2070,7 +2070,7 @@ def _overlap_projector(data_int, data_res, corr):
return V_principal


def _prep_fine_cal(info, fine_cal):
def _prep_fine_cal(info, fine_cal, *, ignore_ref):
from ._fine_cal import read_fine_calibration

_validate_type(fine_cal, (dict, "path-like"))
Expand All @@ -2081,17 +2081,18 @@ def _prep_fine_cal(info, fine_cal):
extra = "dict"
logger.info(f" Using fine calibration {extra}")
ch_names = _clean_names(info["ch_names"], remove_whitespace=True)
info_to_cal = OrderedDict()
info_to_cal = dict()
missing = list()
for ci, name in enumerate(fine_cal["ch_names"]):
if name not in ch_names:
names_clean = _clean_names(fine_cal["ch_names"], remove_whitespace=True)
for ci, (name, name_clean) in enumerate(zip(fine_cal["ch_names"], names_clean)):
if name_clean not in ch_names:
missing.append(name)
else:
oi = ch_names.index(name)
oi = ch_names.index(name_clean)
info_to_cal[oi] = ci
meg_picks = pick_types(info, meg=True, exclude=[])
meg_picks = pick_types(info, meg=True, exclude=[], ref_meg=not ignore_ref)
if len(info_to_cal) != len(meg_picks):
bad = sorted({ch_names[pick] for pick in meg_picks} - set(fine_cal["ch_names"]))
bad = sorted({ch_names[pick] for pick in meg_picks} - set(names_clean))
raise RuntimeError(
f"Not all MEG channels found in fine calibration file, missing:\n{bad}"
)
Expand All @@ -2102,9 +2103,9 @@ def _prep_fine_cal(info, fine_cal):

def _update_sensor_geometry(info, fine_cal, ignore_ref):
"""Replace sensor geometry information and reorder cal_chs."""
info_to_cal, fine_cal, ch_names = _prep_fine_cal(info, fine_cal)
grad_picks = pick_types(info, meg="grad", exclude=())
mag_picks = pick_types(info, meg="mag", exclude=())
info_to_cal, fine_cal, _ = _prep_fine_cal(info, fine_cal, ignore_ref=ignore_ref)
grad_picks = pick_types(info, meg="grad", exclude=(), ref_meg=not ignore_ref)
mag_picks = pick_types(info, meg="mag", exclude=(), ref_meg=not ignore_ref)

# Determine gradiometer imbalances and magnetometer calibrations
grad_imbalances = np.array(
Expand Down Expand Up @@ -2134,7 +2135,11 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
assert not used[oi]
used[oi] = True
info_ch = info["chs"][oi]
ch_num = int(fine_cal["ch_names"][ci].lstrip("MEG").lstrip("0"))
# This only works for VV-like names
try:
ch_num = int(fine_cal["ch_names"][ci].lstrip("MEG").lstrip("0"))
except ValueError: # invalid literal for int() with base 10
ch_num = oi
cal_chans.append([ch_num, info_ch["coil_type"]])

# Some .dat files might only rotate EZ, so we must check first that
Expand Down Expand Up @@ -2174,7 +2179,7 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
# Channel positions are not changed
info_ch["loc"][3:] = cal_loc[3:]
assert info_ch["coord_frame"] == FIFF.FIFFV_COORD_DEVICE
meg_picks = pick_types(info, meg=True, exclude=())
meg_picks = pick_types(info, meg=True, exclude=(), ref_meg=not ignore_ref)
assert used[meg_picks].all()
assert not used[np.setdiff1d(np.arange(len(used)), meg_picks)].any()
# This gets written to the Info struct
Expand All @@ -2186,7 +2191,7 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
np.clip(ang_shift, -1.0, 1.0, ang_shift)
np.rad2deg(np.arccos(ang_shift), ang_shift) # Convert to degrees
logger.info(
" Adjusted coil positions by (μ ± σ): "
" Adjusted coil orientations by (μ ± σ): "
f"{np.mean(ang_shift):0.1f}° ± {np.std(ang_shift):0.1f}° "
f"(max: {np.max(np.abs(ang_shift)):0.1f}°)"
)
Expand Down
Loading

0 comments on commit f45b51b

Please sign in to comment.