diff --git a/galvani/BioLogic.py b/galvani/BioLogic.py
index 65d580c..ffb2340 100644
--- a/galvani/BioLogic.py
+++ b/galvani/BioLogic.py
@@ -9,7 +9,7 @@
 import re
 import csv
-from os import SEEK_SET
+from os import SEEK_SET, path
 import time
 from datetime import date, datetime, timedelta
 from collections import defaultdict, OrderedDict
@@ -65,6 +65,9 @@ def fieldname_to_dtype(fieldname):
         "Temperature/°C",
         "Efficiency/%",
         "Capacity/mA.h",
+        "Unk1",
+        "rotation rate/rpm",
+        "pad number",
     ):
         return (fieldname, np.float_)
     elif fieldname in ("cycle number", "I Range", "Ns", "half cycle", "z cycle"):
@@ -73,7 +76,7 @@ def fieldname_to_dtype(fieldname):
         return ("dQ/mA.h", np.float_)
     elif fieldname in ("I/mA", "<I>/mA"):
         return ("I/mA", np.float_)
-    elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V", ""):
+    elif fieldname in ("Ewe/V", "<Ewe>/V", "Ecell/V", "", ""):
         return ("Ewe/V", np.float_)
     elif fieldname.endswith(
         (
@@ -86,6 +89,7 @@ def fieldname_to_dtype(fieldname):
             "/mW.h",
             "/A",
             "/mA",
+            "/µA",
             "/A.h",
             "/mA.h",
             "/V",
@@ -94,6 +98,7 @@ def fieldname_to_dtype(fieldname):
             "/mF",
             "/uF",
             "/µF",
+            "/µF-2",
             "/nF",
             "/C",
             "/Ohm",
@@ -101,9 +106,12 @@ def fieldname_to_dtype(fieldname):
             "/Ohm.cm",
             "/mS/cm",
             "/%",
+            "/mol.L-1",
         )
     ):
         return (fieldname, np.float_)
+    elif fieldname.startswith("Custom"):
+        return (fieldname, np.float_)
     else:
         raise ValueError("Invalid column header: %s" % fieldname)
@@ -265,20 +273,102 @@
     ]
 )
 
+# Column IDs specific to the "stack mode" used for BMS testing (specific to VMP3 ?)
+stack_mode_colID_dtype_map = {
+    103: ("Estack/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", " ??
+    8: ("I/mA", "/mA", "/mA", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", "/V", " 1:
@@ -467,10 +581,15 @@
                 unique_field_name = field_name
             type_list.append((unique_field_name, field_type))
         else:
-            raise NotImplementedError(
-                "Column ID {cid} after column {prev} "
-                "is unknown".format(cid=colID, prev=type_list[-1][0])
-            )
+            if type_list:
+                raise NotImplementedError(
+                    "Column ID {cid} after column {prev} "
+                    "is unknown".format(cid=colID, prev=type_list[-1][0])
+                )
+            else:
+                raise NotImplementedError(
+                    "First column ID {cid} is unknown".format(cid=colID)
+                )
     return np.dtype(type_list), flags_dict
 
@@ -485,14 +604,16 @@ def read_VMP_modules(fileobj, read_module_data=True):
         if len(module_magic) == 0:  # end of file
             break
         elif module_magic != b"MODULE":
-            raise ValueError(
-                "Found %r, expecting start of new VMP MODULE" % module_magic
-            )
+            # the stream is unusable at this point; close it before raising
+            fileobj.close()
+            raise ValueError("Found %r, expecting start of new VMP MODULE" % module_magic)
 
         VMPmodule_hdr = VMPmodule_hdr_v1
         # Reading headers binary information
         hdr_bytes = fileobj.read(VMPmodule_hdr.itemsize)
         if len(hdr_bytes) < VMPmodule_hdr.itemsize:
+            # the stream is unusable at this point; close it before raising
+            fileobj.close()
             raise IOError("Unexpected end of file while reading module header")
 
         # Checking if EC-Lab version is >= 11.50
@@ -506,6 +627,8 @@
         if read_module_data:
             hdr_dict["data"] = fileobj.read(hdr_dict["length"])
             if len(hdr_dict["data"]) != hdr_dict["length"]:
+                # the stream is unusable at this point; close it before raising
+                fileobj.close()
                 raise IOError(
                     """Unexpected end of file while reading data
                     current module: %s
@@ -522,7 +645,83 @@
         yield hdr_dict
         fileobj.seek(hdr_dict["offset"] + hdr_dict["length"], SEEK_SET)
 
+
+def loop_from_file(file: str, encoding: str = "latin1"):
+    """
+    When an experiment is still running and it includes loops,
+    a _LOOP.txt file is temporarily created to progressively store the indexes of new loops.
+    This function reads that file and builds the loop_index array used for MPRfile initialization.
+
+    Parameters
+    ----------
+    file : str
+        Path of the loop file.
+    encoding : str, optional
+        Encoding of the text file. The default is "latin1".
+
+    Raises
+    ------
+    ValueError
+        If the file does not start with "VMP EXPERIMENT LOOP INDEXES".
+
+    Returns
+    -------
+    loop_index : np.ndarray
+        Indexes of the data points that start a new loop.
+    """
+    with open(file, "r", encoding=encoding) as f:
+        line = f.readline().strip()
+        if line != LOOP_MAGIC:
+            raise ValueError("Invalid magic for LOOP.txt file")
+        loop_index = np.array([int(line) for line in f], dtype="u4")
+
+    return loop_index
+
+
+def timestamp_from_file(file: str, encoding: str = "latin1"):
+    """
+    When an experiment is still running, a .mpl file is temporarily created to store
+    information that will be added to the log module appended after the data module in
+    the .mpr file at the end of the experiment.
+    This function reads that file and extracts the date and time at which acquisition
+    started, as a timestamp for MPRfile initialization.
+
+    Parameters
+    ----------
+    file : str
+        Path of the log file.
+    encoding : str, optional
+        Encoding of the text file. The default is "latin1".
+
+    Raises
+    ------
+    ValueError
+        If the file does not start with "EC-Lab LOG FILE" or "BT-Lab LOG FILE".
+
+    Returns
+    -------
+    timestamp : datetime.datetime
+        Date and time of the start of data acquisition.
+    """
+    with open(file, "r", encoding=encoding) as f:
+        line = f.readline().strip()
+        if line not in LOG_MAGIC:
+            raise ValueError("Invalid magic for .mpl file")
+        log = f.read()
+    start = tuple(
+        map(
+            int,
+            re.findall(
+                r"Acquisition started on : (\d+)\/(\d+)\/(\d+) (\d+):(\d+):(\d+)\.(\d+)",
+                log,
+            )[0],
+        )
+    )
+    return datetime(
+        start[2], start[0], start[1], start[3], start[4], start[5], start[6] * 1000
+    )
+
+
+LOG_MAGIC = ("EC-Lab LOG FILE", "BT-Lab LOG FILE")
+LOOP_MAGIC = "VMP EXPERIMENT LOOP INDEXES"
 
 MPR_MAGIC = b"BIO-LOGIC MODULAR FILE\x1a".ljust(48) + b"\x00\x00\x00\x00"
@@ -544,18 +743,27 @@ class MPRfile:
 
     def __init__(self, file_or_path):
         self.loop_index = None
+        self.startdate = None
+        self.enddate = None
+        self.timestamp = None
         if isinstance(file_or_path, str):
            mpr_file = open(file_or_path, "rb")
+            loop_file = file_or_path[:-4] + "_LOOP.txt"  # loop file for running experiment
+            log_file = file_or_path[:-1] + "l"  # log file for running experiment
         else:
             mpr_file = file_or_path
         magic = mpr_file.read(len(MPR_MAGIC))
         if magic != MPR_MAGIC:
+            if isinstance(file_or_path, str):
+                mpr_file.close()
             raise ValueError("Invalid magic for .mpr file: %s" % magic)
 
         modules = list(read_VMP_modules(mpr_file))
+        if isinstance(file_or_path, str):
+            mpr_file.close()
         self.modules = modules
-        (settings_mod,) = (m for m in modules if m["shortname"] == b"VMP Set   ")
+        maybe_set_module = [m for m in modules if m["shortname"] == b"VMP Set   "]
         (data_module,) = (m for m in modules if m["shortname"] == b"VMP data  ")
         maybe_loop_module = [m for m in modules if m["shortname"] == b"VMP loop  "]
         maybe_log_module = [m for m in modules if m["shortname"] == b"VMP LOG   "]
@@ -567,7 +775,7 @@ def __init__(self, file_or_path):
             # If EC-Lab version >= 11.50, column_types is [0 1 0 3 0 174...] instead of [1 3 174...]
             if np.frombuffer(data_module["data"][5:6], dtype="u1").item():
                 column_types = np.frombuffer(data_module["data"][5:], dtype="u1", count=n_columns)
-                remaining_headers = data_module["data"][5 + n_columns:100]
+                remaining_headers = data_module["data"][5 + n_columns : 100]
                 main_data = data_module["data"][100:]
             else:
                 column_types = np.frombuffer(
@@ -576,7 +784,7 @@ def __init__(self, file_or_path):
                 column_types = column_types[1::2]  # suppressing zeros in column types array
                 # remaining headers should be empty except for bytes 5 + n_columns * 2
                 # and 1006 which are sometimes == 1
-                remaining_headers = data_module["data"][6 + n_columns * 2:1006]
+                remaining_headers = data_module["data"][6 + n_columns * 2 : 1006]
                 main_data = data_module["data"][1007:]
         elif data_module["version"] in [2, 3]:
             column_types = np.frombuffer(data_module["data"][5:], dtype="<u1", count=n_columns)
-            if ole_timestamp1 > 40000 and ole_timestamp1 < 50000:
-                ole_timestamp = ole_timestamp1
-            elif ole_timestamp2 > 40000 and ole_timestamp2 < 50000:
-                ole_timestamp = ole_timestamp2
-            elif ole_timestamp3 > 40000 and ole_timestamp3 < 50000:
-                ole_timestamp = ole_timestamp3
-            elif ole_timestamp4 > 40000 and ole_timestamp4 < 50000:
-                ole_timestamp = ole_timestamp4
-
-            else:
+            for i in range(8):
+                log_buff = np.frombuffer(log_module["data"][400 + i :], dtype="<f8")
+                ole_timestamp = log_buff[(log_buff > 33315) & (log_buff < 50000)]
+                if ole_timestamp.size > 0:
+                    break
+            if ole_timestamp.size == 0:
                 raise ValueError("Could not find timestamp in the LOG module")
             ole_base = datetime(1899, 12, 30, tzinfo=None)
             ole_timedelta = timedelta(days=ole_timestamp[0])
             self.timestamp = ole_base + ole_timedelta
-            if self.startdate != self.timestamp.date():
+            if self.startdate is None:
+                self.startdate = self.timestamp.date()
+            elif self.startdate != self.timestamp.date():
                 raise ValueError(
                     "Date mismatch:\n"
                     + "  Start date: %s\n" % self.startdate
                     + "  End date: %s\n" % self.enddate
                     + "  Timestamp: %s\n" % self.timestamp
                 )
+        else:
+            # running experiment: no LOG module yet, fall back to the temporary .mpl file
+            if isinstance(file_or_path, str) and path.isfile(log_file):
+                self.timestamp = timestamp_from_file(log_file)
 
     def get_flag(self, flagname):
         if flagname in self.flags_dict:
diff --git a/galvani/res2sqlite.py b/galvani/res2sqlite.py
old mode 100755
new mode 100644
diff --git a/tests/test_BioLogic.py b/tests/test_BioLogic.py
index 0d8d111..90b8bf6 100644
--- a/tests/test_BioLogic.py
+++ b/tests/test_BioLogic.py
@@ -15,6 +15,7 @@
 
 from galvani import BioLogic, MPTfile, MPRfile
 from galvani.BioLogic import MPTfileCSV  # not exported
+import warnings
 
 
 def test_open_MPT(testdata_dir):
     mpt1, comments = MPTfile(os.path.join(testdata_dir, "bio_logic1.mpt"))
@@ -147,7 +148,7 @@ def test_MPR_dates(testdata_dir, filename, startdate, enddate):
     if enddate:
         assert mpr.enddate.strftime("%Y-%m-%d") == enddate
     else:
-        assert not hasattr(mpr, "enddate")
+        assert mpr.enddate is None
 
 
 def test_open_MPR_fails_for_bad_file(testdata_dir):
@@ -245,6 +246,15 @@ def assert_field_matches(fieldname):
             "|Ewe h5|/V",
             "|Ewe h6|/V",
             "|Ewe h7|/V",
+            "THD Ece/%",
+            "NSD Ece/%",
+            "NSR Ece/%",
+            "|Ece h2|/V",
+            "|Ece h3|/V",
+            "|Ece h4|/V",
+            "|Ece h5|/V",
+            "|Ece h6|/V",
+            "|Ece h7|/V",
             "THD I/%",
             "NSD I/%",
             "NSR I/%",
@@ -257,11 +267,20 @@ def assert_field_matches(fieldname):
         ]
         if fieldname in EIS_quality_indicators:
             # EIS quality indicators only valid for f < 100kHz
-            index_inf_100k = np.where(mpr.data["freq/Hz"] < 100000)[0]
-            assert_allclose(
-                mpr.data[index_inf_100k][fieldname],
-                mpt[index_inf_100k][fieldname].astype(mpr.data[fieldname].dtype),
-            )
+            if len(mpr.data) == 1:
+                if mpt["THD Ewe/%"] != -1:
+                    assert_allclose(
+                        mpr.data[fieldname],
+                        mpt[fieldname].astype(mpr.data[fieldname].dtype),
+                    )
+            else:
+                indexes = np.where(mpt["THD Ewe/%"] != -1)[0]
+
+                assert_allclose(
+                    mpr.data[indexes][fieldname],
+                    mpt[indexes][fieldname].astype(mpr.data[fieldname].dtype),
+                )
+
         elif fieldname == "/V":
             assert_allclose(
                 mpr.data[fieldname],
                 mpt[fieldname].astype(mpr.data[fieldname].dtype),
             )
@@ -358,3 +377,25 @@ def test_MPR_matches_MPT_v1150(testdata_dir, basename_v1150):
     mpr = MPRfile(binpath)
     mpt, comments = MPTfile(txtpath, encoding="latin1")
     assert_MPR_matches_MPT_v2(mpr, mpt, comments)
+
+
+def test_loop_from_file(testdata_dir):
+    """Check that the loop_index is correctly extracted from the _LOOP.txt file."""
+    mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
+    if mpr.loop_index is None:
+        raise AssertionError("No loop_index found")
+    elif len(mpr.loop_index) != 4:
+        raise AssertionError("loop_index is not the right size")
+    elif not (mpr.loop_index == [0, 4, 8, 11]).all():
+        raise AssertionError("loop_index values are wrong")
+
+
+def test_timestamp_from_file(testdata_dir):
+    """Check that the timestamp is correctly extracted from the .mpl file."""
+    mpr = MPRfile(os.path.join(testdata_dir, "running", "running_OCV.mpr"))
+    if mpr.timestamp is None:
+        raise AssertionError("No timestamp found")
+    elif mpr.timestamp.timestamp() != 1707299985.908:
+        raise AssertionError("timestamp value is wrong")
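
Reviewer note: a minimal sketch of how the two new module-level helpers are meant to be called on the temporary files that EC-Lab writes next to a running experiment. Only the helper names and signatures come from the patch; the file names below are hypothetical.

    from galvani.BioLogic import loop_from_file, timestamp_from_file

    # hypothetical companion files of a still-running acquisition
    loop_index = loop_from_file("running_OCV_LOOP.txt")   # numpy array of loop start indexes
    started_at = timestamp_from_file("running_OCV.mpl")   # datetime parsed from "Acquisition started on : ..."
    print(loop_index, started_at)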
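And a sketch of the intended end-to-end use of MPRfile for a running experiment, assuming the test data layout exercised above (a running_OCV.mpr with its _LOOP.txt and .mpl siblings in the same directory); per the changes in __init__, the companion files are only looked up when MPRfile is given a path rather than an already-open file object.

    import os
    from galvani import MPRfile

    # path-based construction so the _LOOP.txt / .mpl companions can be located
    mpr = MPRfile(os.path.join("tests", "testdata", "running", "running_OCV.mpr"))

    print(mpr.timestamp)    # acquisition start, from the LOG module or the .mpl fallback
    print(mpr.loop_index)   # loop start indexes for the (possibly unfinished) experiment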