DOC: collective communication #68

Open · wants to merge 12 commits into base: main
19 changes: 19 additions & 0 deletions doc/source/reference/collective-communication.rst
@@ -0,0 +1,19 @@
.. _ref_collective_communication:

=========================
Collective communication
=========================

.. autosummary::
   :toctree: generated/

   xoscar.collective.init_process_group
   xoscar.collective.new_group
   xoscar.collective.reduce
   xoscar.collective.allreduce
   xoscar.collective.gather
   xoscar.collective.allgather
   xoscar.collective.scatter
   xoscar.collective.reduce_scatter
   xoscar.collective.alltoall
   xoscar.collective.broadcast
1 change: 1 addition & 0 deletions doc/source/reference/index.rst
@@ -9,3 +9,4 @@ API Reference

actor-pool
actor
collective-communication
173 changes: 173 additions & 0 deletions doc/source/user_guide/collective-communication.rst
@@ -0,0 +1,173 @@
.. _collective-communication:

=========================
Collective communication
=========================

Collective communication is a global communication operation in which all processes in a process
group participate.

xoscar supports collective communication among actors. It uses the Gloo backend on CPU and
the NCCL backend on GPU. You can choose the backend by setting the ``backend`` parameter of
``init_process_group`` when establishing the process group.

To perform collective communication, create an actor that invokes the collective
interfaces. First initialize the global process group; after that, you can create smaller
process groups within it for collective communication.
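Before the full examples below, the semantics of the reduce-style collectives can be
illustrated with plain NumPy. This sketch simulates ranks with a list of arrays and does
not use xoscar's API:

.. code-block:: python

    import numpy as np

    # Simulate a 3-rank process group: each rank holds its own send buffer.
    world_size = 3
    send_bufs = [np.full((2, 3), rank, dtype=np.int64) for rank in range(world_size)]

    # allreduce(sum): every rank ends up with the elementwise sum of all buffers.
    allreduce_result = np.sum(send_bufs, axis=0)

    # reduce(sum, root=0): only the root rank receives the combined result.
    root = 0
    reduce_results = [allreduce_result if rank == root else None for rank in range(world_size)]

    print(allreduce_result[0, 0])  # 0 + 1 + 2 = 3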

.. seealso::
   :ref:`ref_collective_communication`


Collective communication on CPU
-------------------------------
xoscar uses Gloo as the backend on CPU. Here is an example of how to perform a broadcast operation on CPU:

.. code-block:: python

    import asyncio
    import os

    import numpy as np

    from xoscar import Actor, create_actor_pool, get_pool_config
    from xoscar.collective import broadcast, init_process_group, new_group
    from xoscar.collective.common import (
        RANK_ADDRESS_ENV_KEY,
        RENDEZVOUS_MASTER_IP_ENV_KEY,
        RENDEZVOUS_MASTER_PORT_ENV_KEY,
    )
    from xoscar.context import get_context


    class WorkerActor(Actor):
        def __init__(self, rank, world, *args, **kwargs):
            self._rank = rank
            self._world = world

        async def init_process_group(self):
            os.environ[RANK_ADDRESS_ENV_KEY] = self.address
            return await init_process_group(self._rank, self._world)

        async def test_broadcast(self):
            root = 1
            _group = [0, 1]
            sendbuf = np.zeros((2, 3), dtype=np.int64)
            if self._rank == _group[root]:
                sendbuf = sendbuf + self._rank
            recvbuf = np.zeros_like(sendbuf, dtype=np.int64)
            group = await new_group(_group)
            if group is not None:
                await broadcast(sendbuf, recvbuf, root=root, group_name=group)
            print(np.equal(recvbuf, np.zeros_like(recvbuf) + _group[root]))


    pool = await create_actor_pool(
        "127.0.0.1",
        n_process=2,
        envs=[
            {
                RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
                RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
            }
        ]
        * 2,
    )
    main_addr = pool.external_address
    config = (await get_pool_config(pool.external_address)).as_dict()
    all_addrs = list(config["mapping"].keys())
    all_addrs.remove(main_addr)

    async with pool:
        ctx = get_context()
        r0 = await ctx.create_actor(WorkerActor, 0, 2, address=all_addrs[0])
        r1 = await ctx.create_actor(WorkerActor, 1, 2, address=all_addrs[1])
        t0 = r0.init_process_group()
        t1 = r1.init_process_group()
        await asyncio.gather(t0, t1)

        t0 = r0.test_broadcast()
        t1 = r1.test_broadcast()
        await asyncio.gather(t0, t1)
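What the example above verifies can be mirrored with plain NumPy: after a broadcast, every
participating rank's receive buffer equals the root rank's send buffer. This is a sketch of
the semantics only, not of xoscar's implementation:

.. code-block:: python

    import numpy as np

    group = [0, 1]  # ranks participating in the broadcast
    root = 1        # index into ``group``, as in the example above

    # Each rank's send buffer; only the root's contents matter for broadcast.
    send_bufs = {rank: np.zeros((2, 3), dtype=np.int64) + rank for rank in group}

    # broadcast: copy the root's buffer into every rank's receive buffer.
    recv_bufs = {rank: send_bufs[group[root]].copy() for rank in group}

    for rank in group:
        assert np.array_equal(recv_bufs[rank], np.ones((2, 3), dtype=np.int64))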

Collective communication on GPU
-------------------------------
xoscar uses NCCL as the backend on GPU, and it depends on CuPy. Therefore, before using
collective communication with xoscar on GPU, you need to install the appropriate version
of CuPy for your NVIDIA driver version. See https://docs.cupy.dev/en/stable/install.html#installing-cupy
for installation steps and compatibility information. Here is an example
of how to perform a broadcast operation on GPU (2 GPUs are needed for this example):

.. code-block:: python

    import asyncio
    import os

    import numpy as np

    from xoscar import Actor, create_actor_pool, get_pool_config
    from xoscar.collective import broadcast, init_process_group, new_group
    from xoscar.collective.common import (
        COLLECTIVE_DEVICE_ID_ENV_KEY,
        RANK_ADDRESS_ENV_KEY,
        RENDEZVOUS_MASTER_IP_ENV_KEY,
        RENDEZVOUS_MASTER_PORT_ENV_KEY,
    )
    from xoscar.context import get_context


    class WorkerActor(Actor):
        def __init__(self, rank, world, *args, **kwargs):
            self._rank = rank
            self._world = world

        async def init_process_group(self):
            os.environ[RANK_ADDRESS_ENV_KEY] = self.address
            return await init_process_group(self._rank, self._world, "nccl")

        async def test_broadcast(self):
            import cupy as cp

            root = 1
            _group = [0, 1]
            sendbuf = cp.zeros((2, 3), dtype=np.int64)
            if self._rank == _group[root]:
                sendbuf = sendbuf + self._rank
            recvbuf = cp.zeros_like(sendbuf, dtype=np.int64)
            group = await new_group(_group)
            if group is not None:
                await broadcast(sendbuf, recvbuf, root=root, group_name=group)
            cp.testing.assert_array_equal(recvbuf, cp.zeros_like(recvbuf) + _group[root])


    pool = await create_actor_pool(
        "127.0.0.1",
        n_process=2,
        envs=[
            {
                RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
                RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
                COLLECTIVE_DEVICE_ID_ENV_KEY: "0",
            },
            {
                RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
                RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
                COLLECTIVE_DEVICE_ID_ENV_KEY: "1",
            },
        ],
    )
    main_addr = pool.external_address
    config = (await get_pool_config(pool.external_address)).as_dict()
    all_addrs = list(config["mapping"].keys())
    all_addrs.remove(main_addr)

    async with pool:
        ctx = get_context()
        r0 = await ctx.create_actor(WorkerActor, 0, 2, address=all_addrs[0])
        r1 = await ctx.create_actor(WorkerActor, 1, 2, address=all_addrs[1])
        t0 = r0.init_process_group()
        t1 = r1.init_process_group()
        await asyncio.gather(t0, t1)
        t0 = r0.test_broadcast()
        t1 = r1.test_broadcast()
        await asyncio.gather(t0, t1)
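The ``reduce_scatter`` collective documented below combines two steps: it first reduces
(for example, sums) the per-rank inputs, then splits the result into equal chunks, one per
rank. A plain-NumPy sketch of the semantics (not xoscar's API):

.. code-block:: python

    import numpy as np

    world_size = 2
    # Each rank contributes a buffer with world_size * chunk_size elements.
    send_bufs = [np.arange(4, dtype=np.int64) + rank for rank in range(world_size)]
    # rank 0 -> [0 1 2 3], rank 1 -> [1 2 3 4]

    # Step 1: reduce (sum) elementwise across ranks -> [1 3 5 7]
    reduced = np.sum(send_bufs, axis=0)

    # Step 2: scatter equal chunks, one per rank.
    chunks = np.split(reduced, world_size)
    # rank 0 receives [1 3], rank 1 receives [5 7]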
3 changes: 2 additions & 1 deletion doc/source/user_guide/index.rst
@@ -8,4 +8,5 @@ User Guide
:maxdepth: 2

actor
actor-pool
actor-pool
collective-communication
24 changes: 10 additions & 14 deletions python/xoscar/collective/core.py
@@ -332,8 +332,7 @@ async def init_process_group(
address: Optional[str] = None,
):
"""
Initializes the default distributed process group, and this will also
initialize the distributed package.
Initializes the default distributed process group.

Args:
rank (int): Rank of the current process (it should be a
@@ -347,9 +346,9 @@ async def init_process_group(
``nccl``. If the backend is not provided, then a ``gloo`` backend
will be created.

device_id(int, optional): GPU ID the actor will bind, default ``None``
If it is None and backend is gloo, it will try to get it from the environment variable COLLECTIVE_DEVICE_ID_ENV_KEY.
If the environment variable is not set either, it will return an error.
device_id(int, optional): GPU id that the actor will bind to, default ``None``.
If it is ``None`` and backend is ``gloo``, it will try to get it from the environment variable ``COLLECTIVE_DEVICE_ID_ENV_KEY``.
If the environment variable is not set either, it will return an error.

address(str, optional): actor address. default ``None``
"""
@@ -394,12 +393,10 @@ async def new_group(

This function requires that all processes in the main group (i.e. all
processes that are part of the distributed job) enter this function, even
if they are not going to be members of the group. Additionally, groups
should be created in the same order in all processes.
if they are not going to be members of the group.

Args:
ranks (list[int]): List of ranks of group members. If ``None``, will be
set to all ranks. Default is ``None``.
ranks (list[int]): List of ranks of group members.

pg_options (ProcessGroupOptions, optional): process group options
specifying what additional options need to be passed in during
@@ -476,11 +473,9 @@ async def allreduce(
the final result.

Args:
send_data (Any): Input of the collective. The function
operates in-place.
send_data (Any): Input of the collective.

recv_data (Any): Output of the collective. The function
operates in-place.
recv_data (Any): Output of the collective.

op (xoscar.collective.common.CollectiveReduceOp): One of the values from
``xoscar.collective.common.CollectiveReduceOp``
@@ -632,6 +627,7 @@ async def reduce_scatter(
):
"""
Reduces, then scatters a list of numpy or cupy data to all processes in a group.
It can only be used on Linux.

Args:
send_data (Any): Input data.
@@ -706,7 +702,7 @@ async def broadcast(
stream: Optional[Any] = None,
):
"""
Broadcasts the tensor to the whole group.
Broadcasts the numpy or cupy data to the whole group.

data must have the same number of elements in all processes
participating in the collective.