Skip to content
This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

Support for persistent numpy arrays #20

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion nvm/libex.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@
PObject ob_base;
PObjPtr ob_dict;
} PObjectObject;

typedef struct {
PVarObject ob_base;
PObjPtr dtypestr;
PObjPtr data;
} PNPArrayObject;
"""

ffi.set_source("_pmem",
Expand Down
4 changes: 4 additions & 0 deletions nvm/pmemobj/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@
from .object import PersistentObject
from .tuple import PersistentTuple
from .set import PersistentSet, PersistentFrozenSet
try:
from .nparray import PersistentNPArray
except ImportError:
pass
196 changes: 196 additions & 0 deletions nvm/pmemobj/nparray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import numpy as np
import sys
from _pmem import ffi

NP_POBJPTR_ARRAY_TYPE_NUM = 70
NP_POBJPTR_STR_TYPE_NUM = 71


class PersistentNPArray(object):
def _validate_keywords(self, **kw):
mandatory_fields = ["dtype"]
for field in mandatory_fields:
if field not in kw:
raise TypeError("Missing '%s' parameter" % (field))
if type(kw["dtype"]) is not np.dtype:
raise TypeError("'dtype' argument must be of type np.dtype")
if not kw["dtype"].isbuiltin:
raise TypeError("'dtype' argument must represent a builtin type")
if "shape" in kw:
shape = kw["shape"]
if type(shape) is int:
if shape <= 0:
raise ValueError(
"'shape' should be a strictly positive value")
kw["shape"] = (shape, )
elif type(shape) is tuple:
if len(shape) != 1 or\
type(shape[0]) is not int or shape[0] <= 0:
raise ValueError(
"'shape' should contain a single,"
" strictly positive value")
else:
raise TypeError("'shape' is an unsupported"
"data type: %s" % type(shape))

def __init__(self, *args, **kw):
self._validate_keywords(**kw)

data_size = 0
data_init = None

if len(args) and hasattr(args[0], "__len__") and len(args[0]):
data_size = len(args[0])
data_init = args[0]

if "shape" in kw:
data_size = max(data_size, kw["shape"][0])
data_shape = kw["shape"]
else:
data_shape = (data_size,)

data_type = kw["dtype"]
data_size *= data_type.itemsize
data_type_name = str(data_type)

if sys.version_info[0] > 2:
data_type_name = data_type_name.encode('utf-8')

mm = self._p_mm
with mm.transaction():
self._body.data = mm.zalloc(data_size,
type_num=NP_POBJPTR_ARRAY_TYPE_NUM)
data_buffer = ffi.buffer(mm.direct(self._body.data), data_size)
self._body.dtypestr = mm.zalloc(
len(data_type_name) + 1,
type_num=NP_POBJPTR_STR_TYPE_NUM)
str_buff = ffi.cast('char *', mm.direct(self._body.dtypestr))
ffi.buffer(str_buff, len(data_type_name))[:] = data_type_name
self.array = np.ndarray(shape=data_shape,
dtype=data_type,
buffer=data_buffer)
ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
ob.ob_size = data_shape[0]

if data_init is not None:
self.array[:data_size] = data_init

def _p_new(self, manager):
mm = self._p_mm = manager
with mm.transaction():
self._p_oid = mm.zalloc(ffi.sizeof('PNPArrayObject'))
ob = ffi.cast('PObject *', mm.direct(self._p_oid))
ob.ob_type = mm._get_type_code(self.__class__)
ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
ob.ob_size = 0
self._body = ffi.cast('PNPArrayObject *', mm.direct(self._p_oid))
self._body.dtypestr = mm.OID_NULL
self._body.data = mm.OID_NULL
self.array = None

def _p_resurrect(self, manager, oid):
mm = self._p_mm = manager
self._p_oid = oid
ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
self._body = ffi.cast('PNPArrayObject *', mm.direct(oid))
if mm.otuple(self._body.dtypestr) != mm.OID_NULL and\
mm.otuple(self._body.data) != mm.OID_NULL:
data_type_name = ffi.string(ffi.cast('char *',
mm.direct(self._body.dtypestr)))
data_type = np.dtype(data_type_name)
data_buffer = ffi.buffer(mm.direct(self._body.data),
data_type.itemsize * ob.ob_size)
data_shape = (ob.ob_size, )
self.array = np.ndarray(shape=data_shape,
dtype=data_type,
buffer=data_buffer)

def _p_substructures(self):
return ((self._body.data, NP_POBJPTR_ARRAY_TYPE_NUM),
(self._body.dtypestr, NP_POBJPTR_STR_TYPE_NUM))

def _p_traverse(self):
return []

def _p_deallocate(self):
mm = self._p_mm
if mm.otuple(self._body.dtypestr) != mm.OID_NULL:
mm.free(self._body.dtypestr)
if mm.otuple(self._body.data) != mm.OID_NULL:
mm.free(self._body.data)

def _normalize_index(self, index):
nparr_len = self.array.shape[0]
if isinstance(index, slice):
start = index.start
if start is None:
start = 0
elif start < 0:
new_start = start + nparr_len
if new_start < 0:
raise IndexError(
"index %d is out of bounds for axis 0 with size %d" %
(start, nparr_len))
start = new_start
else:
if start >= nparr_len:
raise IndexError(
"index %d is out of bounds for axis 0 with size %d" %
(start, nparr_len))
stop = index.stop
if stop is None:
stop = nparr_len
elif stop < 0:
new_stop = stop + nparr_len
if new_stop < 0:
raise IndexError(
"index %d is out of bounds for axis 0 with size %d" %
(stop, nparr_len))
stop = new_stop
else:
if stop > nparr_len:
raise IndexError(
"index %d is out of bounds for axis 0 with size %d" %
(stop, nparr_len))
if index.step is None or index.step > 0:
return (start, stop)
return (stop, start)
else:
try:
index = int(index)
except ValueError:
raise NotImplementedError("Not a recognized index type")
if index < 0:
index = nparr_len + index
if index < 0 or index >= nparr_len:
raise IndexError(
"index %d is out of bounds for axis 0 with size %d" %
(index, nparr_len))
return (index, index + 1)

def _snapshot_slice(self, index):
mm = self._p_mm
snapshot_bounds = self._normalize_index(index)
bounds_size = snapshot_bounds[1] - snapshot_bounds[0]
if bounds_size > 0:
data_item_size = self.array.dtype.itemsize
mm.snapshot_range(mm.direct(self._body.data),
data_item_size * bounds_size)

def snapshot_range(self, start_index=0, end_index=None):
_slice = slice(start_index, end_index)
self._snapshot_slice(_slice)

def snapshot_index(self, index):
self._snapshot_slice(index)

def __setitem__(self, index, value):
with self._p_mm.transaction():
self._snapshot_slice(index)
self.array.__setitem__(index, value)

def __getitem__(self, index):
return self.array.__getitem__(index)

def __len__(self):
return self._size
2 changes: 1 addition & 1 deletion nvm/pmemobj/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ def gc(self, debug=None):
if oid == self.mm.OID_NULL:
continue
if oid not in substructures[type_num]:
log.error("%s points to subsctructure type %s"
log.error("%s points to substructure type %s"
" at %s, but we didn't find it in"
" the pmemobj object list.",
container_oid, type_num, oid)
Expand Down
117 changes: 117 additions & 0 deletions tests/test_pmemobj_pnparray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# -*- coding: utf8 -*-
import unittest
from nvm import pmemobj
from tests.support import TestCase

numpy_avail = True
try:
import numpy as np
except ImportError:
numpy_avail = False

if numpy_avail:
class TestPersistentNPArray(TestCase):

def _make_nparray(self, *arg, **kw):
self.fn = self._test_fn()
self.pop = pmemobj.create(
self.fn, pool_size=64 * 1024 * 1024, debug=True)
self.addCleanup(self.pop.close)
self.pop.root = self.pop.new(pmemobj.PersistentNPArray, *arg, **kw)
return self.pop.root

def _reread_nparray(self):
self.pop.close()
self.pop = pmemobj.open(self.fn)
return self.pop.root

def test_creation_args(self):
for param in [None, [1, 2, 3, 4, 5]]:
with self.assertRaises(TypeError):
self._make_nparray(param)
with self.assertRaises(TypeError):
self._make_nparray(param, dtype="int16", shape=1)
with self.assertRaises(ValueError):
self._make_nparray(param,
dtype=np.dtype(np.int16), shape=-1)
with self.assertRaises(TypeError):
self._make_nparray(param,
dtype=np.dtype(np.int16), shape=True)
with self.assertRaises(ValueError):
self._make_nparray(param,
dtype=np.dtype(np.int16), shape=(1, 2))
with self.assertRaises(ValueError):
self._make_nparray(param,
dtype=np.dtype(np.int16), shape=())

def test_eq(self):
array_a = self._make_nparray(range(1024 * 1024),
dtype=np.dtype(np.int64))
array_b = np.array(range(1024 * 1024))
self.assertEqual(array_a.array.all(), array_b.all())

def test_persist(self):
array_a = self._make_nparray(range(1024 * 1024),
dtype=np.dtype(np.int64))
array_b = np.array(range(1024 * 1024))
array_a = self._reread_nparray()
self.assertEqual(str(array_a.array.dtype), "int64")
self.assertEqual(array_a.array.shape, array_b.shape)
self.assertEqual(array_a.array.all(), array_b.all())

def test_index_and_slices(self):
array_a = self._make_nparray(range(1000),
dtype=np.dtype(np.int64))
array_b = np.array(range(1000))
for count in range(2):
self.assertEqual(array_a[0], array_b[0])
for idx in range(0, 1000):
self.assertEqual(array_a[idx], array_b[idx])
self.assertEqual(array_a[-idx], array_b[-idx])
for idx in range(1, 1000, 10):
self.assertEqual(array_a[idx:idx + 10].all(),
array_b[idx:idx + 10].all())
self.assertEqual(array_a[:idx].all(),
array_b[:idx].all())
self.assertEqual(array_a[:-idx].all(),
array_b[:-idx].all())
self.assertEqual(array_a[idx:].all(),
array_b[idx:].all())
self.assertEqual(array_a[-idx:].all(),
array_b[-idx:].all())
array_a = self._reread_nparray()

def test_assignments(self):
array_a = self._make_nparray(None, shape=(1000, ),
dtype=np.dtype(np.int64))
array_b = np.array([0] * 1000)
for idx in range(0, 1000):
array_a[idx] = idx
array_b[idx] = idx
self.assertEqual(array_a[:].all(), array_b[:].all())

def test_sliced_assignments(self):
array_a = self._make_nparray(None, shape=(1000, ),
dtype=np.dtype(np.int64))
array_b = np.array([0] * 1000)
for idx in range(0, 1000, 100):
array_a[idx:idx + 100] = 100
array_b[idx:idx + 100] = 100
self.assertEqual(array_a[:].all(), array_b[:].all())

def test_transaction_sorting(self):
array_a = self._make_nparray(range(1000, 0, -1),
dtype=np.dtype(np.int64))
array_b = np.array(range(1000, 0, -1))
with self.pop.transaction():
array_a.snapshot_range()
array_a.array.sort()
array_b.sort()
self.assertEqual(array_a[:].all(), array_b[:].all())
else:
class TestPersistentNPArray(TestCase):
pass


if __name__ == '__main__':
unittest.main()