From f4882fb5ed95ac880625b1a535c3f616c043a2af Mon Sep 17 00:00:00 2001 From: Scott W Harden Date: Tue, 15 Oct 2024 11:53:15 -0400 Subject: [PATCH 1/2] ABF.getOnlySweep() Add method to read a sweep without loading the entire file into memory. Resolves #138 --- dev/python/2024-10-15 single sweep.py | 29 ++++++++++++++++++ src/pyabf/abf.py | 42 +++++++++++++++++++++++++++ tests/test_api.py | 9 ++++++ 3 files changed, 80 insertions(+) create mode 100644 dev/python/2024-10-15 single sweep.py diff --git a/dev/python/2024-10-15 single sweep.py b/dev/python/2024-10-15 single sweep.py new file mode 100644 index 0000000..98e1cc7 --- /dev/null +++ b/dev/python/2024-10-15 single sweep.py @@ -0,0 +1,29 @@ +import sys +import pathlib +import matplotlib.pyplot as plt +import numpy as np + +try: + PATH_HERE = pathlib.Path(__file__).parent + PATH_ABFS = PATH_HERE.joinpath("../../data/abfs/").resolve() + PATH_SRC = PATH_HERE.joinpath("../../src/").resolve() + print(PATH_SRC) + sys.path.insert(0, str(PATH_SRC)) + import pyabf +except: + raise EnvironmentError() +1 + +if __name__ == "__main__": + abfPath = pathlib.Path(PATH_ABFS).joinpath("14o08011_ic_pair.abf") + + abf = pyabf.ABF(abfPath, loadData=False) + channelA = abf.getOnlySweep(sweepIndex=0, channelIndex=0) + channelB = abf.getOnlySweep(sweepIndex=0, channelIndex=1) + + print(np.mean(channelA)) + print(np.mean(channelB)) + + plt.plot(channelA) + plt.plot(channelB) + plt.show() diff --git a/src/pyabf/abf.py b/src/pyabf/abf.py index e0da035..e72af7a 100644 --- a/src/pyabf/abf.py +++ b/src/pyabf/abf.py @@ -688,6 +688,48 @@ def setSweep(self, epochTable = None self.sweepEpochs = None + def getOnlySweep(self, sweepIndex: int, channelIndex: int = 0, startTime: float = None, endTime: float = None): + """ + Get values for a sweep by reading directly from the ABF file instead of loading all sweeps into memory. + This method is useful for ABF files which are too large to be loaded into memory. + + ### Parameters + * `sweepIndex` - The sweep number (starting at zero). Note that all channels for this sweep will be returned. + * `startTime` - Data returned will begin at this time within the sweep (in seconds) + * `endTime` - Data returned will end at this time within the sweep (in seconds) + """ + + startTime = startTime if startTime else 0 + startTime = max(0, startTime) + + endTime = endTime if endTime else self.sweepLengthSec + endTime = min(endTime, self.sweepLengthSec) + + bytesPerSample = int(self.dataPointByteSize) + bytesPerSecond = int(self.dataPointByteSize * self.sampleRate) + samplesPerSweep = int(self.dataPointCount / self.sweepCount) + bytesPerSweep = samplesPerSweep * bytesPerSample + sweepFirstByte = self.dataByteStart + bytesPerSweep * sweepIndex + startTime = startTime if startTime else 0 + sweepFirstByte += int(startTime * bytesPerSecond) + endTime = endTime if endTime else self.sweepLengthSec + samplesPerSweep = int((endTime - startTime) * self.sampleRate) + samplesTotal = self.channelCount*samplesPerSweep + + with open(self.abfFilePath, 'rb') as fb: + fb.seek(sweepFirstByte) + raw = np.fromfile(fb, dtype=self._dtype, count=samplesTotal) + nRows = self.channelCount + nCols = samplesPerSweep + raw = np.reshape(raw, (nCols, nRows)) + raw = np.transpose(raw) + data = raw[channelIndex] + data = data.astype(np.float32) + if self._dtype == np.int16: + data *= self._dataGain[channelIndex] + data += self._dataOffset[channelIndex] + return data + def _getAdcNameAndUnits(self, adcIndex: int) -> Tuple[str, str]: if (adcIndex < len(self.adcNames)): return [self.adcNames[adcIndex], self.adcUnits[adcIndex]] diff --git a/tests/test_api.py b/tests/test_api.py index 15d8572..50025a0 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -184,3 +184,12 @@ def test_headerText(abfPath): html = abf.headerHTML assert isinstance(html, str) assert len(html) + + +def test_readOneSweep(): + abfPath = "data/abfs/14o08011_ic_pair.abf" + abf = pyabf.ABF(abfPath, loadData=False) + channelA = abf.getOnlySweep(sweepIndex=0, channelIndex=0) + channelB = abf.getOnlySweep(sweepIndex=0, channelIndex=1) + assert np.mean(channelA) == -58.870506 + assert np.mean(channelB) == -52.948666 From 25b8659aa78ec0980b463db5aaba5a9952f9aa47 Mon Sep 17 00:00:00 2001 From: Scott W Harden Date: Tue, 15 Oct 2024 13:09:06 -0400 Subject: [PATCH 2/2] test: use approximate mean value test --- tests/test_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 50025a0..3611edb 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -191,5 +191,5 @@ def test_readOneSweep(): abf = pyabf.ABF(abfPath, loadData=False) channelA = abf.getOnlySweep(sweepIndex=0, channelIndex=0) channelB = abf.getOnlySweep(sweepIndex=0, channelIndex=1) - assert np.mean(channelA) == -58.870506 - assert np.mean(channelB) == -52.948666 + assert np.mean(channelA) == pytest.approx(-58.870506, 5) + assert np.mean(channelB) == pytest.approx(-52.948666, 5)