From 720bf9c601a89efb70baacc51ccfe55bdf679327 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Mon, 16 Sep 2024 14:32:50 -0500 Subject: [PATCH 1/9] Add pickling support for arrays --- pyopencl/array.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++ test/test_array.py | 20 +++++++++++ 2 files changed, 108 insertions(+) diff --git a/pyopencl/array.py b/pyopencl/array.py index b6f52abe..7dbaa496 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -342,6 +342,37 @@ class _copy_queue: # noqa: N801 _NOT_PRESENT = object() +# {{{ pickling support + +import threading +from contextlib import contextmanager + + +_QUEUE_FOR_PICKLING_TLS = threading.local() + + +@contextmanager +def queue_for_pickling(queue): + r"""A context manager that, for the current thread, sets the command queue + to be used for pickling and unpickling :class:`Array`\ s to *queue*.""" + try: + existing_pickle_queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + existing_pickle_queue = None + + if existing_pickle_queue is not None: + raise RuntimeError("array_context_for_pickling should not be called " + "inside the context of its own invocation.") + + _QUEUE_FOR_PICKLING_TLS.queue = queue + try: + yield None + finally: + _QUEUE_FOR_PICKLING_TLS.queue = None + +# }}} + + class Array: """A :class:`numpy.ndarray` work-alike that stores its data and performs its computations on the compute device. :attr:`shape` and :attr:`dtype` work @@ -705,6 +736,63 @@ def __init__( "than expected, potentially leading to crashes.", InconsistentOpenCLQueueWarning, stacklevel=2) + # {{{ Pickling + + def __getstate__(self): + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError("CL Array instances can only be pickled while " + "queue_for_pickling is active.") + + d = {} + d["shape"] = self.shape + d["dtype"] = self.dtype + d["strides"] = self.strides + d["allocator"] = self.allocator + d["nbytes"] = self.nbytes + d["strides"] = self.strides + d["offset"] = self.offset + d["data"] = self.get(queue=queue) + d["_flags"] = self._flags + d["size"] = self.size + + return d + + def __setstate__(self, state): + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError("CL Array instances can only be pickled while " + "queue_for_pickling is active.") + + self.queue = queue + self.shape = state["shape"] + self.dtype = state["dtype"] + self.strides = state["strides"] + self.allocator = state["allocator"] + self.offset = state["offset"] + self._flags = state["_flags"] + self.size = state["size"] + self.nbytes = state["nbytes"] + self.events = [] + + if self.allocator is None: + context = queue.context + self.base_data = cl.Buffer(context, cl.mem_flags.READ_WRITE, self.nbytes) + else: + self.base_data = self.allocator(self.nbytes) + + self.set(state["data"], queue=queue) + + # }}} + @property def ndim(self): return len(self.shape) diff --git a/test/test_array.py b/test/test_array.py index 4b648878..84c1eb3a 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2393,6 +2393,26 @@ def test_xdg_cache_home(ctx_factory): # }}} +# {{{ test pickling + +def test_array_pickling(ctx_factory): + context = ctx_factory() + queue = cl.CommandQueue(context) + + a = np.array([1, 2, 3, 4, 5]).astype(np.float32) + a_gpu = cl_array.to_device(queue, a) + + import pickle + with pytest.raises(RuntimeError): + pickle.dumps(a_gpu) + + with cl.array.queue_for_pickling(queue): + a_gpu_pickled = pickle.loads(pickle.dumps(a_gpu)) + assert np.all(a_gpu_pickled.get() == a) + +# }}} + + if __name__ == "__main__": if len(sys.argv) > 1: exec(sys.argv[1]) From 7df74f57aa844c40a2ac86be889a8cfd583fc749 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Tue, 17 Sep 2024 22:39:35 -0500 Subject: [PATCH 2/9] Update error message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andreas Klöckner --- pyopencl/array.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index 7dbaa496..926854d4 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -361,7 +361,7 @@ def queue_for_pickling(queue): existing_pickle_queue = None if existing_pickle_queue is not None: - raise RuntimeError("array_context_for_pickling should not be called " + raise RuntimeError("queue_for_pickling should not be called " "inside the context of its own invocation.") _QUEUE_FOR_PICKLING_TLS.queue = queue From 3c777bab48b361224828b81a51a25610d1aa8ce7 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Wed, 18 Sep 2024 14:59:48 -0500 Subject: [PATCH 3/9] better support+test for Array subclasses --- pyopencl/array.py | 37 +++++++++++++------------------------ test/test_array.py | 21 ++++++++++++++++++++- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index 926854d4..f9f0aed5 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -71,7 +71,7 @@ class DoubleDowncastWarning(UserWarning): _DOUBLE_DOWNCAST_WARNING = ( - "The operation you requested would result in a double-precisision " + "The operation you requested would result in a double-precision " "quantity according to numpy semantics. Since your device does not " "support double precision, a single-precision quantity is being returned.") @@ -748,19 +748,14 @@ def __getstate__(self): raise RuntimeError("CL Array instances can only be pickled while " "queue_for_pickling is active.") - d = {} - d["shape"] = self.shape - d["dtype"] = self.dtype - d["strides"] = self.strides - d["allocator"] = self.allocator - d["nbytes"] = self.nbytes - d["strides"] = self.strides - d["offset"] = self.offset - d["data"] = self.get(queue=queue) - d["_flags"] = self._flags - d["size"] = self.size + state = self.__dict__.copy() + del state["context"] + del state["events"] + del state["queue"] + del state["base_data"] + state["data"] = self.get(queue=queue) - return d + return state def __setstate__(self, state): try: @@ -772,20 +767,14 @@ def __setstate__(self, state): raise RuntimeError("CL Array instances can only be pickled while " "queue_for_pickling is active.") - self.queue = queue - self.shape = state["shape"] - self.dtype = state["dtype"] - self.strides = state["strides"] - self.allocator = state["allocator"] - self.offset = state["offset"] - self._flags = state["_flags"] - self.size = state["size"] - self.nbytes = state["nbytes"] + self.__dict__.update(state) + self.context = queue.context self.events = [] + self.queue = queue if self.allocator is None: - context = queue.context - self.base_data = cl.Buffer(context, cl.mem_flags.READ_WRITE, self.nbytes) + self.base_data = cl.Buffer(self.context, cl.mem_flags.READ_WRITE, + self.nbytes) else: self.base_data = self.allocator(self.nbytes) diff --git a/test/test_array.py b/test/test_array.py index 84c1eb3a..a0733844 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2395,6 +2395,15 @@ def test_xdg_cache_home(ctx_factory): # {{{ test pickling +from pytools.tag import Taggable + + +class TaggableCLArray(cl_array.Array, Taggable): + def __init__(self, cq, shape, dtype, tags): + super().__init__(cq=cq, shape=shape, dtype=dtype) + self.tags = tags + + def test_array_pickling(ctx_factory): context = ctx_factory() queue = cl.CommandQueue(context) @@ -2406,10 +2415,20 @@ def test_array_pickling(ctx_factory): with pytest.raises(RuntimeError): pickle.dumps(a_gpu) - with cl.array.queue_for_pickling(queue): + with cl_array.queue_for_pickling(queue): a_gpu_pickled = pickle.loads(pickle.dumps(a_gpu)) assert np.all(a_gpu_pickled.get() == a) + a_gpu_tagged = TaggableCLArray(queue, a.shape, a.dtype, tags={"foo", "bar"}) + a_gpu_tagged.set(a) + + with cl_array.queue_for_pickling(queue): + a_gpu_tagged_pickled = pickle.loads(pickle.dumps(a_gpu_tagged)) + + assert np.all(a_gpu_tagged_pickled.get() == a) + assert a_gpu_tagged_pickled.tags == a_gpu_tagged.tags + + # }}} From b9fa6ef535ca7c64c5fda650357a70f9f3e455c4 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Wed, 18 Sep 2024 15:13:51 -0500 Subject: [PATCH 4/9] support pickling allocator, test SVM --- pyopencl/array.py | 9 ++++++++- test/test_array.py | 21 +++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index f9f0aed5..d4c78578 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -352,7 +352,7 @@ class _copy_queue: # noqa: N801 @contextmanager -def queue_for_pickling(queue): +def queue_for_pickling(queue, alloc=None): r"""A context manager that, for the current thread, sets the command queue to be used for pickling and unpickling :class:`Array`\ s to *queue*.""" try: @@ -365,6 +365,7 @@ def queue_for_pickling(queue): "inside the context of its own invocation.") _QUEUE_FOR_PICKLING_TLS.queue = queue + _QUEUE_FOR_PICKLING_TLS.alloc = alloc try: yield None finally: @@ -749,6 +750,8 @@ def __getstate__(self): "queue_for_pickling is active.") state = self.__dict__.copy() + + del state["allocator"] del state["context"] del state["events"] del state["queue"] @@ -760,14 +763,18 @@ def __getstate__(self): def __setstate__(self, state): try: queue = _QUEUE_FOR_PICKLING_TLS.queue + alloc = _QUEUE_FOR_PICKLING_TLS.alloc except AttributeError: queue = None + alloc = None if queue is None: raise RuntimeError("CL Array instances can only be pickled while " "queue_for_pickling is active.") self.__dict__.update(state) + + self.allocator = alloc self.context = queue.context self.events = [] self.queue = queue diff --git a/test/test_array.py b/test/test_array.py index a0733844..44c23bc1 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2419,6 +2419,8 @@ def test_array_pickling(ctx_factory): a_gpu_pickled = pickle.loads(pickle.dumps(a_gpu)) assert np.all(a_gpu_pickled.get() == a) + # {{{ subclass test + a_gpu_tagged = TaggableCLArray(queue, a.shape, a.dtype, tags={"foo", "bar"}) a_gpu_tagged.set(a) @@ -2428,6 +2430,25 @@ def test_array_pickling(ctx_factory): assert np.all(a_gpu_tagged_pickled.get() == a) assert a_gpu_tagged_pickled.tags == a_gpu_tagged.tags + # }}} + + # {{{ SVM test + + from pyopencl.tools import SVMAllocator, SVMPool + + alloc = SVMAllocator(context, alignment=0, queue=queue) + alloc = SVMPool(alloc) + + a_dev = cl_array.to_device(queue, a, allocator=alloc) + + with cl_array.queue_for_pickling(queue, alloc): + a_dev_pickled = pickle.loads(pickle.dumps(a_dev)) + + assert np.all(a_dev_pickled.get() == a) + assert a_dev_pickled.allocator is alloc + + # }}} + # }}} From 3930335c06a91ab3c08906be42079516501e3ee7 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Wed, 18 Sep 2024 15:18:23 -0500 Subject: [PATCH 5/9] misc fixes --- pyopencl/array.py | 1 + test/test_array.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index d4c78578..153dfeb5 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -370,6 +370,7 @@ def queue_for_pickling(queue, alloc=None): yield None finally: _QUEUE_FOR_PICKLING_TLS.queue = None + _QUEUE_FOR_PICKLING_TLS.alloc = None # }}} diff --git a/test/test_array.py b/test/test_array.py index 44c23bc1..d54cc307 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2434,18 +2434,21 @@ def test_array_pickling(ctx_factory): # {{{ SVM test - from pyopencl.tools import SVMAllocator, SVMPool + from pyopencl.characterize import has_coarse_grain_buffer_svm + + if has_coarse_grain_buffer_svm(queue.device): + from pyopencl.tools import SVMAllocator, SVMPool - alloc = SVMAllocator(context, alignment=0, queue=queue) - alloc = SVMPool(alloc) + alloc = SVMAllocator(context, alignment=0, queue=queue) + alloc = SVMPool(alloc) - a_dev = cl_array.to_device(queue, a, allocator=alloc) + a_dev = cl_array.to_device(queue, a, allocator=alloc) - with cl_array.queue_for_pickling(queue, alloc): - a_dev_pickled = pickle.loads(pickle.dumps(a_dev)) + with cl_array.queue_for_pickling(queue, alloc): + a_dev_pickled = pickle.loads(pickle.dumps(a_dev)) - assert np.all(a_dev_pickled.get() == a) - assert a_dev_pickled.allocator is alloc + assert np.all(a_dev_pickled.get() == a) + assert a_dev_pickled.allocator is alloc # }}} From b4a65b3e50119ad51a52b27fae36f15b136fa5cf Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Mon, 30 Sep 2024 13:07:16 -0500 Subject: [PATCH 6/9] update strides --- pyopencl/array.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyopencl/array.py b/pyopencl/array.py index 153dfeb5..3d63c758 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -786,7 +786,13 @@ def __setstate__(self, state): else: self.base_data = self.allocator(self.nbytes) - self.set(state["data"], queue=queue) + ary = state["data"] + + # Mimics the stride update in _get() below + if ary.strides != self.strides: + ary = _as_strided(ary, strides=self.strides) + + self.set(ary, queue=queue) # }}} From 34b082649561d923e7f8923f14785593f2d22b39 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Tue, 1 Oct 2024 16:32:21 -0500 Subject: [PATCH 7/9] (attempt at) pickle Buffers --- pyopencl/__init__.py | 76 ++++++++++++++++++++++++++++++++++++++++++++ test/test_array.py | 20 ++++++++++++ 2 files changed, 96 insertions(+) diff --git a/pyopencl/__init__.py b/pyopencl/__init__.py index af23e1b1..50053586 100644 --- a/pyopencl/__init__.py +++ b/pyopencl/__init__.py @@ -2407,4 +2407,80 @@ def fsvm_empty_like(ctx, ary, alignment=None): _KERNEL_ARG_CLASSES = (*_KERNEL_ARG_CLASSES, SVM) +# {{{ pickling support + +import threading +from contextlib import contextmanager + + +_QUEUE_FOR_PICKLING_TLS = threading.local() + + +@contextmanager +def queue_for_pickling(queue): + r"""A context manager that, for the current thread, sets the command queue + to be used for pickling and unpickling :class:`Buffer`\ s to *queue*.""" + try: + existing_pickle_queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + existing_pickle_queue = None + + if existing_pickle_queue is not None: + raise RuntimeError("queue_for_pickling should not be called " + "inside the context of its own invocation.") + + _QUEUE_FOR_PICKLING_TLS.queue = queue + try: + yield None + finally: + _QUEUE_FOR_PICKLING_TLS.queue = None + + +def _getstate_buffer(self): + import pyopencl as cl + state = {} + state["size"] = self.size + state["flags"] = self.flags + + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError("CL Buffer instances can only be pickled while " + "queue_for_pickling is active.") + + a = bytearray(self.size) + cl.enqueue_copy(queue, a, self) + + state["_pickle_data"] = a + + return state + + +def _setstate_buffer(self, state): + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError("CL Buffer instances can only be unpickled while " + "queue_for_pickling is active.") + + size = state["size"] + flags = state["flags"] + + import pyopencl as cl + + a = state["_pickle_data"] + Buffer.__init__(self, queue.context, flags | cl.mem_flags.COPY_HOST_PTR, size, a) + + +Buffer.__getstate__ = _getstate_buffer +Buffer.__setstate__ = _setstate_buffer + +# }}} + # vim: foldmethod=marker diff --git a/test/test_array.py b/test/test_array.py index d54cc307..7ef41698 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2453,6 +2453,26 @@ def test_array_pickling(ctx_factory): # }}} +def test_buffer_pickling(ctx_factory): + context = ctx_factory() + queue = cl.CommandQueue(context) + + a = np.array([1, 2, 3, 4, 5]).astype(np.float32) + a_gpu = cl.Buffer(context, cl.mem_flags.READ_WRITE, a.nbytes) + cl.enqueue_copy(queue, a_gpu, a) + + import pickle + + with pytest.raises(cl.RuntimeError): + pickle.dumps(a_gpu) + + with cl.queue_for_pickling(queue): + a_gpu_pickled = pickle.loads(pickle.dumps(a_gpu)) + + a_new = np.empty_like(a) + cl.enqueue_copy(queue, a_new, a_gpu_pickled) + assert np.all(a_new == a) + # }}} From 001615ce8b7e2a5b179a92a094aa917af1f26749 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Tue, 1 Oct 2024 22:48:36 -0500 Subject: [PATCH 8/9] pickle SVM, centralize queue_for_pickling, use Buffer/SVM pickling for Arrays --- pyopencl/__init__.py | 52 +++++++++++++++++++++++++++++++++++-- pyopencl/array.py | 62 +++++--------------------------------------- test/test_array.py | 11 ++++---- 3 files changed, 62 insertions(+), 63 deletions(-) diff --git a/pyopencl/__init__.py b/pyopencl/__init__.py index 50053586..fdef4313 100644 --- a/pyopencl/__init__.py +++ b/pyopencl/__init__.py @@ -2417,9 +2417,10 @@ def fsvm_empty_like(ctx, ary, alignment=None): @contextmanager -def queue_for_pickling(queue): +def queue_for_pickling(queue, alloc=None): r"""A context manager that, for the current thread, sets the command queue - to be used for pickling and unpickling :class:`Buffer`\ s to *queue*.""" + to be used for pickling and unpickling :class:`Array`\ s and :class:`Buffer`\ s + to *queue*.""" try: existing_pickle_queue = _QUEUE_FOR_PICKLING_TLS.queue except AttributeError: @@ -2430,10 +2431,12 @@ def queue_for_pickling(queue): "inside the context of its own invocation.") _QUEUE_FOR_PICKLING_TLS.queue = queue + _QUEUE_FOR_PICKLING_TLS.alloc = alloc try: yield None finally: _QUEUE_FOR_PICKLING_TLS.queue = None + _QUEUE_FOR_PICKLING_TLS.alloc = None def _getstate_buffer(self): @@ -2481,6 +2484,51 @@ def _setstate_buffer(self, state): Buffer.__getstate__ = _getstate_buffer Buffer.__setstate__ = _setstate_buffer +if get_cl_header_version() >= (2, 0): + def _getstate_svm(self): + import pyopencl as cl + + state = {} + state["size"] = self.size + + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError(f"{self.__class__.__name__} instances can only be " + "pickled while queue_for_pickling is active.") + + a = bytearray(self.size) + cl.enqueue_copy(queue, a, self) + + state["_pickle_data"] = a + + return state + + def _setstate_svm(self, state): + import pyopencl as cl + + try: + queue = _QUEUE_FOR_PICKLING_TLS.queue + except AttributeError: + queue = None + + if queue is None: + raise RuntimeError(f"{self.__class__.__name__} instances can only be " + "unpickled while queue_for_pickling is active.") + + size = state["size"] + + a = state["_pickle_data"] + SVMAllocation.__init__(self, queue.context, size, alignment=0, flags=0, + queue=queue) + cl.enqueue_copy(queue, self, a) + + SVMAllocation.__getstate__ = _getstate_svm + SVMAllocation.__setstate__ = _setstate_svm + # }}} # vim: foldmethod=marker diff --git a/pyopencl/array.py b/pyopencl/array.py index 3d63c758..92ceadaf 100644 --- a/pyopencl/array.py +++ b/pyopencl/array.py @@ -342,39 +342,6 @@ class _copy_queue: # noqa: N801 _NOT_PRESENT = object() -# {{{ pickling support - -import threading -from contextlib import contextmanager - - -_QUEUE_FOR_PICKLING_TLS = threading.local() - - -@contextmanager -def queue_for_pickling(queue, alloc=None): - r"""A context manager that, for the current thread, sets the command queue - to be used for pickling and unpickling :class:`Array`\ s to *queue*.""" - try: - existing_pickle_queue = _QUEUE_FOR_PICKLING_TLS.queue - except AttributeError: - existing_pickle_queue = None - - if existing_pickle_queue is not None: - raise RuntimeError("queue_for_pickling should not be called " - "inside the context of its own invocation.") - - _QUEUE_FOR_PICKLING_TLS.queue = queue - _QUEUE_FOR_PICKLING_TLS.alloc = alloc - try: - yield None - finally: - _QUEUE_FOR_PICKLING_TLS.queue = None - _QUEUE_FOR_PICKLING_TLS.alloc = None - -# }}} - - class Array: """A :class:`numpy.ndarray` work-alike that stores its data and performs its computations on the compute device. :attr:`shape` and :attr:`dtype` work @@ -742,13 +709,13 @@ def __init__( def __getstate__(self): try: - queue = _QUEUE_FOR_PICKLING_TLS.queue + queue = cl._QUEUE_FOR_PICKLING_TLS.queue except AttributeError: queue = None if queue is None: raise RuntimeError("CL Array instances can only be pickled while " - "queue_for_pickling is active.") + "cl.queue_for_pickling is active.") state = self.__dict__.copy() @@ -756,22 +723,19 @@ def __getstate__(self): del state["context"] del state["events"] del state["queue"] - del state["base_data"] - state["data"] = self.get(queue=queue) - return state def __setstate__(self, state): try: - queue = _QUEUE_FOR_PICKLING_TLS.queue - alloc = _QUEUE_FOR_PICKLING_TLS.alloc + queue = cl._QUEUE_FOR_PICKLING_TLS.queue + alloc = cl._QUEUE_FOR_PICKLING_TLS.alloc except AttributeError: queue = None alloc = None if queue is None: - raise RuntimeError("CL Array instances can only be pickled while " - "queue_for_pickling is active.") + raise RuntimeError("CL Array instances can only be unpickled while " + "cl.queue_for_pickling is active.") self.__dict__.update(state) @@ -780,20 +744,6 @@ def __setstate__(self, state): self.events = [] self.queue = queue - if self.allocator is None: - self.base_data = cl.Buffer(self.context, cl.mem_flags.READ_WRITE, - self.nbytes) - else: - self.base_data = self.allocator(self.nbytes) - - ary = state["data"] - - # Mimics the stride update in _get() below - if ary.strides != self.strides: - ary = _as_strided(ary, strides=self.strides) - - self.set(ary, queue=queue) - # }}} @property diff --git a/test/test_array.py b/test/test_array.py index 7ef41698..ab028eaf 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2415,7 +2415,7 @@ def test_array_pickling(ctx_factory): with pytest.raises(RuntimeError): pickle.dumps(a_gpu) - with cl_array.queue_for_pickling(queue): + with cl.queue_for_pickling(queue): a_gpu_pickled = pickle.loads(pickle.dumps(a_gpu)) assert np.all(a_gpu_pickled.get() == a) @@ -2424,7 +2424,7 @@ def test_array_pickling(ctx_factory): a_gpu_tagged = TaggableCLArray(queue, a.shape, a.dtype, tags={"foo", "bar"}) a_gpu_tagged.set(a) - with cl_array.queue_for_pickling(queue): + with cl.queue_for_pickling(queue): a_gpu_tagged_pickled = pickle.loads(pickle.dumps(a_gpu_tagged)) assert np.all(a_gpu_tagged_pickled.get() == a) @@ -2437,14 +2437,15 @@ def test_array_pickling(ctx_factory): from pyopencl.characterize import has_coarse_grain_buffer_svm if has_coarse_grain_buffer_svm(queue.device): - from pyopencl.tools import SVMAllocator, SVMPool + from pyopencl.tools import SVMAllocator alloc = SVMAllocator(context, alignment=0, queue=queue) - alloc = SVMPool(alloc) + # FIXME: SVMPool is not picklable + # alloc = SVMPool(alloc) a_dev = cl_array.to_device(queue, a, allocator=alloc) - with cl_array.queue_for_pickling(queue, alloc): + with cl.queue_for_pickling(queue, alloc): a_dev_pickled = pickle.loads(pickle.dumps(a_dev)) assert np.all(a_dev_pickled.get() == a) From 00876e19e3ccc4ab9d8cbfd5b0df3662e7fe12e5 Mon Sep 17 00:00:00 2001 From: Matthias Diener Date: Wed, 2 Oct 2024 16:21:57 -0500 Subject: [PATCH 9/9] add _get_queue_for_pickling, outline some pool support --- pyopencl/__init__.py | 117 +++++++++++++++++++++++++++++-------------- test/test_array.py | 16 ++++-- 2 files changed, 90 insertions(+), 43 deletions(-) diff --git a/pyopencl/__init__.py b/pyopencl/__init__.py index fdef4313..7c3e6edd 100644 --- a/pyopencl/__init__.py +++ b/pyopencl/__init__.py @@ -137,6 +137,7 @@ MemoryObject, MemoryMap, Buffer, + PooledBuffer, _Program, Kernel, @@ -197,7 +198,7 @@ enqueue_migrate_mem_objects, unload_platform_compiler) if get_cl_header_version() >= (2, 0): - from pyopencl._cl import SVM, SVMAllocation, SVMPointer + from pyopencl._cl import SVM, SVMAllocation, SVMPointer, PooledSVM if _cl.have_gl(): from pyopencl._cl import ( # noqa: F401 @@ -2439,21 +2440,28 @@ def queue_for_pickling(queue, alloc=None): _QUEUE_FOR_PICKLING_TLS.alloc = None -def _getstate_buffer(self): - import pyopencl as cl - state = {} - state["size"] = self.size - state["flags"] = self.flags - +def _get_queue_for_pickling(obj): try: queue = _QUEUE_FOR_PICKLING_TLS.queue + alloc = _QUEUE_FOR_PICKLING_TLS.alloc except AttributeError: queue = None if queue is None: - raise RuntimeError("CL Buffer instances can only be pickled while " + raise RuntimeError(f"{type(obj).__name__} instances can only be pickled while " "queue_for_pickling is active.") + return queue, alloc + + +def _getstate_buffer(self): + import pyopencl as cl + queue, _alloc = _get_queue_for_pickling(self) + + state = {} + state["size"] = self.size + state["flags"] = self.flags + a = bytearray(self.size) cl.enqueue_copy(queue, a, self) @@ -2463,20 +2471,12 @@ def _getstate_buffer(self): def _setstate_buffer(self, state): - try: - queue = _QUEUE_FOR_PICKLING_TLS.queue - except AttributeError: - queue = None - - if queue is None: - raise RuntimeError("CL Buffer instances can only be unpickled while " - "queue_for_pickling is active.") + import pyopencl as cl + queue, _alloc = _get_queue_for_pickling(self) size = state["size"] flags = state["flags"] - import pyopencl as cl - a = state["_pickle_data"] Buffer.__init__(self, queue.context, flags | cl.mem_flags.COPY_HOST_PTR, size, a) @@ -2484,21 +2484,44 @@ def _setstate_buffer(self, state): Buffer.__getstate__ = _getstate_buffer Buffer.__setstate__ = _setstate_buffer + +def _getstate_pooledbuffer(self): + import pyopencl as cl + queue, _alloc = _get_queue_for_pickling(self) + + state = {} + state["size"] = self.size + state["flags"] = self.flags + + a = bytearray(self.size) + cl.enqueue_copy(queue, a, self) + state["_pickle_data"] = a + + return state + + +def _setstate_pooledbuffer(self, state): + _queue, _alloc = _get_queue_for_pickling(self) + + _size = state["size"] + _flags = state["flags"] + + _a = state["_pickle_data"] + # FIXME: Unclear what to do here - PooledBuffer does not have __init__ + + +PooledBuffer.__getstate__ = _getstate_pooledbuffer +PooledBuffer.__setstate__ = _setstate_pooledbuffer + + if get_cl_header_version() >= (2, 0): - def _getstate_svm(self): + def _getstate_svmallocation(self): import pyopencl as cl state = {} state["size"] = self.size - try: - queue = _QUEUE_FOR_PICKLING_TLS.queue - except AttributeError: - queue = None - - if queue is None: - raise RuntimeError(f"{self.__class__.__name__} instances can only be " - "pickled while queue_for_pickling is active.") + queue, _alloc = _get_queue_for_pickling(self) a = bytearray(self.size) cl.enqueue_copy(queue, a, self) @@ -2507,17 +2530,10 @@ def _getstate_svm(self): return state - def _setstate_svm(self, state): + def _setstate_svmallocation(self, state): import pyopencl as cl - try: - queue = _QUEUE_FOR_PICKLING_TLS.queue - except AttributeError: - queue = None - - if queue is None: - raise RuntimeError(f"{self.__class__.__name__} instances can only be " - "unpickled while queue_for_pickling is active.") + queue, _alloc = _get_queue_for_pickling(self) size = state["size"] @@ -2526,8 +2542,33 @@ def _setstate_svm(self, state): queue=queue) cl.enqueue_copy(queue, self, a) - SVMAllocation.__getstate__ = _getstate_svm - SVMAllocation.__setstate__ = _setstate_svm + SVMAllocation.__getstate__ = _getstate_svmallocation + SVMAllocation.__setstate__ = _setstate_svmallocation + + def _getstate_pooled_svm(self): + import pyopencl as cl + + state = {} + state["size"] = self.size + + queue, _alloc = _get_queue_for_pickling(self) + + a = bytearray(self.size) + cl.enqueue_copy(queue, a, self) + + state["_pickle_data"] = a + + return state + + def _setstate_pooled_svm(self, state): + _queue, _alloc = _get_queue_for_pickling(self) + _size = state["size"] + _data = state["_pickle_data"] + + # FIXME: Unclear what to do here - PooledSVM does not have __init__ + + PooledSVM.__getstate__ = _getstate_pooled_svm + PooledSVM.__setstate__ = _setstate_pooled_svm # }}} diff --git a/test/test_array.py b/test/test_array.py index ab028eaf..104990b3 100644 --- a/test/test_array.py +++ b/test/test_array.py @@ -2404,12 +2404,18 @@ def __init__(self, cq, shape, dtype, tags): self.tags = tags -def test_array_pickling(ctx_factory): +@pytest.mark.parametrize("use_mempool", [False, True]) +def test_array_pickling(ctx_factory, use_mempool): context = ctx_factory() queue = cl.CommandQueue(context) + if use_mempool: + alloc = cl_tools.MemoryPool(cl_tools.ImmediateAllocator(queue)) + else: + alloc = None + a = np.array([1, 2, 3, 4, 5]).astype(np.float32) - a_gpu = cl_array.to_device(queue, a) + a_gpu = cl_array.to_device(queue, a, allocator=alloc) import pickle with pytest.raises(RuntimeError): @@ -2437,11 +2443,11 @@ def test_array_pickling(ctx_factory): from pyopencl.characterize import has_coarse_grain_buffer_svm if has_coarse_grain_buffer_svm(queue.device): - from pyopencl.tools import SVMAllocator + from pyopencl.tools import SVMAllocator, SVMPool alloc = SVMAllocator(context, alignment=0, queue=queue) - # FIXME: SVMPool is not picklable - # alloc = SVMPool(alloc) + if use_mempool: + alloc = SVMPool(alloc) a_dev = cl_array.to_device(queue, a, allocator=alloc)