diff --git a/mne/io/cnt/__init__.py b/mne/io/cnt/__init__.py index d85b6bce1db..7ac2b38e1b0 100644 --- a/mne/io/cnt/__init__.py +++ b/mne/io/cnt/__init__.py @@ -2,4 +2,4 @@ # Copyright the MNE-Python contributors. """CNT data reader.""" -from .cnt import read_raw_cnt +from .cnt import read_raw_cnt, read_epochs_cnt, read_evoked_cnt diff --git a/mne/io/cnt/_utils.py b/mne/io/cnt/_utils.py index 86842ad60c6..3601803f6dc 100644 --- a/mne/io/cnt/_utils.py +++ b/mne/io/cnt/_utils.py @@ -73,6 +73,20 @@ def _read_teeg(f, teeg_offset): ) +EpochHeader = namedtuple( + "EpochHeader", + ( + "Accept Ttype Correct Rt Response Reserved" + ), +) +# char Accept; /* accept byte */ +# short Ttype; /* trial type */ +# short Correct; /* accuracy */ +# float Rt; /* reaction time */ +# short Response; /* response type */ +# short Reserved; /* not used */ + + def _get_event_parser(event_type): if event_type == 1: event_maker = CNTEventType1 @@ -83,8 +97,11 @@ def _get_event_parser(event_type): elif event_type == 3: event_maker = CNTEventType3 struct_pattern = " "RawCNT": + r"""Reader function for Neuroscan ``.eeg`` epochs files. + + Parameters + ---------- + input_fname : path-like + Path to the Neuroscan ``.eeg`` file. + events : ndarray, shape (n_events, 3) | None + Path to events file. If array, it is the events typically returned + by the read_events function. If some events don't match the events + of interest as specified by event_id, they will be marked as 'IGNORED' + in the drop log. If None, it is constructed from the Neuroscan (.eeg) + file with each unique event encoded with a different integer. + event_id : int | list of int | dict | None + The id of the event to consider. If dict, the keys can later be used + to access associated events. + Example:: + + {"auditory":1, "visual":3} + + If int, a dict will be created with + the id as string. If a list, all events with the IDs specified + in the list are used. If None, the event_id is constructed from the + Neuroscan (.eeg) file with each descriptions copied from ``eventtype``. + eog : list | tuple | 'auto' + Names or indices of channels that should be designated EOG channels. + If 'auto', the channel names containing ``EOG`` or ``EYE`` are used. + Defaults to empty tuple. + %(uint16_codec)s + %(montage_units)s + %(verbose)s + + Returns + ------- + epochs : instance of Epochs + The epochs. + + See Also + -------- + mne.Epochs : Documentation of attributes and methods. + + + """ + + epochs = EpochsCNT( + input_fname=input_fname, + events=events, + event_id=event_id, + eog=eog, + misc=misc, + ecg=ecg, + emg=emg, + data_format=data_format, + date_format=date_format, + header=header, + verbose=verbose, + ) + return epochs + + +def read_evoked_cnt(fname, info, comment=None): + """Load evoked data from a Neuroscan .avg timelocked structure. + + This function expects to find timelocked data in the structure data_name is + pointing at. + + .. warning:: FieldTrip does not normally store the original information + concerning channel location, orientation, type etc. It is + therefore **highly recommended** to provide the info field. + This can be obtained by reading the original raw data file + with MNE functions (without preload). The returned object + contains the necessary info field. + + Parameters + ---------- + fname : path-like + Path and filename of the ``.mat`` file containing the data. + info : dict or None + The info dict of the raw data file corresponding to the data to import. + If this is set to None, information is extracted from the + Neuroscan structure. + comment : str + Comment on dataset. Can be the condition. + + Returns + ------- + evoked : instance of EvokedArray + An EvokedArray containing the loaded data. + """ + + info, cnt_info = _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, header, mode = 'evoked') + + input_fname = cnt_info["input_fname"] + + # number of points + n_pnts = cnt_info['n_pnts'] + n_channels = cnt_info["orig_nchan"] + cals = cnt_info["cals_avg"] + accepted_epochs = int(cnt_info['accepted_epochs']) + + data = np.empty((n_channels, n_pnts), dtype=float) + UNUSED_HEAD_SIZE = 5 + DATA_POINT_SIZE = 4 + + with open(input_fname, 'rb') as f: + # Ensure the file pointer is at the beginning of the EEG data + data_start = 900 + n_channels * 75 + data_end = data_start + (n_channels * (5 + n_pnts * DATA_POINT_SIZE)) + data_step = 5 + n_pnts * DATA_POINT_SIZE + + for chan, i in enumerate(range(data_start, data_end, data_step)): + f.seek(i) + data_points = np.fromfile(f, dtype='>f', count=n_pnts, offset=UNUSED_HEAD_SIZE) + # Scale the data to physical units in Volts + data[chan] = data_points * cals[chan] / accepted_epochs * 1e-6 + + evoked = EvokedArray(data, info, comment=comment) + return evoked + + +def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, header, mode = 'raw'): """Read the cnt header.""" data_offset = 900 # Size of the 'SETUP' header. cnt_info = dict() @@ -294,11 +438,22 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he session_date = "%s %s" % (read_str(fid, 10), read_str(fid, 12)) meas_date = _session_date_2_meas_date(session_date, date_format) + if mode == 'epoch': + fid.seek(362) + cnt_info['n_epochs'] = np.fromfile(fid, dtype="= 0] @@ -347,8 +502,10 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he cnt_info["channel_offset"] //= n_bytes else: cnt_info["channel_offset"] = 1 - - ch_names, cals, baselines, chs, pos = (list(), list(), list(), list(), list()) + if mode == 'evoked': + ch_names, cals, baselines, chs, pos, cals_avg = (list(), list(), list(), list(), list(), list()) + else: + ch_names, cals, baselines, chs, pos = (list(), list(), list(), list(), list()) bads = list() _validate_type(header, str, "header") @@ -379,6 +536,8 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he sensitivity = np.fromfile(fid, dtype="f4", count=1).item() fid.seek(data_offset + 75 * ch_idx + 71) cal = np.fromfile(fid, dtype="f4", count=1).item() + if mode == 'evoked': + cals_avg.append(cal) cals.append(cal * sensitivity * 1e-6 / 204.8) info = _empty_info(sfreq) @@ -394,6 +553,8 @@ def _get_cnt_info(input_fname, eog, ecg, emg, misc, data_format, date_format, he "last_name": last_name, } + if mode == 'evoked': + cnt_info['cals_avg'] = cals_avg if eog == "auto": eog = _find_channels(ch_names, "EOG") if ecg == "auto": @@ -599,3 +760,219 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): one[idx] -= baselines[idx][:, None] _mult_cal_one(data[:, sample_start:sample_stop], one, idx, cals, mult) + + +class EpochsCNT(BaseEpochs): + r"""Epochs from EEGLAB .set file. + + Parameters + ---------- + input_fname : path-like + Path to the ``.eeg`` epochs file. + events : path-like | array, shape (n_events, 3) | None + Path to events file. If array, it is the events typically returned + by the read_events function. If some events don't match the events + of interest as specified by event_id, they will be marked as 'IGNORED' + in the drop log. If None, it is constructed from the EEGLAB (.set) file + with each unique event encoded with a different integer. + event_id : int | list of int | dict | None + The id of the event to consider. If dict, + the keys can later be used to access associated events. Example: + dict(auditory=1, visual=3). If int, a dict will be created with + the id as string. If a list, all events with the IDs specified + in the list are used. If None, the event_id is constructed from the + EEGLAB (.set) file with each descriptions copied from ``eventtype``. + tmin : float + Start time before event. + baseline : None or tuple of length 2 (default (None, 0)) + The time interval to apply baseline correction. + If None do not apply it. If baseline is (a, b) + the interval is between "a (s)" and "b (s)". + If a is None the beginning of the data is used + and if b is None then b is set to the end of the interval. + If baseline is equal to (None, None) all the time + interval is used. + The baseline (a, b) includes both endpoints, i.e. all + timepoints t such that a <= t <= b. + reject : dict | None + Rejection parameters based on peak-to-peak amplitude. + Valid keys are 'grad' | 'mag' | 'eeg' | 'eog' | 'ecg'. + If reject is None then no rejection is done. Example:: + + reject = dict(grad=4000e-13, # T / m (gradiometers) + mag=4e-12, # T (magnetometers) + eeg=40e-6, # V (EEG channels) + eog=250e-6 # V (EOG channels) + ) + flat : dict | None + Rejection parameters based on flatness of signal. + Valid keys are 'grad' | 'mag' | 'eeg' | 'eog' | 'ecg', and values + are floats that set the minimum acceptable peak-to-peak amplitude. + If flat is None then no rejection is done. + reject_tmin : scalar | None + Start of the time window used to reject epochs (with the default None, + the window will start with tmin). + reject_tmax : scalar | None + End of the time window used to reject epochs (with the default None, + the window will end with tmax). + eog : list | tuple | 'auto' + Names or indices of channels that should be designated EOG channels. + If 'auto', the channel names containing ``EOG`` or ``EYE`` are used. + Defaults to empty tuple. + %(uint16_codec)s + %(montage_units)s + %(verbose)s + + See Also + -------- + mne.Epochs : Documentation of attributes and methods. + + Notes + ----- + .. versionadded:: 0.11.0 + """ + + @verbose + def __init__( + self, + input_fname, + events=None, + event_id=None, + tmin=0, + baseline=None, + reject=None, + flat=None, + reject_tmin=None, + reject_tmax=None, + eog=(), + misc=(), + ecg=(), + emg=(), + data_format="auto", + date_format="mm/dd/yy", + uint16_codec=None, + montage_units="auto", + *, + header="auto", + verbose=None, + ): + if isinstance(events, (str, PathLike, Path)): + events = read_events(events) + _check_option("date_format", date_format, ["mm/dd/yy", "dd/mm/yy"]) + if date_format == "dd/mm/yy": + _date_format = "%d/%m/%y %H:%M:%S" + else: + _date_format = "%m/%d/%y %H:%M:%S" + + input_fname = str( + _check_fname(fname=input_fname, must_exist=True, overwrite="read") + ) + logger.info("Extracting .eeg Parameters from %s..." % input_fname) + self.info, cnt_info = _get_cnt_info( + input_fname, eog, ecg, emg, misc, + data_format, _date_format, header, mode='epoch' + ) + + cnt_info.update(input_fname=input_fname) + self._raw_extras = [cnt_info] + self._filenames = [] + epoch_headers, data = self._read_cnt_epochs_data() + print(epoch_headers) + events = [event[1] for event in epoch_headers] + print(events) + if event_id is None: # convert to int to make typing-checks happy + event_id = {str(e): int(e) for e in np.unique(events)} + print(event_id) + if not ( + (events is None and event_id is None) + or (events is not None and event_id is not None) + ): + raise ValueError( + "Both `events` and `event_id` must be " + "None or not None") + + + # if eeg.trials <= 1: + # raise ValueError( + # "The file does not seem to contain epochs " + # "(trials less than 2). " + # "You should try using read_raw_cnt function." + # ) + + assert data.shape == ( + self._raw_extras[0]["n_epochs"], + self.info["nchan"], + self._raw_extras[0]["n_pnts"], + ) + tmax = ((data.shape[2] - 1) / self.info["sfreq"]) + tmin + + super().__init__( + self.info, + data, + events, + event_id, + tmin, + tmax, + baseline, + reject=reject, + flat=flat, + reject_tmin=reject_tmin, + reject_tmax=reject_tmax, + filename=input_fname, + verbose=verbose, + ) + + # data are preloaded but _bad_dropped is not set so we do it here: + self._bad_dropped = True + + _set_dig_montage_in_init(self, eeg_montage) + + logger.info("Ready.") + + def _read_cnt_epochs_data(self): + + """Read epochs data from .eeg file.""" + + cnt_info = self._raw_extras[0] + info = self.info + + input_fname = cnt_info["input_fname"] + + n_epochs = cnt_info['n_epochs'] + # number of points per epoch + n_pnts = cnt_info['n_pnts'] + n_channels = cnt_info["orig_nchan"] + cals = [d['cal'] for d in info["chs"]] + + data = np.empty((n_epochs, n_channels, n_pnts), dtype=float) + epoch_headers = [] + SWEEP_HEAD_SIZE = 13 + DATA_POINT_SIZE = 4 + + with open(input_fname, 'rb') as f: + # Ensure the file pointer is at the beginning of the EEG data + data_start = 900 + n_channels * 75 + data_end = data_start + n_epochs * (SWEEP_HEAD_SIZE + n_pnts * n_channels * DATA_POINT_SIZE) + data_step = SWEEP_HEAD_SIZE + n_pnts * n_channels * DATA_POINT_SIZE + + for epoch, i in enumerate(range(data_start, data_end, data_step)): + epoch_header = np.fromfile( + f, dtype=np.dtype('