Skip to content

Commit

Permalink
Merge pull request #142 from swharden/138
Browse files Browse the repository at this point in the history
ABF.getOnlySweep()
  • Loading branch information
swharden authored Oct 15, 2024
2 parents 9f48334 + 25b8659 commit 3ad9cd6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 0 deletions.
29 changes: 29 additions & 0 deletions dev/python/2024-10-15 single sweep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import sys
import pathlib
import matplotlib.pyplot as plt
import numpy as np

try:
PATH_HERE = pathlib.Path(__file__).parent
PATH_ABFS = PATH_HERE.joinpath("../../data/abfs/").resolve()
PATH_SRC = PATH_HERE.joinpath("../../src/").resolve()
print(PATH_SRC)
sys.path.insert(0, str(PATH_SRC))
import pyabf
except:
raise EnvironmentError()
1

if __name__ == "__main__":
abfPath = pathlib.Path(PATH_ABFS).joinpath("14o08011_ic_pair.abf")

abf = pyabf.ABF(abfPath, loadData=False)
channelA = abf.getOnlySweep(sweepIndex=0, channelIndex=0)
channelB = abf.getOnlySweep(sweepIndex=0, channelIndex=1)

print(np.mean(channelA))
print(np.mean(channelB))

plt.plot(channelA)
plt.plot(channelB)
plt.show()
42 changes: 42 additions & 0 deletions src/pyabf/abf.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,48 @@ def setSweep(self,
epochTable = None
self.sweepEpochs = None

def getOnlySweep(self, sweepIndex: int, channelIndex: int = 0, startTime: float = None, endTime: float = None):
"""
Get values for a sweep by reading directly from the ABF file instead of loading all sweeps into memory.
This method is useful for ABF files which are too large to be loaded into memory.
### Parameters
* `sweepIndex` - The sweep number (starting at zero). Note that all channels for this sweep will be returned.
* `startTime` - Data returned will begin at this time within the sweep (in seconds)
* `endTime` - Data returned will end at this time within the sweep (in seconds)
"""

startTime = startTime if startTime else 0
startTime = max(0, startTime)

endTime = endTime if endTime else self.sweepLengthSec
endTime = min(endTime, self.sweepLengthSec)

bytesPerSample = int(self.dataPointByteSize)
bytesPerSecond = int(self.dataPointByteSize * self.sampleRate)
samplesPerSweep = int(self.dataPointCount / self.sweepCount)
bytesPerSweep = samplesPerSweep * bytesPerSample
sweepFirstByte = self.dataByteStart + bytesPerSweep * sweepIndex
startTime = startTime if startTime else 0
sweepFirstByte += int(startTime * bytesPerSecond)
endTime = endTime if endTime else self.sweepLengthSec
samplesPerSweep = int((endTime - startTime) * self.sampleRate)
samplesTotal = self.channelCount*samplesPerSweep

with open(self.abfFilePath, 'rb') as fb:
fb.seek(sweepFirstByte)
raw = np.fromfile(fb, dtype=self._dtype, count=samplesTotal)
nRows = self.channelCount
nCols = samplesPerSweep
raw = np.reshape(raw, (nCols, nRows))
raw = np.transpose(raw)
data = raw[channelIndex]
data = data.astype(np.float32)
if self._dtype == np.int16:
data *= self._dataGain[channelIndex]
data += self._dataOffset[channelIndex]
return data

def _getAdcNameAndUnits(self, adcIndex: int) -> Tuple[str, str]:
if (adcIndex < len(self.adcNames)):
return [self.adcNames[adcIndex], self.adcUnits[adcIndex]]
Expand Down
9 changes: 9 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,12 @@ def test_headerText(abfPath):
html = abf.headerHTML
assert isinstance(html, str)
assert len(html)


def test_readOneSweep():
abfPath = "data/abfs/14o08011_ic_pair.abf"
abf = pyabf.ABF(abfPath, loadData=False)
channelA = abf.getOnlySweep(sweepIndex=0, channelIndex=0)
channelB = abf.getOnlySweep(sweepIndex=0, channelIndex=1)
assert np.mean(channelA) == pytest.approx(-58.870506, 5)
assert np.mean(channelB) == pytest.approx(-52.948666, 5)

0 comments on commit 3ad9cd6

Please sign in to comment.