From e5ba514d8ce5f635ccbd0a64a3bc93820e28f0f8 Mon Sep 17 00:00:00 2001
From: Catalin Gabriel Manciu
Date: Fri, 22 Dec 2017 18:00:08 -0800
Subject: [PATCH 1/2] Add persistent numpy arrays support

---
 nvm/libex.py            |   6 +-
 nvm/pmemobj/__init__.py |   4 +
 nvm/pmemobj/nparray.py  | 196 ++++++++++++++++++++++++++++++++++++++++
 nvm/pmemobj/pool.py     |   2 +-
 4 files changed, 206 insertions(+), 2 deletions(-)
 create mode 100644 nvm/pmemobj/nparray.py

diff --git a/nvm/libex.py b/nvm/libex.py
index 15eb7be..6ebf242 100644
--- a/nvm/libex.py
+++ b/nvm/libex.py
@@ -65,7 +65,11 @@
     PObject ob_base;
     PObjPtr ob_dict;
 } PObjectObject;
-
+typedef struct {
+    PVarObject ob_base;
+    PObjPtr dtypestr;
+    PObjPtr data;
+} PNPArrayObject;
 """
 
 ffi.set_source("_pmem",
diff --git a/nvm/pmemobj/__init__.py b/nvm/pmemobj/__init__.py
index dbbe11a..a77cb90 100644
--- a/nvm/pmemobj/__init__.py
+++ b/nvm/pmemobj/__init__.py
@@ -4,3 +4,7 @@
 from .object import PersistentObject
 from .tuple import PersistentTuple
 from .set import PersistentSet, PersistentFrozenSet
+try:
+    from .nparray import PersistentNPArray
+except ImportError:
+    pass
\ No newline at end of file
diff --git a/nvm/pmemobj/nparray.py b/nvm/pmemobj/nparray.py
new file mode 100644
index 0000000..4457a6b
--- /dev/null
+++ b/nvm/pmemobj/nparray.py
@@ -0,0 +1,196 @@
+import numpy as np
+import sys
+from _pmem import ffi
+
+NP_POBJPTR_ARRAY_TYPE_NUM = 70
+NP_POBJPTR_STR_TYPE_NUM = 71
+
+
+class PersistentNPArray(object):
+    def _validate_keywords(self, kw):
+        mandatory_fields = ["dtype"]
+        for field in mandatory_fields:
+            if field not in kw:
+                raise TypeError("Missing '%s' parameter" % (field))
+        if type(kw["dtype"]) is not np.dtype:
+            raise TypeError("'dtype' argument must be of type np.dtype")
+        if not kw["dtype"].isbuiltin:
+            raise TypeError("'dtype' argument must represent a builtin type")
+        if "shape" in kw:
+            shape = kw["shape"]
+            if type(shape) is int:
+                if shape <= 0:
+                    raise ValueError(
+                        "'shape' should be a strictly positive value")
+                kw["shape"] = (shape, )
+            elif type(shape) is tuple:
+                if len(shape) != 1 or\
+                   type(shape[0]) is not int or shape[0] <= 0:
+                    raise ValueError(
+                        "'shape' should contain a single,"
+                        " strictly positive value")
+            else:
+                raise TypeError("'shape' is an unsupported "
+                                "data type: %s" % type(shape))
+
+    def __init__(self, *args, **kw):
+        self._validate_keywords(kw)
+
+        data_size = 0
+        data_init = None
+
+        if len(args) and hasattr(args[0], "__len__") and len(args[0]):
+            data_size = len(args[0])
+            data_init = args[0]
+
+        if "shape" in kw:
+            data_size = max(data_size, kw["shape"][0])
+            data_shape = kw["shape"]
+        else:
+            data_shape = (data_size,)
+
+        data_type = kw["dtype"]
+        data_size *= data_type.itemsize
+        data_type_name = str(data_type)
+
+        if sys.version_info[0] > 2:
+            data_type_name = data_type_name.encode('utf-8')
+
+        mm = self._p_mm
+        with mm.transaction():
+            self._body.data = mm.zalloc(data_size,
+                                        type_num=NP_POBJPTR_ARRAY_TYPE_NUM)
+            data_buffer = ffi.buffer(mm.direct(self._body.data), data_size)
+            self._body.dtypestr = mm.zalloc(
+                len(data_type_name) + 1,
+                type_num=NP_POBJPTR_STR_TYPE_NUM)
+            str_buff = ffi.cast('char *', mm.direct(self._body.dtypestr))
+            ffi.buffer(str_buff, len(data_type_name))[:] = data_type_name
+            self.array = np.ndarray(shape=data_shape,
+                                    dtype=data_type,
+                                    buffer=data_buffer)
+            ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
+            ob.ob_size = data_shape[0]
+
+            if data_init is not None:
+                self.array[:len(data_init)] = data_init
+
+    def _p_new(self, manager):
+        mm = self._p_mm = manager
+        with mm.transaction():
+            self._p_oid = mm.zalloc(ffi.sizeof('PNPArrayObject'))
+            ob = ffi.cast('PObject *', mm.direct(self._p_oid))
+            ob.ob_type = mm._get_type_code(self.__class__)
+            ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
+            ob.ob_size = 0
+            self._body = ffi.cast('PNPArrayObject *', mm.direct(self._p_oid))
+            self._body.dtypestr = mm.OID_NULL
+            self._body.data = mm.OID_NULL
+            self.array = None
+
+    def _p_resurrect(self, manager, oid):
+        mm = self._p_mm = manager
+        self._p_oid = oid
+        ob = ffi.cast('PVarObject *', mm.direct(self._p_oid))
+        self._body = ffi.cast('PNPArrayObject *', mm.direct(oid))
+        if mm.otuple(self._body.dtypestr) != mm.OID_NULL and\
+           mm.otuple(self._body.data) != mm.OID_NULL:
+            data_type_name = ffi.string(ffi.cast('char *',
+                                        mm.direct(self._body.dtypestr)))
+            data_type = np.dtype(data_type_name)
+            data_buffer = ffi.buffer(mm.direct(self._body.data),
+                                     data_type.itemsize * ob.ob_size)
+            data_shape = (ob.ob_size, )
+            self.array = np.ndarray(shape=data_shape,
+                                    dtype=data_type,
+                                    buffer=data_buffer)
+
+    def _p_substructures(self):
+        return ((self._body.data, NP_POBJPTR_ARRAY_TYPE_NUM),
+                (self._body.dtypestr, NP_POBJPTR_STR_TYPE_NUM))
+
+    def _p_traverse(self):
+        return []
+
+    def _p_deallocate(self):
+        mm = self._p_mm
+        if mm.otuple(self._body.dtypestr) != mm.OID_NULL:
+            mm.free(self._body.dtypestr)
+        if mm.otuple(self._body.data) != mm.OID_NULL:
+            mm.free(self._body.data)
+
+    def _normalize_index(self, index):
+        nparr_len = self.array.shape[0]
+        if isinstance(index, slice):
+            start = index.start
+            if start is None:
+                start = 0
+            elif start < 0:
+                new_start = start + nparr_len
+                if new_start < 0:
+                    raise IndexError(
+                        "index %d is out of bounds for axis 0 with size %d" %
+                        (start, nparr_len))
+                start = new_start
+            else:
+                if start >= nparr_len:
+                    raise IndexError(
+                        "index %d is out of bounds for axis 0 with size %d" %
+                        (start, nparr_len))
+            stop = index.stop
+            if stop is None:
+                stop = nparr_len
+            elif stop < 0:
+                new_stop = stop + nparr_len
+                if new_stop < 0:
+                    raise IndexError(
+                        "index %d is out of bounds for axis 0 with size %d" %
+                        (stop, nparr_len))
+                stop = new_stop
+            else:
+                if stop > nparr_len:
+                    raise IndexError(
+                        "index %d is out of bounds for axis 0 with size %d" %
+                        (stop, nparr_len))
+            if index.step is None or index.step > 0:
+                return (start, stop)
+            return (min(start, stop), max(start, stop))
+        else:
+            try:
+                index = int(index)
+            except (TypeError, ValueError):
+                raise NotImplementedError("Not a recognized index type")
+            if index < 0:
+                index = nparr_len + index
+            if index < 0 or index >= nparr_len:
+                raise IndexError(
+                    "index %d is out of bounds for axis 0 with size %d" %
+                    (index, nparr_len))
+            return (index, index + 1)
+
+    def _snapshot_slice(self, index):
+        mm = self._p_mm
+        start, stop = self._normalize_index(index)
+        if stop > start:
+            item_size = self.array.dtype.itemsize
+            data_start = ffi.cast('char *', mm.direct(self._body.data))
+            mm.snapshot_range(data_start + start * item_size,
+                              (stop - start) * item_size)
+
+    def snapshot_range(self, start_index=0, end_index=None):
+        _slice = slice(start_index, end_index)
+        self._snapshot_slice(_slice)
+
+    def snapshot_index(self, index):
+        self._snapshot_slice(index)
+
+    def __setitem__(self, index, value):
+        with self._p_mm.transaction():
+            self._snapshot_slice(index)
+            self.array.__setitem__(index, value)
+
+    def __getitem__(self, index):
+        return self.array.__getitem__(index)
+
+    def __len__(self):
+        return self.array.shape[0]
diff --git a/nvm/pmemobj/pool.py b/nvm/pmemobj/pool.py
index 9a8f49f..1d5a9cb 100644
--- a/nvm/pmemobj/pool.py
+++ b/nvm/pmemobj/pool.py
@@ -925,7 +925,7 @@ def gc(self, debug=None):
                 if oid == self.mm.OID_NULL:
                     continue
                 if oid not in substructures[type_num]:
-                    log.error("%s points to subsctructure type %s"
+                    log.error("%s points to substructure type %s"
                               " at %s, but we didn't find it in"
                               " the pmemobj object list.",
                               container_oid, type_num, oid)
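
For review context, a minimal usage sketch of the PersistentNPArray type added above; it assumes only the pool API that the tests in patch 2 exercise (pmemobj.create/pmemobj.open, pop.new, pop.root, pop.close), and the pool path and size below are placeholders:

    import numpy as np
    from nvm import pmemobj

    # Allocate a persistent, fixed-length int64 array and make it the pool root.
    pop = pmemobj.create('/path/to/array.pmem', pool_size=64 * 1024 * 1024)
    pop.root = pop.new(pmemobj.PersistentNPArray,
                       range(1000), dtype=np.dtype(np.int64))
    pop.close()

    # Reopening the pool resurrects the object: _p_resurrect() reads the dtype
    # back from the persisted dtype string, takes the length from ob_size, and
    # rebuilds a numpy view directly over the persistent buffer (no copy).
    pop = pmemobj.open('/path/to/array.pmem')
    parr = pop.root
    print(len(parr), parr[0], parr[10:20])
    parr[42] = -1    # __setitem__ snapshots the touched range in a transaction
    pop.close()
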
From 8063da22978c6123716e4787ded7e3cb08351a89 Mon Sep 17 00:00:00 2001
From: Catalin Gabriel Manciu
Date: Fri, 22 Dec 2017 18:01:47 -0800
Subject: [PATCH 2/2] tests: Add PersistentNPArray tests

---
 tests/test_pmemobj_pnparray.py | 117 +++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)
 create mode 100644 tests/test_pmemobj_pnparray.py

diff --git a/tests/test_pmemobj_pnparray.py b/tests/test_pmemobj_pnparray.py
new file mode 100644
index 0000000..2c18846
--- /dev/null
+++ b/tests/test_pmemobj_pnparray.py
@@ -0,0 +1,117 @@
+# -*- coding: utf8 -*-
+import unittest
+from nvm import pmemobj
+from tests.support import TestCase
+
+numpy_avail = True
+try:
+    import numpy as np
+except ImportError:
+    numpy_avail = False
+
+if numpy_avail:
+    class TestPersistentNPArray(TestCase):
+
+        def _make_nparray(self, *arg, **kw):
+            self.fn = self._test_fn()
+            self.pop = pmemobj.create(
+                self.fn, pool_size=64 * 1024 * 1024, debug=True)
+            self.addCleanup(self.pop.close)
+            self.pop.root = self.pop.new(pmemobj.PersistentNPArray, *arg, **kw)
+            return self.pop.root
+
+        def _reread_nparray(self):
+            self.pop.close()
+            self.pop = pmemobj.open(self.fn)
+            return self.pop.root
+
+        def test_creation_args(self):
+            for param in [None, [1, 2, 3, 4, 5]]:
+                with self.assertRaises(TypeError):
+                    self._make_nparray(param)
+                with self.assertRaises(TypeError):
+                    self._make_nparray(param, dtype="int16", shape=1)
+                with self.assertRaises(ValueError):
+                    self._make_nparray(param,
+                                       dtype=np.dtype(np.int16), shape=-1)
+                with self.assertRaises(TypeError):
+                    self._make_nparray(param,
+                                       dtype=np.dtype(np.int16), shape=True)
+                with self.assertRaises(ValueError):
+                    self._make_nparray(param,
+                                       dtype=np.dtype(np.int16), shape=(1, 2))
+                with self.assertRaises(ValueError):
+                    self._make_nparray(param,
+                                       dtype=np.dtype(np.int16), shape=())
+
+        def test_eq(self):
+            array_a = self._make_nparray(range(1024 * 1024),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array(range(1024 * 1024))
+            self.assertTrue((array_a.array == array_b).all())
+
+        def test_persist(self):
+            array_a = self._make_nparray(range(1024 * 1024),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array(range(1024 * 1024))
+            array_a = self._reread_nparray()
+            self.assertEqual(str(array_a.array.dtype), "int64")
+            self.assertEqual(array_a.array.shape, array_b.shape)
+            self.assertTrue((array_a.array == array_b).all())
+
+        def test_index_and_slices(self):
+            array_a = self._make_nparray(range(1000),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array(range(1000))
+            for count in range(2):
+                self.assertEqual(array_a[0], array_b[0])
+                for idx in range(0, 1000):
+                    self.assertEqual(array_a[idx], array_b[idx])
+                    self.assertEqual(array_a[-idx], array_b[-idx])
+                for idx in range(1, 1000, 10):
+                    self.assertTrue((array_a[idx:idx + 10] ==
+                                     array_b[idx:idx + 10]).all())
+                    self.assertTrue((array_a[:idx] ==
+                                     array_b[:idx]).all())
+                    self.assertTrue((array_a[:-idx] ==
+                                     array_b[:-idx]).all())
+                    self.assertTrue((array_a[idx:] ==
+                                     array_b[idx:]).all())
+                    self.assertTrue((array_a[-idx:] ==
+                                     array_b[-idx:]).all())
+                array_a = self._reread_nparray()
+
+        def test_assignments(self):
+            array_a = self._make_nparray(None, shape=(1000, ),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array([0] * 1000)
+            for idx in range(0, 1000):
+                array_a[idx] = idx
+                array_b[idx] = idx
+            self.assertTrue((array_a[:] == array_b[:]).all())
+
+        def test_sliced_assignments(self):
+            array_a = self._make_nparray(None, shape=(1000, ),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array([0] * 1000)
+            for idx in range(0, 1000, 100):
+                array_a[idx:idx + 100] = 100
+                array_b[idx:idx + 100] = 100
+            self.assertTrue((array_a[:] == array_b[:]).all())
+
+        def test_transaction_sorting(self):
+            array_a = self._make_nparray(range(1000, 0, -1),
+                                         dtype=np.dtype(np.int64))
+            array_b = np.array(range(1000, 0, -1))
+            with self.pop.transaction():
+                array_a.snapshot_range()
+                array_a.array.sort()
+                array_b.sort()
+            self.assertTrue((array_a[:] == array_b[:]).all())
+else:
+    class TestPersistentNPArray(TestCase):
+        pass


+if __name__ == '__main__':
+    unittest.main()
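
A note on the snapshotting pattern that test_transaction_sorting exercises: writes routed through PersistentNPArray.__setitem__ snapshot the affected range themselves, but in-place numpy operations on the .array attribute bypass __setitem__, so the caller must snapshot the range explicitly before mutating it inside a transaction. A minimal sketch of that pattern, assuming a pool whose root is already a PersistentNPArray (the path is a placeholder):

    from nvm import pmemobj

    pop = pmemobj.open('/path/to/array.pmem')
    parr = pop.root

    with pop.transaction():
        # Undo data must be recorded before the in-place mutation so an
        # aborted transaction can roll the array contents back.
        parr.snapshot_range()          # snapshot the whole array
        parr.array.sort()              # in-place numpy op on the pmem buffer

    with pop.transaction():
        parr.snapshot_range(10, 20)    # snapshot only elements [10, 20)
        parr.array[10:20] += 1

    pop.close()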