From a3c184a66e3805cb4bd82f0c705a8fce6af6f879 Mon Sep 17 00:00:00 2001 From: ZJU3190105746 <2632839426@qq.com> Date: Mon, 28 Aug 2023 14:05:00 +0800 Subject: [PATCH 1/7] add doc for collective communication --- .../reference/collective-communication.rst | 20 ++++ doc/source/reference/index.rst | 1 + .../user_guide/collective-communication.rst | 92 +++++++++++++++++++ doc/source/user_guide/index.rst | 3 +- python/xoscar/collective/core.py | 4 +- 5 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 doc/source/reference/collective-communication.rst create mode 100644 doc/source/user_guide/collective-communication.rst diff --git a/doc/source/reference/collective-communication.rst b/doc/source/reference/collective-communication.rst new file mode 100644 index 00000000..c2a20065 --- /dev/null +++ b/doc/source/reference/collective-communication.rst @@ -0,0 +1,20 @@ +.. _ref_collective_communication: + +========================= +Collective communitcation +========================= + +.. autosummary:: + :toctree: generated/ + + xoscar.collective.core.init_process_group + xoscar.collective.core.new_group + xoscar.collective.core.reduce + xoscar.collective.core.allreduce + xoscar.collective.core.gather + xoscar.collective.core.allgather + xoscar.collective.core.scatter + xoscar.collective.core.alltoall + xoscar.collective.core.broadcast + + diff --git a/doc/source/reference/index.rst b/doc/source/reference/index.rst index 9d520a91..0ab66e15 100644 --- a/doc/source/reference/index.rst +++ b/doc/source/reference/index.rst @@ -9,3 +9,4 @@ API Reference actor-pool actor + collective-communication diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst new file mode 100644 index 00000000..b2166167 --- /dev/null +++ b/doc/source/user_guide/collective-communication.rst @@ -0,0 +1,92 @@ +.. _colletive-communication: + +========================= +Collective communitcation +========================= + +Collective communication is a global communication operation in which all processes in a process +group participate. + +xoscar supports collective communication among actors. It utilizes the Gloo backend on CPU and +the NCCL backend on GPU. You can determine which backend to use by setting the parameter ``backend`` +when establishing the process group. + +.. seealso:: + :ref:`ref_collective_communication` + + +Collective communication example +-------------------------------- + +To perform collective communication, you need to create an actor to invoke the relevant interfaces +for collective communication. First, you need to initialize the process group. After initializing +the process group, you can create smaller process groups within this overall process group for +collective communication. Here is an example of how to perform a broadcast operation: + +.. code-block:: python + + from xoscar import Actor, ActorRefType, actor_ref, create_actor_pool, get_pool_config + from xoscar.context import get_context + from xoscar.collective.common import( + RANK_ADDRESS_ENV_KEY, + RENDEZVOUS_MASTER_IP_ENV_KEY, + RENDEZVOUS_MASTER_PORT_ENV_KEY, + ) + from xoscar.collective.core import ( + RankActor, + broadcast, + init_process_group, + new_group, + ) + import os + import numpy as np + import asyncio + + class WorkerActor(Actor): + def __init__(self, rank, world, *args, **kwargs): + self._rank = rank + self._world = world + + async def init_process_group(self): + os.environ[RANK_ADDRESS_ENV_KEY] = self.address + return await init_process_group(self._rank, self._world) + + async def test_broadcast(self): + root = 1 + _group = [0, 1] + sendbuf = np.zeros((2, 3), dtype=np.int64) + if self._rank == _group[root]: + sendbuf = sendbuf + self._rank + recvbuf = np.zeros_like(sendbuf, dtype=np.int64) + group = await new_group(_group) + if group is not None: + await broadcast(sendbuf, recvbuf, root=root, group_name=group) + print(np.equal(recvbuf, np.zeros_like(recvbuf) + _group[root])) + + pool = await create_actor_pool( + "127.0.0.1", + n_process=2, + envs=[ + { + RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1", + RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001", + } + ] + * 2, + ) + main_addr = pool.external_address + config = (await get_pool_config(pool.external_address)).as_dict() + all_addrs = list(config["mapping"].keys()) + all_addrs.remove(main_addr) + + async with pool: + ctx = get_context() + r0 = await ctx.create_actor(WorkerActor, 0, 2, address=all_addrs[0]) + r1 = await ctx.create_actor(WorkerActor, 1, 2, address=all_addrs[1]) + t0 = r0.init_process_group() + t1 = r1.init_process_group() + await asyncio.gather(*[t0, t1]) + + t0 = r0.test_broadcast() + t1 = r1.test_broadcast() + await asyncio.gather(*[t0, t1]) diff --git a/doc/source/user_guide/index.rst b/doc/source/user_guide/index.rst index 1cbcda7a..deedab78 100644 --- a/doc/source/user_guide/index.rst +++ b/doc/source/user_guide/index.rst @@ -8,4 +8,5 @@ User Guide :maxdepth: 2 actor - actor-pool \ No newline at end of file + actor-pool + collective-communication \ No newline at end of file diff --git a/python/xoscar/collective/core.py b/python/xoscar/collective/core.py index a2f50782..32d8b75a 100644 --- a/python/xoscar/collective/core.py +++ b/python/xoscar/collective/core.py @@ -348,8 +348,8 @@ async def init_process_group( will be created. device_id(int, optional): GPU ID the actor will bind, default ``None`` - If it is None and backend is gloo, it will try to get it from the environment variable COLLECTIVE_DEVICE_ID_ENV_KEY. - If the environment variable is not set either, it will return an error. + If it is None and backend is gloo, it will try to get it from the environment variable COLLECTIVE_DEVICE_ID_ENV_KEY. + If the environment variable is not set either, it will return an error. address(str, optional): actor address. default ``None`` """ From 8d57b400aab039a89fac82d10bc87d6e8a095d8f Mon Sep 17 00:00:00 2001 From: ZJU3190105746 <2632839426@qq.com> Date: Mon, 28 Aug 2023 14:09:24 +0800 Subject: [PATCH 2/7] add doc for collective communication --- doc/source/user_guide/collective-communication.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst index b2166167..32561ae0 100644 --- a/doc/source/user_guide/collective-communication.rst +++ b/doc/source/user_guide/collective-communication.rst @@ -9,7 +9,7 @@ group participate. xoscar supports collective communication among actors. It utilizes the Gloo backend on CPU and the NCCL backend on GPU. You can determine which backend to use by setting the parameter ``backend`` -when establishing the process group. +of function ``init_process_group`` when establishing the process group. .. seealso:: :ref:`ref_collective_communication` @@ -24,7 +24,7 @@ the process group, you can create smaller process groups within this overall pro collective communication. Here is an example of how to perform a broadcast operation: .. code-block:: python - + from xoscar import Actor, ActorRefType, actor_ref, create_actor_pool, get_pool_config from xoscar.context import get_context from xoscar.collective.common import( From 4ec17d0d54ecbc3c1055f3ac431ae168119aeda1 Mon Sep 17 00:00:00 2001 From: YibLiu <68105073+YibinLiu666@users.noreply.github.com> Date: Mon, 28 Aug 2023 15:37:04 +0800 Subject: [PATCH 3/7] Update collective-communication.rst --- .../user_guide/collective-communication.rst | 95 +++++++++++++++++-- 1 file changed, 88 insertions(+), 7 deletions(-) diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst index 32561ae0..d262e4d6 100644 --- a/doc/source/user_guide/collective-communication.rst +++ b/doc/source/user_guide/collective-communication.rst @@ -11,17 +11,18 @@ xoscar supports collective communication among actors. It utilizes the Gloo back the NCCL backend on GPU. You can determine which backend to use by setting the parameter ``backend`` of function ``init_process_group`` when establishing the process group. +To perform collective communication, you need to create an actor to invoke the relevant interfaces +for collective communication. First, you need to initialize the process group. After initializing +the process group, you can create smaller process groups within this overall process group for +collective communication. + .. seealso:: :ref:`ref_collective_communication` -Collective communication example --------------------------------- - -To perform collective communication, you need to create an actor to invoke the relevant interfaces -for collective communication. First, you need to initialize the process group. After initializing -the process group, you can create smaller process groups within this overall process group for -collective communication. Here is an example of how to perform a broadcast operation: +Collective communication on CPU +------------------------------- +xoscar uses Gloo as backend on CPU, here is an example of how to perform a broadcast operation on CPU: .. code-block:: python @@ -90,3 +91,83 @@ collective communication. Here is an example of how to perform a broadcast opera t0 = r0.test_broadcast() t1 = r1.test_broadcast() await asyncio.gather(*[t0, t1]) + +Collective communication on GPU +------------------------------- +xoscar uses NCCL as backend on GPU and it depends on cupy. Therefore, before using +collective communication with xOSCAR on GPU, you need to install the appropriate +version of Cupy based on your NVIDIA driver version. You can refer to https://docs.cupy.dev/en/stable/install.html#installing-cupy +for the installation steps and compatibility information. Here is an example +of how to perform a broadcast operation on GPU(2 GPUs are needed for this example): + +.. code-block:: python + + from xoscar import Actor, ActorRefType, actor_ref, create_actor_pool, get_pool_config + from xoscar.context import get_context + from xoscar.collective.common import( + RANK_ADDRESS_ENV_KEY, + RENDEZVOUS_MASTER_IP_ENV_KEY, + RENDEZVOUS_MASTER_PORT_ENV_KEY, + ) + from xoscar.collective.core import ( + RankActor, + broadcast, + init_process_group, + new_group, + ) + import os + import numpy as np + import asyncio + + class WorkerActor(Actor): + def __init__(self, rank, world, *args, **kwargs): + self._rank = rank + self._world = world + + async def init_process_group(self): + os.environ[RANK_ADDRESS_ENV_KEY] = self.address + return await init_process_group(self._rank, self._world, "nccl") + + async def test_broadcast(self): + import cupy as cp + + root = 1 + _group = [0, 1] + sendbuf = cp.zeros((2, 3), dtype=np.int64) + if self._rank == _group[root]: + sendbuf = sendbuf + self._rank + recvbuf = cp.zeros_like(sendbuf, dtype=np.int64) + group = await new_group(_group) + if group is not None: + await broadcast(sendbuf, recvbuf, root=root, group_name=group) + cp.testing.assert_array_equal(recvbuf, cp.zeros_like(recvbuf) + _group[root]) + + pool = await create_actor_pool( + "127.0.0.1", + n_process=2, + envs=[ + { + RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1", + RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001", + COLLECTIVE_DEVICE_ID_ENV_KEY: "0", + }, + { + RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1", + RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001", + COLLECTIVE_DEVICE_ID_ENV_KEY: "1", + }, + ], + ) + + config = (await get_pool_config(pool.external_address)).as_dict() + all_addrs = list(config["mapping"].keys()) + async with pool: + ctx = get_context() + r0 = await ctx.create_actor(NcclWorkerActor, 0, 2, address=all_addrs[0]) + r1 = await ctx.create_actor(NcclWorkerActor, 1, 2, address=all_addrs[1]) + t0 = r0.init_process_group() + t1 = r1.init_process_group() + await asyncio.gather(t0, t1) + t0 = r0.test_collective_np() + t1 = r1.test_collective_np() + await asyncio.gather(t0, t1) From 57f2b005b7be20f5a7aa1647e072e32e013bc153 Mon Sep 17 00:00:00 2001 From: YibLiu <68105073+YibinLiu666@users.noreply.github.com> Date: Thu, 31 Aug 2023 11:17:30 +0800 Subject: [PATCH 4/7] Update collective-communication.rst --- doc/source/user_guide/collective-communication.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst index d262e4d6..0424ec1d 100644 --- a/doc/source/user_guide/collective-communication.rst +++ b/doc/source/user_guide/collective-communication.rst @@ -94,8 +94,8 @@ xoscar uses Gloo as backend on CPU, here is an example of how to perform a broad Collective communication on GPU ------------------------------- -xoscar uses NCCL as backend on GPU and it depends on cupy. Therefore, before using -collective communication with xOSCAR on GPU, you need to install the appropriate +xoscar uses NCCL as backend on GPU and it depends on ``cupy``. Therefore, before using +collective communication with xoscar on GPU, you need to install the appropriate version of Cupy based on your NVIDIA driver version. You can refer to https://docs.cupy.dev/en/stable/install.html#installing-cupy for the installation steps and compatibility information. Here is an example of how to perform a broadcast operation on GPU(2 GPUs are needed for this example): From 093efe82e40ace20a422588f672d2d5751f69c58 Mon Sep 17 00:00:00 2001 From: YibLiu <68105073+YibinLiu666@users.noreply.github.com> Date: Thu, 31 Aug 2023 13:41:31 +0800 Subject: [PATCH 5/7] Update collective-communication.rst --- .../reference/collective-communication.rst | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/doc/source/reference/collective-communication.rst b/doc/source/reference/collective-communication.rst index c2a20065..ff013899 100644 --- a/doc/source/reference/collective-communication.rst +++ b/doc/source/reference/collective-communication.rst @@ -7,14 +7,13 @@ Collective communitcation .. autosummary:: :toctree: generated/ - xoscar.collective.core.init_process_group - xoscar.collective.core.new_group - xoscar.collective.core.reduce - xoscar.collective.core.allreduce - xoscar.collective.core.gather - xoscar.collective.core.allgather - xoscar.collective.core.scatter - xoscar.collective.core.alltoall - xoscar.collective.core.broadcast - - + xoscar.collective.init_process_group + xoscar.collective.new_group + xoscar.collective.reduce + xoscar.collective.allreduce + xoscar.collective.gather + xoscar.collective.allgather + xoscar.collective.scatter + xoscar.collective.reduce_scatter + xoscar.collective.alltoall + xoscar.collective.broadcast From 8eaa4b895c2b49bfd5c50d7cca775927cf715337 Mon Sep 17 00:00:00 2001 From: YibLiu <68105073+YibinLiu666@users.noreply.github.com> Date: Thu, 31 Aug 2023 13:42:09 +0800 Subject: [PATCH 6/7] Update collective-communication.rst --- doc/source/user_guide/collective-communication.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst index 0424ec1d..c0b85766 100644 --- a/doc/source/user_guide/collective-communication.rst +++ b/doc/source/user_guide/collective-communication.rst @@ -33,7 +33,7 @@ xoscar uses Gloo as backend on CPU, here is an example of how to perform a broad RENDEZVOUS_MASTER_IP_ENV_KEY, RENDEZVOUS_MASTER_PORT_ENV_KEY, ) - from xoscar.collective.core import ( + from xoscar.collective import ( RankActor, broadcast, init_process_group, @@ -94,8 +94,8 @@ xoscar uses Gloo as backend on CPU, here is an example of how to perform a broad Collective communication on GPU ------------------------------- -xoscar uses NCCL as backend on GPU and it depends on ``cupy``. Therefore, before using -collective communication with xoscar on GPU, you need to install the appropriate +xoscar uses NCCL as backend on GPU and it depends on cupy. Therefore, before using +collective communication with xOSCAR on GPU, you need to install the appropriate version of Cupy based on your NVIDIA driver version. You can refer to https://docs.cupy.dev/en/stable/install.html#installing-cupy for the installation steps and compatibility information. Here is an example of how to perform a broadcast operation on GPU(2 GPUs are needed for this example): @@ -109,7 +109,7 @@ of how to perform a broadcast operation on GPU(2 GPUs are needed for this exampl RENDEZVOUS_MASTER_IP_ENV_KEY, RENDEZVOUS_MASTER_PORT_ENV_KEY, ) - from xoscar.collective.core import ( + from xoscar.collective import ( RankActor, broadcast, init_process_group, From 0b8200909ea8783bea844dcfca60e340ab663a3f Mon Sep 17 00:00:00 2001 From: YibLiu <68105073+YibinLiu666@users.noreply.github.com> Date: Thu, 31 Aug 2023 15:10:27 +0800 Subject: [PATCH 7/7] Update core.py --- python/xoscar/collective/core.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/python/xoscar/collective/core.py b/python/xoscar/collective/core.py index 32d8b75a..92480a6c 100644 --- a/python/xoscar/collective/core.py +++ b/python/xoscar/collective/core.py @@ -332,8 +332,7 @@ async def init_process_group( address: Optional[str] = None, ): """ - Initializes the default distributed process group, and this will also - initialize the distributed package. + Initializes the default distributed process group. Args: rank (int): Rank of the current process (it should be a @@ -347,8 +346,8 @@ async def init_process_group( ``nccl``. If the backend is not provided, then a ``gloo`` backend will be created. - device_id(int, optional): GPU ID the actor will bind, default ``None`` - If it is None and backend is gloo, it will try to get it from the environment variable COLLECTIVE_DEVICE_ID_ENV_KEY. + device_id(int, optional): GPU id that the actor will bind, default ``None``. + If it is ``None`` and backend is ``gloo``, it will try to get it from the environment variable ``COLLECTIVE_DEVICE_ID_ENV_KEY``. If the environment variable is not set either, it will return an error. address(str, optional): actor address. default ``None`` @@ -394,12 +393,10 @@ async def new_group( This function requires that all processes in the main group (i.e. all processes that are part of the distributed job) enter this function, even - if they are not going to be members of the group. Additionally, groups - should be created in the same order in all processes. + if they are not going to be members of the group. Args: - ranks (list[int]): List of ranks of group members. If ``None``, will be - set to all ranks. Default is ``None``. + ranks (list[int]): List of ranks of group members. pg_options (ProcessGroupOptions, optional): process group options specifying what additional options need to be passed in during @@ -476,11 +473,9 @@ async def allreduce( the final result. Args: - send_data (Any): Input of the collective. The function - operates in-place. + send_data (Any): Input of the collective. - recv_data (Any): Output of the collective. The function - operates in-place. + recv_data (Any): Output of the collective. op (xoscar.collective.common.CollectiveReduceOp): One of the values from ``xoscar.collective.common.CollectiveReduceOp`` @@ -632,6 +627,7 @@ async def reduce_scatter( ): """ Reduces, then scatters a list of numpy or cupy data to all processes in a group. + It can be only used on linux. Args: send_data (Any): Input data. @@ -706,7 +702,7 @@ async def broadcast( stream: Optional[Any] = None, ): """ - Broadcasts the tensor to the whole group. + Broadcasts the numpy or cupy data to the whole group. data must have the same number of elements in all processes participating in the collective.