Vmm benchmark support #883

Draft
wants to merge 15 commits into base: branch-0.28
2 changes: 2 additions & 0 deletions ucp/_libs/arr.pxd
@@ -20,6 +20,8 @@ cdef class Array:

cdef readonly bint cuda

cdef readonly list _blocks

cpdef bint _c_contiguous(self)
cpdef bint _f_contiguous(self)
cpdef bint _contiguous(self)
27 changes: 27 additions & 0 deletions ucp/_libs/arr.pyx
@@ -132,6 +132,29 @@ cdef class Array:
else:
self.shape_mv = None
self.strides_mv = None

# Multi-block VMM property: when the buffer comes from a VMM pool, record
# the per-block slices so the buffer can be transferred block by block.
self._blocks = None
try:
from dask_cuda.vmm_pool import rmm_get_current_vmm_pool

try:
vmm_pool = rmm_get_current_vmm_pool()
try:
from ucp._libs.vmm import build_slices
blocks = vmm_pool._allocs[self.ptr].blocks
try:
self._blocks = build_slices(blocks, self.nbytes)
except AttributeError:
print(f"AttributeError: {type(obj)}, {obj}")
except KeyError:
# This pointer is not tracked by the current VMM pool.
pass
except ValueError:
# rmm_get_current_vmm_pool() raised; leave _blocks as None.
pass
except ImportError:
# dask_cuda is not available; fall back to the object's own block list.
if hasattr(obj, "get_blocks"):
self._blocks = obj.get_blocks()
else:
mv = PyMemoryView_FromObject(obj)
pybuf = PyMemoryView_GET_BUFFER(mv)
@@ -237,6 +260,10 @@ cdef class Array:
s *= self.shape_mv[i]
return strides

@property
def blocks(self):
return self._blocks


@boundscheck(False)
@initializedcheck(False)
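For reference, a minimal sketch of how the new blocks property could be consumed. The pool argument is a stand-in for a dask_cuda VmmBlockPool/VmmPool instance (not part of this diff), and blocks is only populated for VMM-backed buffers, either via an active dask_cuda VMM pool or the wrapped object's own get_blocks(); for ordinary buffers it stays None.

from ucp._libs.arr import Array
from ucp._libs.vmm import VmmBlockPoolArray


def show_blocks(pool, n_bytes=8 * 1024**2):
    # Allocate n_bytes from the (assumed) VMM block pool and wrap it in Array.
    buf = VmmBlockPoolArray(pool, n_bytes)
    arr = Array(buf)
    # Each entry is a VmmArraySlice exposing ptr/nbytes for one VMM block.
    if arr.blocks is not None:
        for block in arr.blocks:
            print(hex(int(block.ptr)), block.nbytes)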
139 changes: 139 additions & 0 deletions ucp/_libs/vmm.py
@@ -0,0 +1,139 @@
from functools import partial
from typing import List, Tuple, Union

import numpy as np
from cuda import cuda

from dask_cuda.rmm_vmm_block_pool import VmmBlockPool
from dask_cuda.rmm_vmm_pool import checkCudaErrors
from dask_cuda.vmm_pool import VmmBlock, VmmPool


def get_vmm_allocator(vmm):
if vmm:
if isinstance(vmm, (VmmBlockPool, VmmPool)):
vmm_allocator = VmmBlockPoolArray
else:
vmm_allocator = VmmSingleArray
return partial(vmm_allocator, vmm)

return None


def copy_to_host(
dst: np.ndarray,
src: Union[int, cuda.CUdeviceptr],
size: int,
stream: cuda.CUstream = cuda.CUstream(0),
):
if isinstance(src, int):
src = cuda.CUdeviceptr(src)
assert isinstance(src, cuda.CUdeviceptr)
assert isinstance(dst, np.ndarray)
assert isinstance(size, int)
assert size > 0
# print(
# f"copy_to_host src: {hex(int(src))}, dst: {hex(int(dst.ctypes.data))}",
# flush=True
# )
checkCudaErrors(cuda.cuMemcpyDtoHAsync(dst.ctypes.data, src, size, stream))
checkCudaErrors(cuda.cuStreamSynchronize(stream))


def copy_to_device(
dst: Union[int, cuda.CUdeviceptr],
src: np.ndarray,
size: int,
stream: cuda.CUstream = cuda.CUstream(0),
):
assert isinstance(src, np.ndarray)
if isinstance(dst, int):
dst = cuda.CUdeviceptr(dst)
assert isinstance(dst, cuda.CUdeviceptr)
assert isinstance(size, int)
assert size > 0
# print(
# f"copy_to_device src: {hex(int(src.ctypes.data))}, dst: {hex(int(dst))}",
# flush=True
# )
checkCudaErrors(cuda.cuMemcpyHtoDAsync(dst, src.ctypes.data, size, stream))
checkCudaErrors(cuda.cuStreamSynchronize(stream))


class VmmAllocBase:
def __init__(self, ptr, size):
self.ptr = cuda.CUdeviceptr(ptr)
self.shape = (size,)

def __repr__(self) -> str:
return f"<VmmAllocBase ptr={hex(int(self.ptr))}, size={self.shape[0]}>"

@property
def __cuda_array_interface__(self):
return {
"shape": (self.shape),
"typestr": "u1",
"data": (int(self.ptr), False),
"version": 2,
}

@property
def nbytes(self):
return self.shape[0]


class VmmArraySlice(VmmAllocBase):
pass


class VmmSingleArray(VmmAllocBase):
def __init__(self, vmm_allocator, size):
ptr = cuda.CUdeviceptr(vmm_allocator.allocate(size))
super().__init__(ptr, size)

self.vmm_allocator = vmm_allocator

def __del__(self):
self.vmm_allocator.deallocate(int(self.ptr), self.shape[0])


class VmmBlockPoolArray(VmmAllocBase):
def __init__(self, vmm_block_pool_allocator, size):
ptr = cuda.CUdeviceptr(vmm_block_pool_allocator.allocate(size))
super().__init__(ptr, size)

self.vmm_allocator = vmm_block_pool_allocator

def __del__(self):
self.vmm_allocator.deallocate(int(self.ptr), self.shape[0])

def get_blocks(self):
if isinstance(self.vmm_allocator, VmmBlockPool):
blocks = self.vmm_allocator.get_allocation_blocks(int(self.ptr))
else:
blocks = self.vmm_allocator._allocs[int(self.ptr)].blocks
return build_slices(blocks, self.shape[0])


def build_slices(
blocks: List[Union[Tuple, VmmBlock]], alloc_size: int
) -> List[VmmArraySlice]:
assert len(blocks) > 0

cur_size = 0
ret = []
if isinstance(blocks[0], VmmBlock):
for block in blocks:
block_size = min(alloc_size - cur_size, block.size)
ret.append(VmmArraySlice(block._ptr, block_size))
cur_size += block.size
if cur_size >= alloc_size:
break
else:
for block in blocks:
block_size = min(alloc_size - cur_size, block[1])
ret.append(VmmArraySlice(block[0], block_size))
cur_size += block[1]
if cur_size >= alloc_size:
break
return ret
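A small, self-contained illustration of what build_slices does with the tuple form of blocks: an allocation of alloc_size bytes spread across fixed-size blocks is cut into VmmArraySlice views, with the final slice truncated so the total matches alloc_size. The pointer values below are made up, and importing ucp._libs.vmm assumes cuda-python and dask_cuda are installed.

from ucp._libs.vmm import build_slices

# Two 2 MiB blocks backing a 3 MiB allocation (illustrative addresses only).
blocks = [(0x7f0000000000, 2 * 1024**2), (0x7f0000200000, 2 * 1024**2)]
slices = build_slices(blocks, 3 * 1024**2)
for s in slices:
    # Prints 2 MiB for the first slice and 1 MiB for the truncated second one.
    print(hex(int(s.ptr)), s.nbytes)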
60 changes: 52 additions & 8 deletions ucp/benchmarks/backends/ucp_async.py
@@ -7,6 +7,7 @@
import ucp
from ucp._libs.arr import Array
from ucp._libs.utils import print_key_value
from ucp._libs.vmm import copy_to_device, copy_to_host, get_vmm_allocator
from ucp.benchmarks.backends.base import BaseClient, BaseServer


@@ -46,16 +47,24 @@ def __init__(self, args: Namespace, xp: Any, queue: Queue):
self.args = args
self.xp = xp
self.queue = queue
self.vmm = None

async def run(self):
ucp.init()

register_am_allocators(self.args)

vmm_allocator = get_vmm_allocator(self.vmm)

async def server_handler(ep):
recv_msg_vmm = None
if not self.args.enable_am:
if self.args.reuse_alloc:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
if self.vmm:
recv_msg_vmm = vmm_allocator(self.args.n_bytes)
recv_msg = Array(recv_msg_vmm)
else:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))

assert recv_msg.nbytes == self.args.n_bytes

@@ -65,10 +74,20 @@ async def server_handler(ep):
await ep.am_send(recv)
else:
if not self.args.reuse_alloc:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
if self.vmm:
recv_msg = Array(vmm_allocator(self.args.n_bytes))
else:
recv_msg = Array(
self.xp.empty(self.args.n_bytes, dtype="u1")
)

await ep.recv(recv_msg)
await ep.send(recv_msg)

if self.vmm and self.args.vmm_debug:
h_recv_msg = self.xp.empty(self.args.n_bytes, dtype="u1")
copy_to_host(h_recv_msg, recv_msg.ptr, self.args.n_bytes)
print(f"Server recv msg: {h_recv_msg}")
await ep.close()
lf.close()

@@ -88,6 +107,7 @@ def __init__(
self.args = args
self.xp = xp
self.queue = queue
self.vmm = None
self.server_address = server_address
self.port = port

@@ -96,17 +116,29 @@ async def run(self):

register_am_allocators(self.args)

vmm_allocator = get_vmm_allocator(self.vmm)

ep = await ucp.create_endpoint(self.server_address, self.port)

if self.args.enable_am:
msg = self.xp.arange(self.args.n_bytes, dtype="u1")
else:
send_msg = Array(self.xp.arange(self.args.n_bytes, dtype="u1"))
if self.args.reuse_alloc:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
if self.vmm:
h_send_msg = self.xp.arange(self.args.n_bytes, dtype="u1")
print(f"Client send: {h_send_msg}")
send_msg = Array(vmm_allocator(self.args.n_bytes))
copy_to_device(send_msg.ptr, h_send_msg, send_msg.shape[0])
if self.args.reuse_alloc:
recv_msg = Array(vmm_allocator(self.args.n_bytes))

assert recv_msg.nbytes == self.args.n_bytes
else:
send_msg = Array(self.xp.arange(self.args.n_bytes, dtype="u1"))
if self.args.reuse_alloc:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))

assert send_msg.nbytes == self.args.n_bytes
assert recv_msg.nbytes == self.args.n_bytes
assert send_msg.nbytes == self.args.n_bytes
assert recv_msg.nbytes == self.args.n_bytes

if self.args.cuda_profile:
self.xp.cuda.profiler.start()
@@ -118,11 +150,23 @@
await ep.am_recv()
else:
if not self.args.reuse_alloc:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))
if self.vmm:
recv_msg = Array(vmm_allocator(self.args.n_bytes))
else:
recv_msg = Array(self.xp.zeros(self.args.n_bytes, dtype="u1"))

await ep.send(send_msg)
await ep.recv(recv_msg)
stop = monotonic()

if self.vmm and self.args.vmm_debug:
import numpy as np

h_recv_msg = self.xp.empty(self.args.n_bytes, dtype="u1")
copy_to_host(h_recv_msg, recv_msg.ptr, recv_msg.shape[0])
print(f"Client recv: {h_recv_msg}")
np.testing.assert_equal(h_recv_msg, h_send_msg)

if i >= self.args.n_warmup_iter:
times.append(stop - start)
if self.args.cuda_profile:
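Taken together, the client-side changes above amount to the staging pattern sketched below: host data is copied into a VMM-backed allocation before the send, and the received buffer is copied back to host for validation. ep, vmm_allocator, and n_bytes are assumed to be provided by the benchmark harness (see get_vmm_allocator and the run() methods above); this is an illustration of the flow, not part of the diff.

import numpy as np

from ucp._libs.arr import Array
from ucp._libs.vmm import copy_to_device, copy_to_host


async def vmm_roundtrip(ep, vmm_allocator, n_bytes):
    # Stage host data into a VMM-backed device allocation.
    h_send = np.arange(n_bytes, dtype="u1")
    send_msg = Array(vmm_allocator(n_bytes))
    copy_to_device(send_msg.ptr, h_send, n_bytes)

    # Exchange the message over the endpoint, as the benchmark loop does.
    recv_msg = Array(vmm_allocator(n_bytes))
    await ep.send(send_msg)
    await ep.recv(recv_msg)

    # Copy the reply back to host and check it matches what was sent.
    h_recv = np.empty(n_bytes, dtype="u1")
    copy_to_host(h_recv, recv_msg.ptr, n_bytes)
    np.testing.assert_array_equal(h_recv, h_send)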