diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3170d56f43..f204e7ecdb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -126,10 +126,17 @@ jobs: matrix: platform: [amd64, arm64] toolchain: [llvm, clang16, gcc12] + mpi: [openmpi, mpich] + exclude: + - toolchain: llvm + mpi: mpich + - toolchain: clang16 + mpi: mpich fail-fast: false uses: ./.github/workflows/test_in_devenv.yml with: platform: linux/${{ matrix.platform }} + mpi: ${{ matrix.mpi }} devdeps_image: ${{ fromJson(needs.config.outputs.json).image_hash[format('{0}-{1}', matrix.platform, matrix.toolchain)] }} devdeps_cache: ${{ fromJson(needs.config.outputs.json).cache_key[format('{0}-{1}', matrix.platform, matrix.toolchain)] }} devdeps_archive: ${{ fromJson(needs.config.outputs.json).tar_archive[format('{0}-{1}', matrix.platform, matrix.toolchain)] }} diff --git a/.github/workflows/publishing.yml b/.github/workflows/publishing.yml index 6e8e51d6e5..3ddc5a39a8 100644 --- a/.github/workflows/publishing.yml +++ b/.github/workflows/publishing.yml @@ -607,6 +607,35 @@ jobs: exit $status_sum fi + - name: MPI validation + shell: bash + run: | + set +e # Allow script to keep going through errors + for ex in `find /home/cudaq/examples/other/distributed/ -name '*.cpp'`; do + # Set CUDAQ_ENABLE_MPI_EXAMPLE to activate these examples. + nvq++ -DCUDAQ_ENABLE_MPI_EXAMPLE=1 $ex + status=$? + if [ $status -eq 0 ]; then + # Run with mpiexec + mpiexec --allow-run-as-root -np 4 ./a.out + status=$? + if [ $status -eq 0 ]; then + echo ":white_check_mark: Successfully ran $filename." >> $GITHUB_STEP_SUMMARY + else + echo ":x: Failed to execute $filename." >> $GITHUB_STEP_SUMMARY + status_sum=$((status_sum+1)) + fi + else + echo ":x: Compilation failed for $filename." >> $GITHUB_STEP_SUMMARY + status_sum=$((status_sum+1)) + fi + done + set -e # Re-enable exit code error checking + if [ ! $status_sum -eq 0 ]; then + echo "::error::$status_sum examples failed; see step summary for a list of failures." + exit $status_sum + fi + create_release: name: CUDA Quantum Release needs: [assets, cudaq_hpc, cudaq_wheels, validation] diff --git a/.github/workflows/python_wheels.yml b/.github/workflows/python_wheels.yml index b62b49f472..b33b002d59 100644 --- a/.github/workflows/python_wheels.yml +++ b/.github/workflows/python_wheels.yml @@ -216,6 +216,50 @@ jobs: fi done + - name: Run Python MPI tests + if: matrix.os_image == 'redhat/ubi9:9.2' + uses: ./.github/actions/run-in-docker + with: + image: wheel_validation:local + shell: bash + run: | + # Install openmpi and mpi4py by conda + # Install conda + dnf install -y --nobest --setopt=install_weak_deps=False wget openssh-clients.$(uname -m) + mkdir -p ~/miniconda3 + wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-$(uname -m).sh -O ~/miniconda3/miniconda.sh + bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3 + ~/miniconda3/bin/conda init bash + source ~/.bashrc + # Extract the setup script from Python wheel's readme + readme_file="/tmp/README.md" + python_version="${{ inputs.python_version }}" + # Parse README file to install openmpi + conda_script="$(awk '/(Begin conda install)/{flag=1;next}/(End conda install)/{flag=0}flag' $readme_file | grep . | sed '/^```/d')" + # Skip the installation of CUDA since we don't need CUDA for this test. + regex='conda install -y -n cuda-quantum.*cuda' + while IFS= read -r line; do + if [[ ! 
"$line" =~ $regex ]]; then + # Replace Python version + line=${line//3.10/$python_version} + # Install the wheel file + line=${line//pip install cuda-quantum/pip install /tmp/cuda_quantum-0.0.0-*-manylinux_*_$(uname -m).whl} + eval "$line" + fi + done <<< "$conda_script" + ompi_script="$(awk '/(Begin ompi setup)/{flag=1;next}/(End ompi setup)/{flag=0}flag' $readme_file | grep . | sed '/^```/d')" + while IFS= read -r line; do + eval "$line" + done <<< "$ompi_script" + # Run the MPI test + python${{ inputs.python_version }} -m pip install pytest numpy + mpirun --allow-run-as-root -np 4 python${{ inputs.python_version }} -m pytest -v /tmp/tests/parallel/test_mpi_api.py + pytest_mpi_status=$? + if [ ! $pytest_mpi_status -eq 0 ]; then + echo "::error file=python_wheel.yml::Python MPI plugin test failed with status $pytest_mpi_status." + exit 1 + fi + - name: Validate Python examples run: | docker run --rm -dit --name wheel-validation wheel_validation:local diff --git a/.github/workflows/test_in_devenv.yml b/.github/workflows/test_in_devenv.yml index 6c929486a8..d5806023cc 100644 --- a/.github/workflows/test_in_devenv.yml +++ b/.github/workflows/test_in_devenv.yml @@ -5,6 +5,10 @@ on: type: string required: false default: linux/amd64 + mpi: + type: string + required: false + default: openmpi devdeps_image: required: false type: string @@ -65,7 +69,8 @@ jobs: DOCKER_BUILDKIT=1 docker build --platform ${{ inputs.platform }} \ -t cuda-quantum-dev:local -f docker/build/cudaq.dev.Dockerfile . \ --build-arg base_image=$base_image \ - --build-arg install="CMAKE_BUILD_TYPE=Debug" + --build-arg install="CMAKE_BUILD_TYPE=Debug" \ + --build-arg mpi="${{ inputs.mpi }}" - name: Test CUDA Quantum uses: ./.github/actions/run-in-docker @@ -84,6 +89,52 @@ jobs: exit 1 fi + - name: Test CUDA Quantum MPI4Py Plugin + uses: ./.github/actions/run-in-docker + with: + image: cuda-quantum-dev:local + shell: bash + run: | + cd $CUDAQ_REPO_ROOT + python3 -m pip install mpi4py~=3.1 + rm -f build/lib/plugins/libcudaq-comm-plugin.so + ctest --test-dir build -R MPIApiTest -V + mpi4py_status=$? + if [ ! $mpi4py_status -eq 0 ] ; then + echo "::error file=test_in_devenv.yml::Test CUDA Quantum MPI4Py Plugin failed with status $mpi4py_status." + exit 1 + fi + + - name: Test CUDA Quantum MPI Plugin Activation + uses: ./.github/actions/run-in-docker + with: + image: cuda-quantum-dev:local + shell: bash + run: | + # Set MPI_PATH depending on OMPI/MPICH + has_ompiinfo=$(which ompi_info || true) + if [[ ! -z $has_ompiinfo ]]; then + export MPI_PATH="/usr/lib/$(uname -m)-linux-gnu/openmpi/" + else + export MPI_PATH="/usr/lib/$(uname -m)-linux-gnu/mpich/" + fi + # Run the activation script + source $CUDAQ_INSTALL_PREFIX/distributed_interfaces/activate_custom_mpi.sh + external_plugin_build_status=$? + if [ ! $external_plugin_build_status -eq 0 ] ; then + echo "::error file=test_in_devenv.yml::Test CUDA Quantum MPI Plugin Activation failed to activate the plugin with status $external_plugin_build_status." + exit 1 + fi + echo $CUDAQ_MPI_COMM_LIB + # Rerun the MPI plugin test + cd $CUDAQ_REPO_ROOT + ctest --test-dir build -R MPIApiTest -V + external_plugin_status=$? + if [ ! $external_plugin_status -eq 0 ] ; then + echo "::error file=test_in_devenv.yml::Test CUDA Quantum MPI Plugin Activation failed with status $external_plugin_status." 
+ exit 1 + fi + - name: Save environment id: env_save if: inputs.export_environment diff --git a/CMakeLists.txt b/CMakeLists.txt index e0c883fd9d..1114cdb0eb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -432,6 +432,8 @@ if (NOT CUDAQ_SKIP_MPI) find_package(MPI COMPONENTS CXX) if (MPI_FOUND) message(STATUS "MPI CXX Found: ${MPIEXEC}") + # Build the built-in MPI Comm plugin + add_subdirectory(runtime/cudaq/distributed/builtin) endif() endif() diff --git a/docker/build/cudaq.dev.Dockerfile b/docker/build/cudaq.dev.Dockerfile index 8f6cf20c30..8b393e7a47 100644 --- a/docker/build/cudaq.dev.Dockerfile +++ b/docker/build/cudaq.dev.Dockerfile @@ -35,6 +35,17 @@ ARG destination="$CUDAQ_REPO_ROOT" ADD "$workspace" "$destination" WORKDIR "$destination" +# mpich or openmpi +ARG mpi= +RUN if [ -n "$mpi" ]; \ + then \ + if [ ! -z "$MPI_PATH" ]; then \ + echo "Using a base image with MPI is not supported when passing a 'mpi' build argument." && exit 1; \ + else \ + apt update && apt install -y lib$mpi-dev ; \ + fi \ + fi + # Configuring a base image that contains the necessary dependencies for GPU # accelerated components and passing a build argument # install="CMAKE_BUILD_TYPE=Release FORCE_COMPILE_GPU_COMPONENTS=true" diff --git a/docker/test/debian.Dockerfile b/docker/test/debian.Dockerfile index 90912f0c9d..65d2f0d99d 100644 --- a/docker/test/debian.Dockerfile +++ b/docker/test/debian.Dockerfile @@ -32,6 +32,7 @@ ARG cuda_quantum_wheel=cuda_quantum-0.0.0-cp311-cp311-manylinux_2_28_x86_64.whl COPY $cuda_quantum_wheel /tmp/$cuda_quantum_wheel COPY docs/sphinx/examples/python /tmp/examples/ COPY python/tests /tmp/tests/ +COPY python/README.md /tmp/README.md RUN if [ -n "$pip_install_flags" ]; then \ # We can't install with a --user flag in a virtual environment unless we enable this. 
diff --git a/docker/test/fedora.Dockerfile b/docker/test/fedora.Dockerfile index 1605ea3b4e..ddb903c7a3 100644 --- a/docker/test/fedora.Dockerfile +++ b/docker/test/fedora.Dockerfile @@ -26,6 +26,7 @@ ARG cuda_quantum_wheel=cuda_quantum-0.0.0-cp310-cp310-manylinux_2_28_x86_64.whl COPY $cuda_quantum_wheel /tmp/$cuda_quantum_wheel COPY docs/sphinx/examples/python /tmp/examples/ COPY python/tests /tmp/tests/ +COPY python/README.md /tmp/README.md RUN python${python_version} -m pip install ${pip_install_flags} /tmp/$cuda_quantum_wheel RUN if [ -n "$optional_dependencies" ]; then python${python_version} -m pip install cuda-quantum[$optional_dependencies]; fi \ No newline at end of file diff --git a/docker/test/opensuse.Dockerfile b/docker/test/opensuse.Dockerfile index 528550bbb3..3029e4a71c 100644 --- a/docker/test/opensuse.Dockerfile +++ b/docker/test/opensuse.Dockerfile @@ -27,6 +27,7 @@ ARG cuda_quantum_wheel=cuda_quantum-0.0.0-cp39-cp39-manylinux_2_28_x86_64.whl COPY $cuda_quantum_wheel /tmp/$cuda_quantum_wheel COPY docs/sphinx/examples/python /tmp/examples/ COPY python/tests /tmp/tests/ +COPY python/README.md /tmp/README.md RUN python${python_version} -m pip install ${pip_install_flags} /tmp/$cuda_quantum_wheel RUN if [ -n "$optional_dependencies" ]; then python${python_version} -m pip install cuda-quantum[$optional_dependencies]; fi diff --git a/docker/test/redhat.Dockerfile b/docker/test/redhat.Dockerfile index b5eb93c4bb..af601bf540 100644 --- a/docker/test/redhat.Dockerfile +++ b/docker/test/redhat.Dockerfile @@ -26,6 +26,7 @@ ARG cuda_quantum_wheel=cuda_quantum-0.0.0-cp311-cp311-manylinux_2_28_x86_64.whl COPY $cuda_quantum_wheel /tmp/$cuda_quantum_wheel COPY docs/sphinx/examples/python /tmp/examples/ COPY python/tests /tmp/tests/ +COPY python/README.md /tmp/README.md RUN python${python_version} -m pip install ${pip_install_flags} /tmp/$cuda_quantum_wheel RUN if [ -n "$optional_dependencies" ]; then python${python_version} -m pip install cuda-quantum[$optional_dependencies]; fi diff --git a/docker/test/ubuntu.Dockerfile b/docker/test/ubuntu.Dockerfile index eed34370fe..0151296c2c 100644 --- a/docker/test/ubuntu.Dockerfile +++ b/docker/test/ubuntu.Dockerfile @@ -25,6 +25,7 @@ ARG cuda_quantum_wheel=cuda_quantum-0.0.0-cp310-cp310-manylinux_2_28_x86_64.whl COPY $cuda_quantum_wheel /tmp/$cuda_quantum_wheel COPY docs/sphinx/examples/python /tmp/examples/ COPY python/tests /tmp/tests/ +COPY python/README.md /tmp/README.md RUN python${python_version} -m pip install ${pip_install_flags} /tmp/$cuda_quantum_wheel RUN if [ -n "$optional_dependencies" ]; then python${python_version} -m pip install cuda-quantum[$optional_dependencies]; fi diff --git a/docs/sphinx/api/languages/cpp_api.rst b/docs/sphinx/api/languages/cpp_api.rst index e124431415..b318f95302 100644 --- a/docs/sphinx/api/languages/cpp_api.rst +++ b/docs/sphinx/api/languages/cpp_api.rst @@ -166,6 +166,8 @@ Namespaces .. doxygenfunction:: cudaq::mpi::finalize .. doxygenfunction:: cudaq::mpi::rank .. doxygenfunction:: cudaq::mpi::num_ranks -.. doxygenfunction:: cudaq::mpi::all_gather +.. doxygenfunction:: cudaq::mpi::all_gather(std::vector &global, const std::vector &local) +.. doxygenfunction:: cudaq::mpi::all_gather(std::vector &global, const std::vector &local) .. doxygenfunction:: cudaq::mpi::all_reduce(const T&, const Func&) .. doxygenfunction:: cudaq::mpi::all_reduce(const T &localValue, const BinaryFunction &function) +.. 
doxygenfunction:: cudaq::mpi::broadcast diff --git a/docs/sphinx/api/languages/python_api.rst b/docs/sphinx/api/languages/python_api.rst index 1717b4c576..31524e3541 100644 --- a/docs/sphinx/api/languages/python_api.rst +++ b/docs/sphinx/api/languages/python_api.rst @@ -207,5 +207,6 @@ MPI Submodule .. automethod:: cudaq.mpi::rank .. automethod:: cudaq.mpi::num_ranks .. automethod:: cudaq.mpi::all_gather +.. automethod:: cudaq.mpi::broadcast .. automethod:: cudaq.mpi::is_initialized .. automethod:: cudaq.mpi::finalize \ No newline at end of file diff --git a/docs/sphinx/examples/cpp/other/distributed/mpi.cpp b/docs/sphinx/examples/cpp/other/distributed/mpi.cpp new file mode 100644 index 0000000000..33a2ffe5ee --- /dev/null +++ b/docs/sphinx/examples/cpp/other/distributed/mpi.cpp @@ -0,0 +1,68 @@ +/******************************************************************************* + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +// Compile and run with: +// ``` +// nvq++ mpi.cpp -DCUDAQ_ENABLE_MPI_EXAMPLE=1 -o mpi.x && mpiexec -np 4 ./mpi.x +// ``` + +// This example demonstrates CUDA Quantum MPI support. + +#ifndef CUDAQ_ENABLE_MPI_EXAMPLE +#define CUDAQ_ENABLE_MPI_EXAMPLE 0 +#endif + +#include + +int main(int argc, char **argv) { +#if CUDAQ_ENABLE_MPI_EXAMPLE == 0 + return 0; +#else + // Initialize MPI + cudaq::mpi::initialize(); + + if (cudaq::mpi::rank() == 0) + printf("Running MPI example with %d processes.\n", cudaq::mpi::num_ranks()); + + using namespace cudaq::spin; + cudaq::spin_op h = 5.907 - 2.1433 * x(0) * x(1) - 2.1433 * y(0) * y(1) + + .21829 * z(0) - 6.125 * z(1); + auto ansatz = [](double theta) __qpu__ { + cudaq::qubit q, r; + x(q); + ry(theta, r); + x(r, q); + }; + + // In addition to the built-in `MQPU` platform, users can construct MPI + // application directly using CUDA Quantum MPI support. + const auto allParams = + cudaq::random_vector(-M_PI, M_PI, cudaq::mpi::num_ranks()); + + // For example, each MPI process can run `cudaq::observe` for a different + // parameter. + const double rankParam = allParams[cudaq::mpi::rank()]; + const double rankResult = cudaq::observe(ansatz, h, rankParam); + printf("[Process %d]: Energy(%lf) = %lf.\n", cudaq::mpi::rank(), rankParam, + rankResult); + // Then, using `cudaq::mpi::all_gather` to collect all the results. + std::vector gatherData(cudaq::mpi::num_ranks()); + cudaq::mpi::all_gather(gatherData, {rankResult}); + if (cudaq::mpi::rank() == 0) { + printf("Gathered data from all ranks: \n"); + for (const auto &x : gatherData) + printf("%lf\n", x); + } + + // Verify that the data has been assembled as expected. 
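+  // (`all_gather` concatenates contributions in rank order, so this process's
+  // result lands at index `cudaq::mpi::rank()` of `gatherData`.)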
+ if (std::abs(gatherData[cudaq::mpi::rank()] - rankResult) > 1e-12) + return -1; + cudaq::mpi::finalize(); + return 0; +#endif +} diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index c57959ddc6..440a588acd 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -72,6 +72,7 @@ else() install(TARGETS _pycudaq LIBRARY DESTINATION cudaq) endif() +add_subdirectory(runtime/cudaq/distributed) if (NOT CUDAQ_DISABLE_CPP_FRONTEND) add_subdirectory(runtime/cudaq/domains/plugins) endif() diff --git a/python/README.md b/python/README.md index a85c78832d..48afe9fd4c 100644 --- a/python/README.md +++ b/python/README.md @@ -42,6 +42,8 @@ you can install a minimal version following the instructions The following commands will create and activate a complete environment for CUDA Quantum with all its dependencies: +[//]: # (Begin conda install) + ```console conda create -y -n cuda-quantum python==3.10 pip conda install -y -n cuda-quantum -c "nvidia/label/cuda-11.8.0" cuda @@ -51,12 +53,18 @@ CUDA Quantum with all its dependencies: conda activate cuda-quantum ``` +[//]: # (End conda install) + You must configure MPI by setting the following environment variables: +[//]: # (Begin ompi setup) + ```console export OMPI_MCA_opal_cuda_support=true OMPI_MCA_btl='^openib' ``` +[//]: # (End ompi setup) + *If you do not set these variables you may encounter a segmentation fault.* **Important**: It is *not* sufficient to set these variable within the conda diff --git a/python/cudaq/_cudaq.cpp b/python/cudaq/_cudaq.cpp index 914b90d514..ccac011714 100644 --- a/python/cudaq/_cudaq.cpp +++ b/python/cudaq/_cudaq.cpp @@ -72,8 +72,29 @@ PYBIND11_MODULE(_pycudaq, mod) { cudaq::mpi::all_gather(global, local); return global; }, - "Gather and scatter the `local` list, returning a concatenation of all " + "Gather and scatter the `local` list of floating-point numbers, " + "returning a concatenation of all " "lists across all ranks. The total global list size must be provided."); + mpiSubmodule.def( + "all_gather", + [](std::size_t globalVectorSize, std::vector &local) { + std::vector global(globalVectorSize); + cudaq::mpi::all_gather(global, local); + return global; + }, + "Gather and scatter the `local` list of integers, returning a " + "concatenation of all " + "lists across all ranks. The total global list size must be provided."); + mpiSubmodule.def( + "broadcast", + [](std::vector &data, std::size_t bcastSize, int rootRank) { + if (data.size() < bcastSize) + data.resize(bcastSize); + cudaq::mpi::broadcast(data, rootRank); + return data; + }, + "Broadcast an array from a process (rootRank) to all other processes. " + "The size of broadcast array must be provided."); mpiSubmodule.def( "is_initialized", []() { return cudaq::mpi::is_initialized(); }, "Return true if MPI has already been initialized."); diff --git a/python/runtime/cudaq/distributed/CMakeLists.txt b/python/runtime/cudaq/distributed/CMakeLists.txt new file mode 100644 index 0000000000..b031bac3dc --- /dev/null +++ b/python/runtime/cudaq/distributed/CMakeLists.txt @@ -0,0 +1,22 @@ +# ============================================================================ # +# Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. 
# +# ============================================================================ # + +message(STATUS "Building Python MPI Comm plugin based on mpi4py") +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib/plugins) +set(CMAKE_INSTALL_RPATH "$ORIGIN:$ORIGIN/..") +# IMPORTANT: Don't change this lib name without updating the getMpiPlugin function +set(LIBRARY_NAME cudaq-py-comm-plugin) +add_library(${LIBRARY_NAME} SHARED mpi_comm_impl.cpp) +if (NOT SKBUILD) + target_link_libraries(${LIBRARY_NAME} PRIVATE pybind11::embed) +else() + target_link_libraries(${LIBRARY_NAME} PRIVATE pybind11::module) + target_link_options(${LIBRARY_NAME} PRIVATE -Wl,--unresolved-symbols=ignore-in-object-files) +endif() +target_include_directories(${LIBRARY_NAME} PRIVATE ${CMAKE_SOURCE_DIR}/runtime/cudaq/distributed) +install(TARGETS ${LIBRARY_NAME} DESTINATION lib/plugins) diff --git a/python/runtime/cudaq/distributed/mpi_comm_impl.cpp b/python/runtime/cudaq/distributed/mpi_comm_impl.cpp new file mode 100644 index 0000000000..9e4052eaf0 --- /dev/null +++ b/python/runtime/cudaq/distributed/mpi_comm_impl.cpp @@ -0,0 +1,616 @@ +/******************************************************************************* + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +/*! \file mpi_comm_impl.cpp + \brief Implementation of CUDAQ MPI interface wrapper based on mpi4py + + If a natively-built MPI wrapper plugin is not available, CUDA Quantum also + provides a plugin implementation based on Python's mpi4py lib via embedded + interpreter. Rationale: this plugin targets CUDA Quantum wheel distribution: + we ship pre-built wheel binary and leverage mpi4py build-from-source + distribution mechanism to get MPI support for Python users. +*/ + +#include "distributed_capi.h" +#include +#include +#include +#include +#include +#include +#include + +namespace py = pybind11; + +namespace { + +/// @brief Reference to the pybind11 scoped interpreter +thread_local static std::unique_ptr interp; + +/// @brief Did we find mpi4py during library load? 
+static bool mpi4pyFound = false; + +/// @brief True if this plugin did the MPI_Init call +bool initCalledByThis = false; + +/// @brief Convert supported data type enum to the corresponding mpi4py's MPI +/// type +py::object convertType(DataType dataType) { + auto mpiMod = py::module::import("mpi4py.MPI"); + switch (dataType) { + case INT_8: + return mpiMod.attr("INT8_T"); + case INT_16: + return mpiMod.attr("INT16_T"); + case INT_32: + return mpiMod.attr("INT32_T"); + case INT_64: + return mpiMod.attr("INT64_T"); + case FLOAT_32: + return mpiMod.attr("FLOAT"); + case FLOAT_64: + return mpiMod.attr("DOUBLE"); + case FLOAT_COMPLEX: + return mpiMod.attr("C_FLOAT_COMPLEX"); + case DOUBLE_COMPLEX: + return mpiMod.attr("C_DOUBLE_COMPLEX"); + } + __builtin_unreachable(); +} + +/// @brief Convert supported data type enum to the corresponding mpi4py's MPI +/// type +py::object convertTypeMinLoc(DataType dataType) { + auto mpiMod = py::module::import("mpi4py.MPI"); + switch (dataType) { + case FLOAT_32: + return mpiMod.attr("FLOAT_INT"); + case FLOAT_64: + return mpiMod.attr("DOUBLE_INT"); + default: + throw std::runtime_error("Unsupported MINLOC data type"); + } + __builtin_unreachable(); +} + +/// @brief Get size (in bytes) for a data type +std::size_t getDataSize(DataType dataType) { + switch (dataType) { + case INT_8: + return sizeof(int8_t); + case INT_16: + return sizeof(int16_t); + case INT_32: + return sizeof(int32_t); + case INT_64: + return sizeof(int64_t); + case FLOAT_32: + return sizeof(float); + case FLOAT_64: + return sizeof(double); + case FLOAT_COMPLEX: + return sizeof(std::complex); + case DOUBLE_COMPLEX: + return sizeof(std::complex); + } + __builtin_unreachable(); +} + +/// @brief Convert supported op type enum to the corresponding mpi4py's MPI type +py::object convertType(ReduceOp opType) { + auto mpiMod = py::module::import("mpi4py.MPI"); + switch (opType) { + case SUM: + return mpiMod.attr("SUM"); + case PROD: + return mpiMod.attr("PROD"); + case MIN: + return mpiMod.attr("MIN"); + case MIN_LOC: + return mpiMod.attr("MINLOC"); + } + __builtin_unreachable(); +} + +/// @brief Unpack the type-erased communicator object into a mpi4py's Comm +/// object +py::object unpackMpiCommunicator(const cudaqDistributedCommunicator_t *comm) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto mpi_obj = mpiMod.attr("Intracomm")(); + auto address = mpiMod.attr("_addressof")(mpi_obj).cast(); + void **pointer = reinterpret_cast(address); + pointer[0] = *reinterpret_cast(comm->commPtr); + return mpi_obj; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + throw std::runtime_error( + "Invalid distributed communicator encountered in CUDAQ mpi4py plugin."); + } +} + +/// @brief Tracking in-flight non-blocking send and receive requests. 
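+/// At most two outstanding requests (e.g., a paired `Isend`/`Irecv`) are
+/// tracked per communicator; `mpi_Synchronize` waits on them and resets the
+/// count.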
+struct PendingRequest { + py::object requests[2]; + int nActiveRequests; + PendingRequest() : nActiveRequests(0){}; + static std::mutex g_mutex; + static std::unordered_map + g_requests; +}; +std::mutex PendingRequest::g_mutex; +std::unordered_map + PendingRequest::g_requests; +} // namespace +extern "C" { +/// @brief Wrapper of MPI_Init +static int mpi_initialize(int32_t *argc, char ***argv) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + if (mpiMod.attr("Is_initialized")().cast()) + return 0; + + mpiMod.attr("Init")(); + initCalledByThis = true; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Finalize +static int mpi_finalize() { + PendingRequest::g_requests.clear(); + if (!initCalledByThis) + return 0; + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + if (mpiMod.attr("Is_finalized")().cast()) + return 0; + + mpiMod.attr("Finalize")(); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Initialized +static int mpi_initialized(int32_t *flag) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + if (mpiMod.attr("Is_initialized")().cast()) + *flag = 1; + else + *flag = 0; + + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Finalized +static int mpi_finalized(int32_t *flag) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + if (mpiMod.attr("Is_finalized")().cast()) + *flag = 1; + else + *flag = 0; + + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Comm_size +static int mpi_getNumRanks(const cudaqDistributedCommunicator_t *comm, + int32_t *size) { + try { + auto pyComm = unpackMpiCommunicator(comm); + + *size = pyComm.attr("Get_size")().cast(); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Comm_rank +static int mpi_getProcRank(const cudaqDistributedCommunicator_t *comm, + int32_t *rank) { + try { + auto pyComm = unpackMpiCommunicator(comm); + + *rank = pyComm.attr("Get_rank")().cast(); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Returns the size of the local subgroup of processes sharing node +/// memory +static int mpi_getCommSizeShared(const cudaqDistributedCommunicator_t *comm, + int32_t *numRanks) { + *numRanks = 0; + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto COMM_TYPE_SHARED = mpiMod.attr("COMM_TYPE_SHARED"); + auto MPI_Info = mpiMod.attr("Info"); + auto info = MPI_Info.attr("Create")(); + info.attr("Set")("mpi_hw_resource_type", "mpi_shared_memory"); + auto pyComm = unpackMpiCommunicator(comm); + int procRank = pyComm.attr("Get_rank")().cast(); + auto localComm = + pyComm.attr("Split_type")(COMM_TYPE_SHARED, procRank, info); + + int nranks = localComm.attr("Get_size")().cast(); + *numRanks = nranks; + localComm.attr("Free")(); + + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Barrier +static int mpi_Barrier(const cudaqDistributedCommunicator_t *comm) { + try { + 
auto pyComm = unpackMpiCommunicator(comm); + + pyComm.attr("Barrier")(); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Helper to pack native data as mpi4py's memory/buffer object +static py::object packData(const void *buffer, int32_t count, DataType dataType, + bool readOnly = false) { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto mpiBuffer = mpiMod.attr("memory"); + const auto bBytes = getDataSize(dataType) * count; + auto pyBuffer = + mpiBuffer.attr("fromaddress")((int64_t)buffer, bBytes, readOnly); + return pyBuffer; +} + +/// @brief Wrapper of MPI_Bcast +static int mpi_Bcast(const cudaqDistributedCommunicator_t *comm, void *buffer, + int32_t count, DataType dataType, int32_t rootRank) { + try { + auto pyComm = unpackMpiCommunicator(comm); + + pyComm.attr("Bcast")(packData(buffer, count, dataType), rootRank); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Allreduce +static int mpi_Allreduce(const cudaqDistributedCommunicator_t *comm, + const void *sendBuffer, void *recvBuffer, + int32_t count, DataType dataType, ReduceOp opType) { + try { + auto pyComm = unpackMpiCommunicator(comm); + auto pyMpiType = (opType == MIN_LOC) ? convertTypeMinLoc(dataType) + : convertType(dataType); + py::tuple sendBuf = py::make_tuple( + packData(sendBuffer, count, dataType, true), count, pyMpiType); + py::tuple recvBuf = + py::make_tuple(packData(recvBuffer, count, dataType), count, pyMpiType); + pyComm.attr("Allreduce")(sendBuf, recvBuf, convertType(opType)); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Allreduce with MPI_IN_PLACE +static int mpi_AllreduceInplace(const cudaqDistributedCommunicator_t *comm, + void *recvBuffer, int32_t count, + DataType dataType, ReduceOp opType) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto inplaceAddr = mpiMod.attr("IN_PLACE"); + py::tuple recvBuf = py::make_tuple(packData(recvBuffer, count, dataType), + count, convertType(dataType)); + auto pyComm = unpackMpiCommunicator(comm); + + pyComm.attr("Allreduce")(inplaceAddr, recvBuf, convertType(opType)); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Allgather +static int mpi_Allgather(const cudaqDistributedCommunicator_t *comm, + const void *sendBuffer, void *recvBuffer, + int32_t count, DataType dataType) { + try { + auto pyComm = unpackMpiCommunicator(comm); + const auto size = pyComm.attr("Get_size")().cast(); + pyComm.attr("Allgather")(packData(sendBuffer, count, dataType, true), + packData(recvBuffer, size * count, dataType)); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Allgatherv +static int mpi_AllgatherV(const cudaqDistributedCommunicator_t *comm, + const void *sendBuf, int sendCount, void *recvBuf, + const int *recvCounts, const int *displs, + DataType dataType) { + try { + auto pyComm = unpackMpiCommunicator(comm); + const auto size = pyComm.attr("Get_size")().cast(); + py::array_t pyRecvCounts(size); + py::array_t pyDispls(size); + auto pyRecvCountsData = pyRecvCounts.mutable_unchecked<1>(); + auto pyDisplsData = 
pyDispls.mutable_unchecked<1>(); + for (int i = 0; i < size; ++i) { + pyRecvCountsData(i) = recvCounts[i]; + pyDisplsData(i) = displs[i]; + } + pyComm.attr("Allgatherv")( + packData(sendBuf, sendCount, dataType, true), + py::make_tuple(packData(recvBuf, size * sendCount, dataType), + pyRecvCounts, pyDispls, convertType(dataType))); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } + return 0; +} + +/// @brief Wrapper of MPI_Isend and track pending requests for synchronization +static int mpi_SendAsync(const cudaqDistributedCommunicator_t *comm, + const void *buf, int count, DataType dataType, + int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests == 2) + return -1; + try { + auto pyComm = unpackMpiCommunicator(comm); + auto request = + pyComm.attr("Isend")(packData(buf, count, dataType, true), peer, tag); + PendingRequest::g_requests[comm] + .requests[PendingRequest::g_requests[comm].nActiveRequests] = request; + ++PendingRequest::g_requests[comm].nActiveRequests; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } + return 0; +} + +/// @brief Wrapper of MPI_Irecv and track pending requests for synchronization +static int mpi_RecvAsync(const cudaqDistributedCommunicator_t *comm, void *buf, + int count, DataType dataType, int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests == 2) + return -1; + try { + auto pyComm = unpackMpiCommunicator(comm); + auto request = + pyComm.attr("Irecv")(packData(buf, count, dataType), peer, tag); + PendingRequest::g_requests[comm] + .requests[PendingRequest::g_requests[comm].nActiveRequests] = request; + ++PendingRequest::g_requests[comm].nActiveRequests; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } + return 0; +} + +/// @brief Combined MPI_Isend and MPI_Irecv requests +static int mpi_SendRecvAsync(const cudaqDistributedCommunicator_t *comm, + const void *sendbuf, void *recvbuf, int count, + DataType dataType, int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests != 0) + return -1; + try { + auto pyComm = unpackMpiCommunicator(comm); + auto sendRequest = pyComm.attr("Isend")( + packData(sendbuf, count, dataType, true), peer, tag); + auto recvRequest = + pyComm.attr("Irecv")(packData(recvbuf, count, dataType), peer, tag); + PendingRequest::g_requests[comm].requests[0] = sendRequest; + PendingRequest::g_requests[comm].requests[1] = recvRequest; + PendingRequest::g_requests[comm].nActiveRequests = 2; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } + return 0; +} + +/// @brief Wait for in-flight MPI_Isend and MPI_Irecv to complete +static int mpi_Synchronize(const cudaqDistributedCommunicator_t *comm) { + std::scoped_lock lock(PendingRequest::g_mutex); + try { + auto pyComm = unpackMpiCommunicator(comm); + for (int i = 0; i < PendingRequest::g_requests[comm].nActiveRequests; ++i) { + PendingRequest::g_requests[comm].requests[i].attr("Wait")(); + } + PendingRequest::g_requests[comm].nActiveRequests = 0; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 
1; + } + return 0; +} + +/// @brief Wrapper of MPI_Abort +static int mpi_Abort(const cudaqDistributedCommunicator_t *comm, + int errorCode) { + try { + auto pyComm = unpackMpiCommunicator(comm); + pyComm.attr("Abort")(errorCode); + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Helper to convert mpi4py's Comm handle to void* +static void *voidPtrCast(py::handle src) { + PyObject *source = src.ptr(); + PyObject *tmp = PyNumber_Long(source); + if (!tmp) + return nullptr; + void *casted = PyLong_AsVoidPtr(tmp); + Py_DECREF(tmp); + return casted; +} + +/// @brief Wrapper of MPI_Comm_dup +static int mpi_CommDup(const cudaqDistributedCommunicator_t *comm, + cudaqDistributedCommunicator_t **newDupComm) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto pyComm = unpackMpiCommunicator(comm); + // Use std::deque to make sure pointers to elements are valid. + static std::deque> + dup_comms; + + const auto dup = pyComm.attr("Dup")(); + dup_comms.emplace_back(std::pair()); + auto &[newComm, commPtr] = dup_comms.back(); + commPtr = voidPtrCast(mpiMod.attr("_handleof")(dup)); + newComm.commPtr = &commPtr; + newComm.commSize = mpiMod.attr("_sizeof")(dup).cast(); + *newDupComm = &newComm; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Wrapper of MPI_Comm_split +static int mpi_CommSplit(const cudaqDistributedCommunicator_t *comm, + int32_t color, int32_t key, + cudaqDistributedCommunicator_t **newSplitComm) { + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto pyComm = unpackMpiCommunicator(comm); + // Use std::deque to make sure pointers to elements are valid. 
+ static std::deque> + split_comms; + + const auto split = pyComm.attr("Split")(color, key); + split_comms.emplace_back( + std::pair()); + auto &[newComm, commPtr] = split_comms.back(); + commPtr = voidPtrCast(mpiMod.attr("_handleof")(split)); + newComm.commPtr = &commPtr; + newComm.commSize = mpiMod.attr("_sizeof")(split).cast(); + *newSplitComm = &newComm; + return 0; + } catch (std::exception &e) { + std::cerr << "[mpi4py] Caught exception \"" << e.what() << "\"\n"; + return 1; + } +} + +/// @brief Return the underlying MPI_Comm as a type-erased object +cudaqDistributedCommunicator_t *getMpiCommunicator() { + static cudaqDistributedCommunicator_t commWorld; + try { + auto mpiMod = py::module::import("mpi4py.MPI"); + auto pyCommWorld = mpiMod.attr("COMM_WORLD"); + static auto commPtr = + (void *)(mpiMod.attr("_handleof")(pyCommWorld).cast()); + commWorld.commPtr = &commPtr; + commWorld.commSize = + mpiMod.attr("_sizeof")(pyCommWorld).cast(); + } catch (std::exception &) { + commWorld.commPtr = nullptr; + commWorld.commSize = 0; + } + return &commWorld; +} + +/// @brief Return the MPI shim interface (as a function table) +cudaqDistributedInterface_t *getDistributedInterface() { + static cudaqDistributedInterface_t cudaqDistributedInterface{ + CUDAQ_DISTRIBUTED_INTERFACE_VERSION, + mpi_initialize, + mpi_finalize, + mpi_initialized, + mpi_finalized, + mpi_getNumRanks, + mpi_getProcRank, + mpi_getCommSizeShared, + mpi_Barrier, + mpi_Bcast, + mpi_Allreduce, + mpi_AllreduceInplace, + mpi_Allgather, + mpi_AllgatherV, + mpi_SendAsync, + mpi_RecvAsync, + mpi_SendRecvAsync, + mpi_Synchronize, + mpi_Abort, + mpi_CommDup, + mpi_CommSplit}; + return &cudaqDistributedInterface; +} +} + +__attribute__((constructor)) void dllMain() { + if (Py_IsInitialized() == 0) { + // Create a scoped interpreter if none exists. + // Note: if this was invoked from Python, an interpreter is already active. + interp = std::make_unique(); + } + try { + py::module::import("mpi4py"); + } catch (std::exception &e) { + // mpi4py not installed + mpi4pyFound = false; + return; + } + mpi4pyFound = true; + // Disable auto init + // https://mpi4py.readthedocs.io/en/stable/mpi4py.html#mpi4py.mpi4py.rc.initialize + auto mpiRcMod = py::module::import("mpi4py.rc"); + mpiRcMod.attr("initialize") = false; +} \ No newline at end of file diff --git a/python/tests/parallel/test_mpi_api.py b/python/tests/parallel/test_mpi_api.py new file mode 100644 index 0000000000..cd79d7a458 --- /dev/null +++ b/python/tests/parallel/test_mpi_api.py @@ -0,0 +1,62 @@ +# ============================================================================ # +# Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. 
# +# ============================================================================ # + +import os, pytest, importlib +import cudaq +import numpy as np + +skipIfUnsupported = pytest.mark.skipif(importlib.util.find_spec('mpi4py') + is None, + reason="mpi4py not found") + + +@skipIfUnsupported +def testMPI(): + cudaq.mpi.initialize() + assert cudaq.mpi.is_initialized() == True + # Check rank API + if os.environ.get('OMPI_COMM_WORLD_RANK') is not None: + print("Rank:", os.environ.get('OMPI_COMM_WORLD_RANK')) + assert cudaq.mpi.rank() == int(os.environ.get('OMPI_COMM_WORLD_RANK')) + + if os.environ.get('OMPI_COMM_WORLD_SIZE') is not None: + assert cudaq.mpi.num_ranks() == int( + os.environ.get('OMPI_COMM_WORLD_SIZE')) + + # all_gather integers + localData = [cudaq.mpi.rank()] + gatherData = cudaq.mpi.all_gather(cudaq.mpi.num_ranks(), localData) + assert len(gatherData) == cudaq.mpi.num_ranks() + for idx, x in enumerate(gatherData): + assert x == idx + + # all_gather floats + localData = [float(cudaq.mpi.rank())] + gatherData = cudaq.mpi.all_gather(cudaq.mpi.num_ranks(), localData) + assert len(gatherData) == cudaq.mpi.num_ranks() + for idx, x in enumerate(gatherData): + assert abs(gatherData[idx] - float(idx)) < 1e-12 + + # Broadcast + ref_data = [1.0, 2.0, 3.0] + if cudaq.mpi.rank() == 0: + data = ref_data + else: + data = [] + + data = cudaq.mpi.broadcast(data, len(ref_data), 0) + for idx, x in enumerate(data): + assert abs(x - ref_data[idx]) < 1e-12 + + cudaq.mpi.finalize() + + +# leave for gdb debugging +if __name__ == "__main__": + loc = os.path.abspath(__file__) + pytest.main([loc, "-s"]) diff --git a/runtime/common/PluginUtils.h b/runtime/common/PluginUtils.h index 68d25806ba..1effaeee56 100644 --- a/runtime/common/PluginUtils.h +++ b/runtime/common/PluginUtils.h @@ -19,13 +19,14 @@ namespace cudaq { /// @param symbolName The name of the generator function /// @return The plugin instance template -PluginPointerType *getUniquePluginInstance(const std::string_view symbolName) { +PluginPointerType *getUniquePluginInstance(const std::string_view symbolName, + const char *libName = nullptr) { cudaq::info("Requesting {} plugin via symbol name {}.", typeid(PluginPointerType).name(), symbolName); std::mutex m; std::lock_guard l(m); using GetPluginFunction = PluginPointerType *(*)(); - auto handle = dlopen(NULL, RTLD_LAZY); + auto handle = dlopen(libName, RTLD_LAZY); GetPluginFunction fcn = (GetPluginFunction)(intptr_t)dlsym(handle, symbolName.data()); if (!fcn) diff --git a/runtime/cudaq.h b/runtime/cudaq.h index 69244dd99d..99c199c5ec 100644 --- a/runtime/cudaq.h +++ b/runtime/cudaq.h @@ -256,10 +256,22 @@ T all_reduce(const T &localValue, const BinaryFunction &function) { return details::allReduce(localValue, function); } -/// @brief Gather all vector data locally into the provided -/// global vector. Global vector must be sized to fit all +/// @brief Gather all vector data (floating point numbers) locally into the +/// provided global vector. +/// +/// Global vector must be sized to fit all vector +/// elements coming from individual ranks. +void all_gather(std::vector &global, const std::vector &local); + +/// @brief Gather all vector data (integers) locally into the provided +/// global vector. +/// +/// Global vector must be sized to fit all /// vector elements coming from individual ranks. 
-void all_gather(std::vector &global, std::vector &local); +void all_gather(std::vector &global, const std::vector &local); + +/// @brief Broadcast a vector from a process (rootRank) to all other processes. +void broadcast(std::vector &data, int rootRank); /// @brief Finalize MPI. This function /// is a no-op if there CUDA Quantum has not been built diff --git a/runtime/cudaq/CMakeLists.txt b/runtime/cudaq/CMakeLists.txt index 24e6ad3e55..753a91bfff 100644 --- a/runtime/cudaq/CMakeLists.txt +++ b/runtime/cudaq/CMakeLists.txt @@ -18,7 +18,8 @@ add_library(${LIBRARY_NAME} algorithms/state.cpp platform/quantum_platform.cpp qis/execution_manager.cpp - utils/cudaq_utils.cpp) + utils/cudaq_utils.cpp + distributed/mpi_plugin.cpp) if (CUDA_FOUND) enable_language(CUDA) @@ -48,11 +49,6 @@ else() PRIVATE nvqir fmt::fmt-header-only) endif() -if (MPI_CXX_FOUND) - target_compile_definitions(${LIBRARY_NAME} PRIVATE CUDAQ_HAS_MPI) - target_link_libraries(${LIBRARY_NAME} PRIVATE MPI::MPI_CXX) -endif() - add_subdirectory(qis/managers) add_subdirectory(algorithms) add_subdirectory(platform) @@ -65,3 +61,14 @@ install(EXPORT cudaq-targets FILE CUDAQTargets.cmake NAMESPACE cudaq:: DESTINATION lib/cmake/cudaq) + +# Install CUDAQ MPI interface header, reference implementation, and build script +# for manual activation on a local system if needed. +install( + FILES + distributed/distributed_capi.h + distributed/builtin/mpi_comm_impl.cpp + distributed/builtin/activate_custom_mpi.sh + DESTINATION distributed_interfaces + PERMISSIONS OWNER_WRITE OWNER_READ GROUP_READ WORLD_READ +) \ No newline at end of file diff --git a/runtime/cudaq/cudaq.cpp b/runtime/cudaq/cudaq.cpp index 3a4ae49cf2..22ef3152b0 100644 --- a/runtime/cudaq/cudaq.cpp +++ b/runtime/cudaq/cudaq.cpp @@ -15,80 +15,174 @@ #endif #include "cudaq/platform.h" #include "cudaq/utils/registry.h" +#include "distributed/mpi_plugin.h" #include +#include #include #include #include #include #include - -#ifdef CUDAQ_HAS_MPI -#include - namespace nvqir { void tearDownBeforeMPIFinalize(); void setRandomSeed(std::size_t); } // namespace nvqir namespace cudaq::mpi { +cudaq::MPIPlugin *getMpiPlugin(bool unsafe) { + // Locate and load the MPI comm plugin. + // Rationale: we don't want to explicitly link `libcudaq.so` against any + // specific MPI implementation for compatibility. Rather, MPI functionalities + // are encapsulated inside a runtime-loadable plugin. + static std::unique_ptr g_plugin; + if (!g_plugin) { + // Search priority: + // (1) Environment variable take precedence (e.g., by running the + // activation script) + // (2) Previously-activated custom plugin at its default location + // (3) Built-in comm plugin (e.g., docker container or build from source + // with MPI) + // (4) mpi4py-based wrapper + const char *mpiLibPath = std::getenv("CUDAQ_MPI_COMM_LIB"); + if (mpiLibPath) { + // The user has set the environment variable. + cudaq::info("Load MPI comm plugin from CUDAQ_MPI_COMM_LIB environment " + "variable at '{}'", + mpiLibPath); + g_plugin = std::make_unique(mpiLibPath); + } else { + // Try locate MPI plugins in the install directory + std::filesystem::path cudaqLibPath{cudaq::getCUDAQLibraryPath()}; + // First, look for the previously-activated plugin in the + // `distributed_interfaces/` directory. + const auto distributedInterfacesDir = + cudaqLibPath.parent_path().parent_path() / "distributed_interfaces"; + // Note: this file name must match the one defined in + // `activate_custom_mpi.sh`. 
+ constexpr std::string_view activatedInterfaceLibFilename = + "libcudaq_distributed_interface_mpi.so"; + const auto activatedInterfaceLibFile = + distributedInterfacesDir / activatedInterfaceLibFilename; + if (std::filesystem::exists(activatedInterfaceLibFile)) { + cudaq::info("Load MPI comm plugin from '{}'", + activatedInterfaceLibFile.c_str()); + g_plugin = std::make_unique( + activatedInterfaceLibFile.c_str()); + } else { + const auto pluginsPath = cudaqLibPath.parent_path() / "plugins"; +#if defined(__APPLE__) && defined(__MACH__) + const std::string libSuffix = "dylib"; +#else + const std::string libSuffix = "so"; +#endif + // The builtin (native) plugin if present + const auto pluginLibFile = + pluginsPath / fmt::format("libcudaq-comm-plugin.{}", libSuffix); + // The mpi4py-based plugin + const auto pyPluginLibFile = + pluginsPath / fmt::format("libcudaq-py-comm-plugin.{}", libSuffix); + if (std::filesystem::exists(pluginLibFile)) { + cudaq::info("Load builtin MPI comm plugin from at '{}'", + pluginLibFile.c_str()); + g_plugin = std::make_unique(pluginLibFile.c_str()); + } else if (std::filesystem::exists(pyPluginLibFile)) { + cudaq::info("Try loading mpi4py MPI comm plugin from at '{}'", + pyPluginLibFile.c_str()); + g_plugin = + std::make_unique(pyPluginLibFile.c_str()); + // With mpi4py plugin, we need to check if it is actually working. + // If mpi4py is not present at runtime, we cannot use this plugin. + if (!g_plugin->isValid()) { + cudaq::info("Failed to load mpi4py MPI comm plugin (mpi4py is not " + "available)."); + // Don't use it since mpi4py is not available. + g_plugin.reset(); + } + } + } + } + } + if (!g_plugin) { + if (unsafe) + return nullptr; + else + throw std::runtime_error( + "No MPI support can be found when attempted to use cudaq::mpi APIs. 
" + "Please refer to the documentation for instructions to activate MPI " + "support."); + } + + return g_plugin.get(); +}; void initialize() { - int argc{0}; - char **argv = nullptr; - initialize(argc, argv); + auto *commPlugin = getMpiPlugin(); + commPlugin->initialize(); } void initialize(int argc, char **argv) { - int pid, np, thread_provided; - int mpi_error = - MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_provided); - assert(mpi_error == MPI_SUCCESS && "MPI_Init_thread failed"); - assert(thread_provided == MPI_THREAD_MULTIPLE); - MPI_Comm_rank(MPI_COMM_WORLD, &pid); - MPI_Comm_size(MPI_COMM_WORLD, &np); + auto *commPlugin = getMpiPlugin(); + commPlugin->initialize(argc, argv); + const auto pid = commPlugin->rank(); + const auto np = commPlugin->num_ranks(); if (pid == 0) cudaq::info("MPI Initialized, nRanks = {}", np); } int rank() { - int pid; - MPI_Comm_rank(MPI_COMM_WORLD, &pid); - return pid; + auto *commPlugin = getMpiPlugin(); + return commPlugin->rank(); } int num_ranks() { - int np; - MPI_Comm_size(MPI_COMM_WORLD, &np); - return np; + auto *commPlugin = getMpiPlugin(); + return commPlugin->num_ranks(); } bool is_initialized() { - int i; - auto err = MPI_Initialized(&i); - assert(err == MPI_SUCCESS && "MPI_Initialized failed."); - return i == 1; + // Allow to probe is_initialized even without MPI support (hence unsafe = + // true) + auto *commPlugin = getMpiPlugin(true); + // If no MPI plugin is available, returns false (MPI is not initialized) + if (!commPlugin) + return false; + + return commPlugin->is_initialized(); } namespace details { -#define CUDAQ_ALL_REDUCE_IMPL(TYPE, MPI_TYPE, BINARY, MPI_OP) \ +#define CUDAQ_ALL_REDUCE_IMPL(TYPE, BINARY, REDUCE_OP) \ TYPE allReduce(const TYPE &local, const BINARY &) { \ - TYPE result; \ - MPI_Allreduce(&local, &result, 1, MPI_TYPE, MPI_OP, MPI_COMM_WORLD); \ - return result; \ + static_assert(std::is_floating_point::value, \ + "all_reduce argument must be a floating point number"); \ + std::vector result(1); \ + std::vector localVec{static_cast(local)}; \ + auto *commPlugin = getMpiPlugin(); \ + commPlugin->all_reduce(result, localVec, REDUCE_OP); \ + return static_cast(result.front()); \ } -CUDAQ_ALL_REDUCE_IMPL(float, MPI_FLOAT, std::plus, MPI_SUM) -CUDAQ_ALL_REDUCE_IMPL(float, MPI_FLOAT, std::multiplies, MPI_PROD) +CUDAQ_ALL_REDUCE_IMPL(float, std::plus, SUM) +CUDAQ_ALL_REDUCE_IMPL(float, std::multiplies, PROD) -CUDAQ_ALL_REDUCE_IMPL(double, MPI_DOUBLE, std::plus, MPI_SUM) -CUDAQ_ALL_REDUCE_IMPL(double, MPI_DOUBLE, std::multiplies, MPI_PROD) +CUDAQ_ALL_REDUCE_IMPL(double, std::plus, SUM) +CUDAQ_ALL_REDUCE_IMPL(double, std::multiplies, PROD) } // namespace details -void all_gather(std::vector &global, std::vector &local) { - MPI_Allgather(local.data(), local.size(), MPI_DOUBLE, global.data(), - local.size(), MPI_DOUBLE, MPI_COMM_WORLD); +#define CUDAQ_ALL_GATHER_IMPL(TYPE) \ + void all_gather(std::vector &global, const std::vector &local) { \ + auto *commPlugin = getMpiPlugin(); \ + commPlugin->all_gather(global, local); \ + } + +CUDAQ_ALL_GATHER_IMPL(double) +CUDAQ_ALL_GATHER_IMPL(int) + +void broadcast(std::vector &data, int rootRank) { + auto *commPlugin = getMpiPlugin(); + commPlugin->broadcast(data, rootRank); } void finalize() { @@ -98,51 +192,12 @@ void finalize() { // Inform the simulator that we are // about to run MPI Finalize nvqir::tearDownBeforeMPIFinalize(); - - // Check if finalize has been called. 
- int isFinalized; - MPI_Finalized(&isFinalized); - if (isFinalized) - return; - - // Finalize - int mpi_error = MPI_Finalize(); - assert(mpi_error == MPI_SUCCESS && "MPI_Finalize failed."); + auto *commPlugin = getMpiPlugin(); + if (!commPlugin->is_finalized()) + commPlugin->finalize(); } } // namespace cudaq::mpi -#else -namespace cudaq::mpi { - -void initialize() {} - -void initialize(int argc, char **argv) {} - -bool is_initialized() { return false; } - -int rank() { return 0; } - -int num_ranks() { return 1; } - -namespace details { - -#define CUDAQ_ALL_REDUCE_IMPL(TYPE, BINARY) \ - TYPE allReduce(const TYPE &local, const BINARY &) { return TYPE(); } - -CUDAQ_ALL_REDUCE_IMPL(float, std::plus) -CUDAQ_ALL_REDUCE_IMPL(float, std::multiplies) - -CUDAQ_ALL_REDUCE_IMPL(double, std::plus) -CUDAQ_ALL_REDUCE_IMPL(double, std::multiplies) - -} // namespace details - -void all_gather(std::vector &global, std::vector &local) {} - -void finalize() {} - -} // namespace cudaq::mpi -#endif namespace cudaq::__internal__ { std::map runtime_registered_mlir; diff --git a/runtime/cudaq/distributed/builtin/CMakeLists.txt b/runtime/cudaq/distributed/builtin/CMakeLists.txt new file mode 100644 index 0000000000..bc25b9cb90 --- /dev/null +++ b/runtime/cudaq/distributed/builtin/CMakeLists.txt @@ -0,0 +1,19 @@ +# ============================================================================ # +# Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +if (MPI_CXX_FOUND) + message(STATUS "Building default MPI Comm plugin") + set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib/plugins) + set(CMAKE_INSTALL_RPATH "$ORIGIN:$ORIGIN/..") + # IMPORTANT: Don't change this lib name without updating the getMpiPlugin function + set(LIBRARY_NAME cudaq-comm-plugin) + add_library(${LIBRARY_NAME} SHARED mpi_comm_impl.cpp) + target_link_libraries(${LIBRARY_NAME} PRIVATE MPI::MPI_CXX) + target_include_directories(${LIBRARY_NAME} PRIVATE ..) + install(TARGETS ${LIBRARY_NAME} DESTINATION lib/plugins) +endif() \ No newline at end of file diff --git a/runtime/cudaq/distributed/builtin/activate_custom_mpi.sh b/runtime/cudaq/distributed/builtin/activate_custom_mpi.sh new file mode 100644 index 0000000000..49662a4aa1 --- /dev/null +++ b/runtime/cudaq/distributed/builtin/activate_custom_mpi.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +# ============================================================================ # +# Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +# This script will build and activate a **custom** CUDA Quantum MPI interface. +# Specifically, this script builds a MPI plugin interface against your local MPI installation +# and exports an environment variable $CUDAQ_MPI_COMM_LIB to tell CUDA Quantum to use this MPI Communicator plugin. +# +# It requires a GNU C++ compiler (g++). +# +# Please check/set the following environment variables: +# - $MPI_PATH = Path to your MPI library installation directory. 
+# If your MPI library is installed in system +# directories as opposed to its own (root) directory, +# ${MPI_PATH}/include is expected to contain the mpi.h header. +# ${MPI_PATH}/lib64 or ${MPI_PATH}/lib is expected to contain libmpi.so. +# +# Run: source /distributed_interfaces/activate_custom_mpi.sh +# +# You could add $CUDAQ_MPI_COMM_LIB to your ~/.bashrc file to persist the environment variable. + +if [ -z "${MPI_PATH}" ] +then + echo "Environment variable MPI_PATH is not set. Please set it to point to the MPI root directory!" + echo "Note that MPI_PATH/include is expected to contain the mpi.h header." + return +fi + +if [ -z "${CXX}" ]; then + CXX=g++ +fi + +this_file_dir=`dirname "$(readlink -f "${BASH_SOURCE[0]}")"` + +$CXX -shared -std=c++17 -fPIC \ + -I${MPI_PATH}/include \ + -I$this_file_dir/ \ + $this_file_dir/mpi_comm_impl.cpp \ + -L${MPI_PATH}/lib64 -L${MPI_PATH}/lib -lmpi \ + -o $this_file_dir/libcudaq_distributed_interface_mpi.so +export CUDAQ_MPI_COMM_LIB=$this_file_dir/libcudaq_distributed_interface_mpi.so diff --git a/runtime/cudaq/distributed/builtin/mpi_comm_impl.cpp b/runtime/cudaq/distributed/builtin/mpi_comm_impl.cpp new file mode 100644 index 0000000000..006ea8aa11 --- /dev/null +++ b/runtime/cudaq/distributed/builtin/mpi_comm_impl.cpp @@ -0,0 +1,376 @@ +/******************************************************************************* + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +/*! \file mpi_comm_impl.cpp + \brief Reference implementation of CUDAQ MPI interface wrapper + + This is an implementation of the MPI shim interface defined in + distributed_capi.h. This can be compiled and linked against an MPI + implementation (e.g., OpenMPI or MPICH) to produce a runtime loadable plugin + providing CUDA Quantum with necessary MPI functionalities. +*/ + +#include "distributed_capi.h" +#include +#include +#include +#include +#include +#include +#include + +// This MPI plugin does not use the [deprecated] C++ binding of MPI at ALL. The +// following flag makes sure the C++ bindings are not included. 
+#if !defined(MPICH_SKIP_MPICXX) +#define MPICH_SKIP_MPICXX 1 +#endif +#if !defined(OMPI_SKIP_MPICXX) +#define OMPI_SKIP_MPICXX 1 +#endif +#include + +namespace { +bool initCalledByThis = false; +MPI_Datatype convertType(DataType dataType) { + switch (dataType) { + case INT_8: + return MPI_INT8_T; + case INT_16: + return MPI_INT16_T; + case INT_32: + return MPI_INT32_T; + case INT_64: + return MPI_INT64_T; + case FLOAT_32: + return MPI_FLOAT; + case FLOAT_64: + return MPI_DOUBLE; + case FLOAT_COMPLEX: + return MPI_C_FLOAT_COMPLEX; + case DOUBLE_COMPLEX: + return MPI_C_DOUBLE_COMPLEX; + } + __builtin_unreachable(); +} + +MPI_Datatype convertTypeMinLoc(DataType dataType) { + switch (dataType) { + case FLOAT_32: + return MPI_FLOAT_INT; + case FLOAT_64: + return MPI_DOUBLE_INT; + default: + throw std::runtime_error("Unsupported MINLOC data type"); + } + __builtin_unreachable(); +} + +MPI_Op convertType(ReduceOp opType) { + switch (opType) { + case SUM: + return MPI_SUM; + case PROD: + return MPI_PROD; + case MIN: + return MPI_MIN; + case MIN_LOC: + return MPI_MINLOC; + } + __builtin_unreachable(); +} + +MPI_Comm unpackMpiCommunicator(const cudaqDistributedCommunicator_t *comm) { + if (comm->commPtr == NULL) + return MPI_COMM_NULL; + if (sizeof(MPI_Comm) != comm->commSize) { + printf("#FATAL: MPI_Comm object has unexpected size!\n"); + exit(EXIT_FAILURE); + } + return *((MPI_Comm *)(comm->commPtr)); +} + +/// @brief Tracking in-flight non-blocking send and receive requests. +struct PendingRequest { + MPI_Request requests[2]; + int nActiveRequests; + PendingRequest() : nActiveRequests(0){}; + static std::mutex g_mutex; + static std::unordered_map + g_requests; +}; + +std::mutex PendingRequest::g_mutex; +std::unordered_map + PendingRequest::g_requests; +} // namespace +extern "C" { + +/// @brief Wrapper of MPI_Init +static int mpi_initialize(int32_t *argc, char ***argv) { + int flag = 0; + int res = MPI_Initialized(&flag); + if (res != MPI_SUCCESS) + return res; + // This has been initialized, nothing to do. 
+ if (flag) + return MPI_SUCCESS; + initCalledByThis = true; + return MPI_Init(argc, argv); +} + +/// @brief Wrapper of MPI_Finalize +static int mpi_finalize() { + if (!initCalledByThis) + return MPI_SUCCESS; + return MPI_Finalize(); +} + +/// @brief Wrapper of MPI_Initialized +static int mpi_initialized(int32_t *flag) { return MPI_Initialized(flag); } + +/// @brief Wrapper of MPI_Finalized +static int mpi_finalized(int32_t *flag) { return MPI_Finalized(flag); } + +/// @brief Wrapper of MPI_Comm_size +static int mpi_getNumRanks(const cudaqDistributedCommunicator_t *comm, + int32_t *size) { + return MPI_Comm_size(unpackMpiCommunicator(comm), size); +} + +/// @brief Wrapper of MPI_Comm_rank +static int mpi_getProcRank(const cudaqDistributedCommunicator_t *comm, + int32_t *rank) { + return MPI_Comm_rank(unpackMpiCommunicator(comm), rank); +} + +/// @brief Returns the size of the local subgroup of processes sharing node +/// memory +static int mpi_getCommSizeShared(const cudaqDistributedCommunicator_t *comm, + int32_t *numRanks) { + *numRanks = 0; + MPI_Info info; + MPI_Info_create(&info); + MPI_Info_set(info, "mpi_hw_resource_type", "mpi_shared_memory"); + int procRank = -1; + int mpiErr = MPI_Comm_rank(unpackMpiCommunicator(comm), &procRank); + if (mpiErr == MPI_SUCCESS) { + MPI_Comm localComm; + mpiErr = + MPI_Comm_split_type(unpackMpiCommunicator(comm), MPI_COMM_TYPE_SHARED, + procRank, info, &localComm); + if (mpiErr == MPI_SUCCESS) { + int nranks = 0; + mpiErr = MPI_Comm_size(localComm, &nranks); + *numRanks = nranks; + MPI_Comm_free(&localComm); + } + } + return mpiErr; +} + +/// @brief Wrapper of MPI_Barrier +static int mpi_Barrier(const cudaqDistributedCommunicator_t *comm) { + return MPI_Barrier(unpackMpiCommunicator(comm)); +} + +/// @brief Wrapper of MPI_Bcast +static int mpi_Bcast(const cudaqDistributedCommunicator_t *comm, void *buffer, + int32_t count, DataType dataType, int32_t rootRank) { + return MPI_Bcast(buffer, count, convertType(dataType), rootRank, + unpackMpiCommunicator(comm)); +} + +/// @brief Wrapper of MPI_Allreduce +static int mpi_Allreduce(const cudaqDistributedCommunicator_t *comm, + const void *sendBuffer, void *recvBuffer, + int32_t count, DataType dataType, ReduceOp opType) { + if (opType == MIN_LOC) { + return MPI_Allreduce(sendBuffer, recvBuffer, count, + convertTypeMinLoc(dataType), convertType(opType), + unpackMpiCommunicator(comm)); + } else { + return MPI_Allreduce(sendBuffer, recvBuffer, count, convertType(dataType), + convertType(opType), unpackMpiCommunicator(comm)); + } +} + +/// @brief Wrapper of MPI_Allreduce with MPI_IN_PLACE +static int mpi_AllreduceInplace(const cudaqDistributedCommunicator_t *comm, + void *recvBuffer, int32_t count, + DataType dataType, ReduceOp opType) { + return MPI_Allreduce(MPI_IN_PLACE, recvBuffer, count, convertType(dataType), + convertType(opType), unpackMpiCommunicator(comm)); +} + +/// @brief Wrapper of MPI_Allgather +static int mpi_Allgather(const cudaqDistributedCommunicator_t *comm, + const void *sendBuffer, void *recvBuffer, + int32_t count, DataType dataType) { + return MPI_Allgather(sendBuffer, count, convertType(dataType), recvBuffer, + count, convertType(dataType), + unpackMpiCommunicator(comm)); +} + +/// @brief Wrapper of MPI_Allgatherv +static int mpi_AllgatherV(const cudaqDistributedCommunicator_t *comm, + const void *sendBuf, int sendCount, void *recvBuf, + const int *recvCounts, const int *displs, + DataType dataType) { + return MPI_Allgatherv(sendBuf, sendCount, convertType(dataType), recvBuf, + 
recvCounts, displs, convertType(dataType), + unpackMpiCommunicator(comm)); +} + +/// @brief Wrapper of MPI_Isend and track pending requests for synchronization +static int mpi_SendAsync(const cudaqDistributedCommunicator_t *comm, + const void *buf, int count, DataType dataType, + int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests == 2) + return -1; + MPI_Request *request = + &(PendingRequest::g_requests[comm] + .requests[PendingRequest::g_requests[comm].nActiveRequests]); + int res = MPI_Isend(buf, count, convertType(dataType), peer, tag, + unpackMpiCommunicator(comm), request); + if (res != MPI_SUCCESS) { + MPI_Cancel(request); + return res; + } + ++PendingRequest::g_requests[comm].nActiveRequests; + return 0; +} + +/// @brief Wrapper of MPI_Irecv and track pending requests for synchronization +static int mpi_RecvAsync(const cudaqDistributedCommunicator_t *comm, void *buf, + int count, DataType dataType, int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests == 2) + return -1; + MPI_Request *request = + &(PendingRequest::g_requests[comm] + .requests[PendingRequest::g_requests[comm].nActiveRequests]); + int res = MPI_Irecv(buf, count, convertType(dataType), peer, tag, + unpackMpiCommunicator(comm), request); + if (res != MPI_SUCCESS) { + MPI_Cancel(request); + return res; + } + ++PendingRequest::g_requests[comm].nActiveRequests; + return 0; +} + +/// @brief Combined MPI_Isend and MPI_Irecv requests +static int mpi_SendRecvAsync(const cudaqDistributedCommunicator_t *comm, + const void *sendbuf, void *recvbuf, int count, + DataType dataType, int peer, int32_t tag) { + std::scoped_lock lock(PendingRequest::g_mutex); + if (PendingRequest::g_requests[comm].nActiveRequests != 0) + return -1; + MPI_Request *sendRequest = &(PendingRequest::g_requests[comm].requests[0]); + MPI_Request *recvRequest = &(PendingRequest::g_requests[comm].requests[1]); + int resSend = MPI_Isend(sendbuf, count, convertType(dataType), peer, tag, + unpackMpiCommunicator(comm), sendRequest); + int resRecv = MPI_Irecv(recvbuf, count, convertType(dataType), peer, tag, + unpackMpiCommunicator(comm), recvRequest); + if ((resSend != MPI_SUCCESS) || (resRecv != MPI_SUCCESS)) { + MPI_Cancel(sendRequest); + MPI_Cancel(recvRequest); + return resSend != MPI_SUCCESS ? resSend : resRecv; + } + PendingRequest::g_requests[comm].nActiveRequests = 2; + return 0; +} + +/// @brief Wait for in-flight MPI_Isend and MPI_Irecv to complete +static int mpi_Synchronize(const cudaqDistributedCommunicator_t *comm) { + std::scoped_lock lock(PendingRequest::g_mutex); + MPI_Status statuses[2]; + std::memset(statuses, 0, sizeof(statuses)); + int res = MPI_Waitall(PendingRequest::g_requests[comm].nActiveRequests, + PendingRequest::g_requests[comm].requests, statuses); + PendingRequest::g_requests[comm].nActiveRequests = 0; + return res; +} + +/// @brief Wrapper of MPI_Abort +static int mpi_Abort(const cudaqDistributedCommunicator_t *comm, + int errorCode) { + return MPI_Abort(unpackMpiCommunicator(comm), errorCode); +} + +/// @brief Wrapper of MPI_Comm_dup +static int mpi_CommDup(const cudaqDistributedCommunicator_t *comm, + cudaqDistributedCommunicator_t **newDupComm) { + // Use std::deque to make sure pointers to elements are valid. 
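  // The duplicated communicator is handed back to the caller as a raw pointer
  // into this container, so the storage must be static and must never relocate
  // existing elements as it grows; std::deque::emplace_back preserves
  // references to prior elements, which a std::vector would not guarantee.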
+ static std::deque> + dup_comms; + dup_comms.emplace_back(std::pair()); + auto &[dupComm, newComm] = dup_comms.back(); + auto status = MPI_Comm_dup(unpackMpiCommunicator(comm), &dupComm); + newComm.commPtr = &dupComm; + newComm.commSize = sizeof(dupComm); + *newDupComm = &newComm; + return status; +} + +/// @brief Wrapper of MPI_Comm_split +static int mpi_CommSplit(const cudaqDistributedCommunicator_t *comm, + int32_t color, int32_t key, + cudaqDistributedCommunicator_t **newSplitComm) { + + // Use std::deque to make sure pointers to elements are valid. + static std::deque> + split_comms; + split_comms.emplace_back( + std::pair()); + auto &[splitComm, newComm] = split_comms.back(); + auto status = + MPI_Comm_split(unpackMpiCommunicator(comm), color, key, &splitComm); + newComm.commPtr = &splitComm; + newComm.commSize = sizeof(splitComm); + *newSplitComm = &newComm; + return status; +} + +/// @brief Return the underlying MPI_Comm as a type-erased object +cudaqDistributedCommunicator_t *getMpiCommunicator() { + static MPI_Comm pluginComm = MPI_COMM_WORLD; + static cudaqDistributedCommunicator_t commWorld{&pluginComm, + sizeof(pluginComm)}; + return &commWorld; +} + +/// @brief Return the MPI shim interface (as a function table) +cudaqDistributedInterface_t *getDistributedInterface() { + static cudaqDistributedInterface_t cudaqDistributedInterface{ + CUDAQ_DISTRIBUTED_INTERFACE_VERSION, + mpi_initialize, + mpi_finalize, + mpi_initialized, + mpi_finalized, + mpi_getNumRanks, + mpi_getProcRank, + mpi_getCommSizeShared, + mpi_Barrier, + mpi_Bcast, + mpi_Allreduce, + mpi_AllreduceInplace, + mpi_Allgather, + mpi_AllgatherV, + mpi_SendAsync, + mpi_RecvAsync, + mpi_SendRecvAsync, + mpi_Synchronize, + mpi_Abort, + mpi_CommDup, + mpi_CommSplit}; + return &cudaqDistributedInterface; +} +} diff --git a/runtime/cudaq/distributed/distributed_capi.h b/runtime/cudaq/distributed/distributed_capi.h new file mode 100644 index 0000000000..813a7eb8d9 --- /dev/null +++ b/runtime/cudaq/distributed/distributed_capi.h @@ -0,0 +1,123 @@ +/****************************************************************-*- C++ -*-**** + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +#pragma once +#include +#include + +/*! \file distributed_capi.h + \brief CUDA Quantum Shim C-API for MPI support + + This header file defines a wrapper interface for MPI functionalities that + CUDA Quantum and its backends need. The interface is defined in a + MPI-independent manner so that CUDA Quantum libraries doesn't need to be + linked against a particular MPI implementation. MPI support will be provided + at runtime via dynamical library loading. +*/ + +extern "C" { +/// @brief Type-erasure representation of a MPI communicator (MPI_Comm) +typedef struct { + /// @brief Pointer to the encapsulated MPI_Comm + /// i.e., MPI_Comm* -> void * conversion. + void *commPtr; + /// @brief Size of MPI_Comm type for checking/verification purposes. + std::size_t commSize; +} cudaqDistributedCommunicator_t; + +#define CUDAQ_DISTRIBUTED_INTERFACE_VERSION 1 + +/// @brief Data type that we support +// Plugin implementation need to convert it to MPI data type enum as needed. 
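// Illustrative sketch (hypothetical helper name, shown only for exposition):
// an MPI-backed plugin translates these enumerators much like the reference
// convertType() helper in mpi_comm_impl.cpp above, e.g.
//
//   MPI_Datatype toMpiType(DataType dataType) {
//     switch (dataType) {
//     case INT_32:         return MPI_INT32_T;
//     case INT_64:         return MPI_INT64_T;
//     case FLOAT_32:       return MPI_FLOAT;
//     case FLOAT_64:       return MPI_DOUBLE;
//     case DOUBLE_COMPLEX: return MPI_C_DOUBLE_COMPLEX;
//     default:             throw std::runtime_error("Unsupported data type");
//     }
//   }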
+enum DataType { + INT_8, + INT_16, + INT_32, + INT_64, + FLOAT_32, + FLOAT_64, + FLOAT_COMPLEX, + DOUBLE_COMPLEX +}; + +/// @brief Type of MPI reduce ops that we support +// Plugin implementation need to convert it to MPI enum as needed. +enum ReduceOp { SUM, PROD, MIN, MIN_LOC }; + +/// @brief Encapsulates MPI functionalities as a function table. +// Plugin implementations will redirect these functions into proper MPI API +// calls. +typedef struct { + /// @brief Version number for compatibility checking. + int version; + /// @brief MPI_Init + int (*initialize)(int32_t *, char ***); + /// @brief MPI_Finalize + int (*finalize)(); + /// @brief MPI_Initialized + int (*initialized)(int32_t *); + /// @brief MPI_Finalized + int (*finalized)(int32_t *); + /// @brief MPI_Comm_size + int (*getNumRanks)(const cudaqDistributedCommunicator_t *, int32_t *); + /// @brief MPI_Comm_rank + int (*getProcRank)(const cudaqDistributedCommunicator_t *, int32_t *); + /// Returns the size of the local subgroup of processes sharing node memory + int (*getCommSizeShared)(const cudaqDistributedCommunicator_t *comm, + int32_t *); + /// @brief MPI_Barrier + int (*Barrier)(const cudaqDistributedCommunicator_t *); + /// @brief MPI_Bcast + int (*Bcast)(const cudaqDistributedCommunicator_t *, void *, int32_t, + DataType, int32_t); + /// @brief MPI_Allreduce + int (*Allreduce)(const cudaqDistributedCommunicator_t *, const void *, void *, + int32_t, DataType, ReduceOp); + /// @brief MPI_Allreduce with MPI_IN_PLACE + int (*AllreduceInPlace)(const cudaqDistributedCommunicator_t *, void *, + int32_t, DataType, ReduceOp); + /// @brief MPI_Allgather + int (*Allgather)(const cudaqDistributedCommunicator_t *, const void *, void *, + int32_t, DataType); + /// @brief MPI_Allgatherv + int (*AllgatherV)(const cudaqDistributedCommunicator_t *, const void *, + int32_t, void *, const int32_t *, const int32_t *, + DataType); + /// @brief MPI_Isend + /// @note The MPI plugin API allows for a maximum of two concurrent + /// non-blocking requests (e.g., one `Isend` and one `Irecv`). Hence, + /// `Synchronize` should be called as appropriate to resolve in-flight + /// requests before making new ones. + int (*SendAsync)(const cudaqDistributedCommunicator_t *, const void *, int, + DataType, int, int32_t); + /// @brief MPI_Irecv + /// @note The MPI plugin API allows for a maximum of two concurrent + /// non-blocking requests (e.g., one `Isend` and one `Irecv`). Hence, + /// `Synchronize` should be called as appropriate to resolve in-flight + /// requests before making new ones. + int (*RecvAsync)(const cudaqDistributedCommunicator_t *, void *, int, + DataType, int, int32_t); + /// @brief MPI_Isend and MPI_Irecv in one call + /// @note The MPI plugin API allows for a maximum of two concurrent + /// non-blocking requests (e.g., one `Isend` and one `Irecv`). Since this + /// `SendRecvAsync` creates two pending requests, `Synchronize` must be called + /// to resolve in-flight requests before making new ones. 
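/// (In the reference plugin this limit corresponds to the two MPI_Request
/// slots tracked per communicator by the PendingRequest bookkeeping in
/// mpi_comm_impl.cpp.)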
+ int (*SendRecvAsync)(const cudaqDistributedCommunicator_t *, const void *, + void *, int, DataType, int, int32_t); + /// @brief Wait for previous non-blocking MPI_Isend and MPI_Irecv to complete + int (*Synchronize)(const cudaqDistributedCommunicator_t *); + /// @brief MPI_Abort + int (*Abort)(const cudaqDistributedCommunicator_t *, int); + /// @brief MPI_Comm_dup + int (*CommDup)(const cudaqDistributedCommunicator_t *, + cudaqDistributedCommunicator_t **); + /// @brief MPI_Comm_split + int (*CommSplit)(const cudaqDistributedCommunicator_t *, int32_t, int32_t, + cudaqDistributedCommunicator_t **); +} cudaqDistributedInterface_t; +} \ No newline at end of file diff --git a/runtime/cudaq/distributed/mpi_plugin.cpp b/runtime/cudaq/distributed/mpi_plugin.cpp new file mode 100644 index 0000000000..800aafa6a9 --- /dev/null +++ b/runtime/cudaq/distributed/mpi_plugin.cpp @@ -0,0 +1,115 @@ +/******************************************************************************* + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +#include "mpi_plugin.h" +#include "common/PluginUtils.h" +#include + +namespace { +#define HANDLE_MPI_ERROR(x) \ + { \ + const auto err = x; \ + if (err != 0) { \ + printf("MPI Error encountered in line %d\n", __LINE__); \ + fflush(stdout); \ + std::abort(); \ + } \ + }; +} // namespace + +namespace cudaq { +MPIPlugin::MPIPlugin(const std::string &distributedInterfaceLib) { + if (!dlopen(distributedInterfaceLib.c_str(), RTLD_GLOBAL | RTLD_NOW)) { + const std::string errorMsg(dlerror()); + throw std::runtime_error("Unable to open distributed interface library '" + + distributedInterfaceLib + "': " + errorMsg); + } + m_distributedInterface = getUniquePluginInstance( + DISTRIBUTED_INTERFACE_GETTER_SYMBOL_NAME, + distributedInterfaceLib.c_str()); + m_comm = getUniquePluginInstance( + COMM_GETTER_SYMBOL_NAME, distributedInterfaceLib.c_str()); + // getUniquePluginInstance should have thrown if cannot load. + assert(m_distributedInterface && m_comm); + m_valid = m_comm->commSize > 0; + m_libFile = distributedInterfaceLib; +} + +void MPIPlugin::initialize() { + int argc{0}; + char **argv = nullptr; + initialize(argc, argv); +} + +void MPIPlugin::initialize(int argc, char **argv) { + HANDLE_MPI_ERROR(m_distributedInterface->initialize(&argc, &argv)); +} + +int MPIPlugin::rank() { + int pid{0}; + HANDLE_MPI_ERROR(m_distributedInterface->getProcRank(m_comm, &pid)); + return pid; +} + +int MPIPlugin::num_ranks() { + int np{0}; + HANDLE_MPI_ERROR(m_distributedInterface->getNumRanks(m_comm, &np)); + return np; +} + +bool MPIPlugin::is_initialized() { + int i{0}; + HANDLE_MPI_ERROR(m_distributedInterface->initialized(&i)); + return i == 1; +} + +bool MPIPlugin::is_finalized() { + int f{0}; + HANDLE_MPI_ERROR(m_distributedInterface->finalized(&f)); + return f == 1; +} + +void MPIPlugin::all_gather(std::vector &global, + const std::vector &local) { + HANDLE_MPI_ERROR(m_distributedInterface->Allgather( + m_comm, local.data(), global.data(), local.size(), FLOAT_64)); +} + +void MPIPlugin::all_gather(std::vector &global, + const std::vector &local) { + const auto dataType = (sizeof(int) == sizeof(int64_t)) ? 
INT_64 : INT_32; + HANDLE_MPI_ERROR(m_distributedInterface->Allgather( + m_comm, local.data(), global.data(), local.size(), dataType)); +} + +void MPIPlugin::broadcast(std::vector &data, int rootRank) { + HANDLE_MPI_ERROR(m_distributedInterface->Bcast( + m_comm, data.data(), data.size(), FLOAT_64, rootRank)); +} + +void MPIPlugin::all_reduce(std::vector &global, + const std::vector &local, ReduceOp op) { + HANDLE_MPI_ERROR(m_distributedInterface->Allreduce( + m_comm, local.data(), global.data(), local.size(), FLOAT_64, op)); +} + +void MPIPlugin::finalize() { + if (rank() == 0) + cudaq::info("Finalizing MPI."); + + // Check if finalize has been called. + int isFinalized{0}; + HANDLE_MPI_ERROR(m_distributedInterface->finalized(&isFinalized)); + + if (isFinalized) + return; + + // Finalize + HANDLE_MPI_ERROR(m_distributedInterface->finalize()); +} +} // namespace cudaq \ No newline at end of file diff --git a/runtime/cudaq/distributed/mpi_plugin.h b/runtime/cudaq/distributed/mpi_plugin.h new file mode 100644 index 0000000000..61dcd29b34 --- /dev/null +++ b/runtime/cudaq/distributed/mpi_plugin.h @@ -0,0 +1,92 @@ +/****************************************************************-*- C++ -*-**** + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +#pragma once +#include "distributed_capi.h" +#include +#include +namespace cudaq { +class MPIPlugin { + cudaqDistributedInterface_t *m_distributedInterface; + cudaqDistributedCommunicator_t *m_comm; + bool m_valid; + std::string m_libFile; + +public: + static constexpr std::string_view COMM_GETTER_SYMBOL_NAME = + "getMpiCommunicator"; + static constexpr std::string_view DISTRIBUTED_INTERFACE_GETTER_SYMBOL_NAME = + "getDistributedInterface"; + MPIPlugin(const std::string &distributedInterfaceLib); + cudaqDistributedInterface_t *get() { return m_distributedInterface; } + cudaqDistributedCommunicator_t *getComm() { return m_comm; } + std::string getPluginPath() const { return m_libFile; } + void initialize(); + + /// @brief Initialize MPI if available. This function + /// is a no-op if there CUDA Quantum has not been built + /// against MPI. Takes program arguments as input. + void initialize(int argc, char **argv); + + /// @brief Return the rank of the calling process. + int rank(); + + /// @brief Return the number of MPI ranks. + int num_ranks(); + + /// @brief Return true if MPI is already initialized, false otherwise. + bool is_initialized(); + + /// @brief Return true if MPI is already finalized, false otherwise. + bool is_finalized(); + + /// @brief Gather all vector data (floating point numbers) locally into the + /// provided global vector. + /// + /// Global vector must be sized to fit all + /// vector elements coming from individual ranks. + void all_gather(std::vector &global, + const std::vector &local); + + /// @brief Gather all vector data (integers) locally into the provided + /// global vector. + /// + /// Global vector must be sized to fit all + /// vector elements coming from individual ranks. 
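  /// (This integer overload selects INT_64 or INT_32 at runtime based on
  /// sizeof(int); see the corresponding implementation in mpi_plugin.cpp.)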
+ void all_gather(std::vector &global, const std::vector &local); + + /// @brief Broadcast a vector from a root rank to all other ranks + void broadcast(std::vector &data, int rootRank); + + /// @brief Combines local vector data from all processes and distributes the + /// result back to all processes into the provided global vector. + void all_reduce(std::vector &global, const std::vector &local, + ReduceOp op); + + /// @brief Finalize MPI. This function + /// is a no-op if there CUDA Quantum has not been built + /// against MPI. + void finalize(); + + /// @brief Is the plugin valid? + // Due to external runtime dependencies, e.g. Python modules, a loaded plugin + // may not be valid and shouldn'd be used. + bool isValid() const { return m_valid; } +}; + +namespace mpi { +/// @brief Retrieve the runtime MPI plugin. +/// @note Throw an error if no runtime MPI plugin is available unless `unsafe` +/// is true. +/// @param unsafe If true, returns a `nullptr` rather than throwing an error if +/// no MPI plugin is available. Hence, the caller needs to check the returned +/// pointer before use. +/// @return Pointer to the runtime MPI plugin +extern ::cudaq::MPIPlugin *getMpiPlugin(bool unsafe = false); +} // namespace mpi +} // namespace cudaq diff --git a/runtime/nvqir/cutensornet/CMakeLists.txt b/runtime/nvqir/cutensornet/CMakeLists.txt index a08148fbf3..5f476c83d8 100644 --- a/runtime/nvqir/cutensornet/CMakeLists.txt +++ b/runtime/nvqir/cutensornet/CMakeLists.txt @@ -59,11 +59,8 @@ if (${CUTENSORNET_VERSION} VERSION_GREATER_EQUAL "2.3") nvqir_create_cutn_plugin(tensornet ${BASE_TENSOR_BACKEND_SRS} simulator_tensornet_register.cpp ) nvqir_create_cutn_plugin(tensornet-mps ${BASE_TENSOR_BACKEND_SRS} simulator_mps_register.cpp) add_library(tensornet-mpi-util OBJECT mpi_support.cpp) - target_include_directories(tensornet-mpi-util PRIVATE ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES} ${CUTENSORNET_INCLUDE_DIR}) - if (MPI_CXX_FOUND) - target_compile_definitions(tensornet-mpi-util PRIVATE CUDAQ_HAS_MPI) - target_link_libraries(tensornet-mpi-util PRIVATE MPI::MPI_CXX) - endif() + target_include_directories(tensornet-mpi-util PRIVATE ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES} ${CUTENSORNET_INCLUDE_DIR} ${CMAKE_SOURCE_DIR}/runtime) + target_link_libraries(tensornet-mpi-util PRIVATE cudaq-common fmt::fmt-header-only) # Note: only tensornet backend supports MPI at cutensornet level (distributed tensor computation) target_link_libraries(nvqir-tensornet PRIVATE tensornet-mpi-util) diff --git a/runtime/nvqir/cutensornet/mpi_support.cpp b/runtime/nvqir/cutensornet/mpi_support.cpp index a82e830b74..d2b7941197 100644 --- a/runtime/nvqir/cutensornet/mpi_support.cpp +++ b/runtime/nvqir/cutensornet/mpi_support.cpp @@ -5,48 +5,264 @@ * This source code and the accompanying materials are made available under * * the terms of the Apache License 2.0 which accompanies this distribution. * ******************************************************************************/ +#include "common/Logger.h" +#include "cudaq/distributed/mpi_plugin.h" #include "tensornet_utils.h" +#include +#include +#include +#include +#include +// Hook to query this shared lib file location at runtime. +extern "C" { +void getThisLibPath() { return; } +} + +/// @brief Query the full path to the this lib. 
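// Illustrative usage (hypothetical call site, not taken from this change):
// code that can tolerate a missing plugin requests it with
// getMpiPlugin(/*unsafe=*/true) and checks the result, as its documentation in
// mpi_plugin.h above requires, e.g.
//
//   if (auto *plugin = cudaq::mpi::getMpiPlugin(/*unsafe=*/true)) {
//     if (plugin->isValid() && plugin->is_initialized())
//       cudaq::info("MPI plugin '{}' with {} ranks.", plugin->getPluginPath(),
//                   plugin->num_ranks());
//   }
//
// The accessors defined further below use the throwing form instead, since
// cutensornet distribution cannot proceed without a working plugin.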
+static const char *getThisSharedLibFilePath() { + static thread_local std::string LIB_PATH; + if (LIB_PATH.empty()) { + // Use dladdr query this .so file + void *needle = (void *)(intptr_t)getThisLibPath; + Dl_info DLInfo; + int err = dladdr(needle, &DLInfo); + if (err != 0) { + char link_path[PATH_MAX]; + // If the filename is a symlink, we need to resolve and return the + // location of the actual .so file. + if (realpath(DLInfo.dli_fname, link_path)) + LIB_PATH = link_path; + } + } + + return LIB_PATH.c_str(); +} + +/// @brief Retrieve the MPI plugin comm interface +static cudaqDistributedInterface_t *getMpiPluginInterface() { + auto mpiPlugin = cudaq::mpi::getMpiPlugin(); + if (!mpiPlugin) + throw std::runtime_error("Failed to retrieve MPI plugin"); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + if (!mpiInterface) + throw std::runtime_error("Invalid MPI distributed plugin encountered"); + return mpiInterface; +} + +/// @brief Retrieve the MPI plugin (type-erased) comm pointer +static cudaqDistributedCommunicator_t *getMpiCommWrapper() { + auto mpiPlugin = cudaq::mpi::getMpiPlugin(); + if (!mpiPlugin) + throw std::runtime_error("Failed to retrieve MPI plugin"); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + if (!comm) + throw std::runtime_error("Invalid MPI distributed plugin encountered"); + return comm; +} + +/// @brief Retrieve the path to the plugin implementation +static std::string getMpiPluginFilePath() { + auto mpiPlugin = cudaq::mpi::getMpiPlugin(); + if (!mpiPlugin) + throw std::runtime_error("Failed to retrieve MPI plugin"); + + return mpiPlugin->getPluginPath(); +} -#if defined CUDAQ_HAS_MPI -#include -#define HANDLE_MPI_ERROR(x) \ - { \ - const auto err = x; \ - if (err != MPI_SUCCESS) { \ - char error[MPI_MAX_ERROR_STRING]; \ - int len; \ - MPI_Error_string(err, error, &len); \ - printf("MPI Error: %s in line %d\n", error, __LINE__); \ - fflush(stdout); \ - MPI_Abort(MPI_COMM_WORLD, err); \ - } \ - }; void initCuTensornetComm(cutensornetHandle_t cutnHandle) { - // If the CUTENSORNET_COMM_LIB environment variable is not set, print a - // warning message since cutensornet will likely to fail. - // - // Note: this initialization only happens when the user initializes - // MPI explicitly. In this case, the user will need to define the environment - // variable CUTENSORNET_COMM_LIB as described in the Getting Started section - // of the cuTensorNet library documentation (Installation and Compilation). 
- if (std::getenv("CUTENSORNET_COMM_LIB") == nullptr) - printf("[Warning] Enabling cuTensorNet MPI without environment variable " - "CUTENSORNET_COMM_LIB.\nMPI parallelization inside cuTensorNet " - "library may cause an error.\n"); - MPI_Comm cutnComm; - // duplicate MPI communicator to dedicate it to cuTensorNet - HANDLE_MPI_ERROR(MPI_Comm_dup(MPI_COMM_WORLD, &cutnComm)); + cudaqDistributedInterface_t *mpiInterface = getMpiPluginInterface(); + cudaqDistributedCommunicator_t *comm = getMpiCommWrapper(); + assert(mpiInterface && comm); + cudaqDistributedCommunicator_t *dupComm = nullptr; + const auto dupStatus = mpiInterface->CommDup(comm, &dupComm); + if (dupStatus != 0 || dupComm == nullptr) + throw std::runtime_error("Failed to duplicate the MPI communicator when " + "initializing cutensornet MPI"); + + // If CUTENSORNET_COMM_LIB environment variable is not set, + // use this builtin plugin shim (redirect MPI calls to CUDAQ plugin) + if (std::getenv("CUTENSORNET_COMM_LIB") == nullptr) { + cudaq::info("Enabling cuTensorNet MPI without environment variable " + "CUTENSORNET_COMM_LIB. \nUse the builtin cuTensorNet " + "communicator lib from '{}' - cuda quantum MPI plugin {}.", + getThisSharedLibFilePath(), getMpiPluginFilePath()); + setenv("CUTENSORNET_COMM_LIB", getThisSharedLibFilePath(), 0); + } + HANDLE_CUTN_ERROR(cutensornetDistributedResetConfiguration( - cutnHandle, &cutnComm, sizeof(cutnComm))); + cutnHandle, dupComm->commPtr, dupComm->commSize)); } void resetCuTensornetComm(cutensornetHandle_t cutnHandle) { + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + if (!mpiPlugin) + throw std::runtime_error("Invalid MPI distributed plugin encountered when " + "initializing cutensornet MPI"); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + if (!mpiInterface || !comm) + throw std::runtime_error("Invalid MPI distributed plugin encountered when " + "initializing cutensornet MPI"); // Passing a nullptr to force a reset. HANDLE_CUTN_ERROR(cutensornetDistributedResetConfiguration( - cutnHandle, nullptr, sizeof(MPI_Comm))); + cutnHandle, nullptr, comm->commSize)); +} + +// Implementing cutensornet's COMM interface by delegating wrapped MPI calls to +// the underlying CUDA Quantum MPI plugin. This will make this library +// compatible with CUTENSORNET_COMM_LIB API. 
Converts CUDA data type to the +// corresponding CUDAQ shim type enum + +/// Convert cutensornet CUDA datatype enum +static DataType convertCudaToMpiDataType(const cudaDataType_t cudaDataType) { + switch (cudaDataType) { + case CUDA_R_8I: + return INT_8; + case CUDA_R_16I: + return INT_16; + case CUDA_R_32I: + return INT_32; + case CUDA_R_64I: + return INT_64; + case CUDA_R_32F: + return FLOAT_32; + case CUDA_R_64F: + return FLOAT_64; + case CUDA_C_32F: + return FLOAT_COMPLEX; + case CUDA_C_64F: + return DOUBLE_COMPLEX; + default: + throw std::runtime_error( + "Unsupported data type encountered in cutensornet communicator plugin"); + } + __builtin_unreachable(); +} + +/// Convert the type-erased Comm object +static cudaqDistributedCommunicator_t +convertMpiCommunicator(const cutensornetDistributedCommunicator_t *cutnComm) { + cudaqDistributedCommunicator_t comm{cutnComm->commPtr, cutnComm->commSize}; + return comm; +} + +#ifdef __cplusplus +extern "C" { +#endif +/// MPI_Comm_size wrapper +int cutensornetMpiCommSize(const cutensornetDistributedCommunicator_t *comm, + int32_t *numRanks) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->getNumRanks(&cudaqComm, numRanks); } -#else -// Noop if we don't have MPI -void initCuTensornetComm(cutensornetHandle_t cutnHandle) {} -void resetCuTensornetComm(cutensornetHandle_t cutnHandle) {} -#endif \ No newline at end of file + +/// Returns the size of the local subgroup of processes sharing node memory +int cutensornetMpiCommSizeShared( + const cutensornetDistributedCommunicator_t *comm, int32_t *numRanks) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->getCommSizeShared(&cudaqComm, numRanks); +} + +/// MPI_Comm_rank wrapper +int cutensornetMpiCommRank(const cutensornetDistributedCommunicator_t *comm, + int32_t *procRank) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->getProcRank(&cudaqComm, procRank); +} + +/// MPI_Barrier wrapper +int cutensornetMpiBarrier(const cutensornetDistributedCommunicator_t *comm) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->Barrier(&cudaqComm); +} + +/// MPI_Bcast wrapper +int cutensornetMpiBcast(const cutensornetDistributedCommunicator_t *comm, + void *buffer, int32_t count, cudaDataType_t datatype, + int32_t root) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->Bcast( + &cudaqComm, buffer, count, convertCudaToMpiDataType(datatype), root); +} + +/// MPI_Allreduce wrapper +int cutensornetMpiAllreduce(const cutensornetDistributedCommunicator_t *comm, + const void *bufferIn, void *bufferOut, + int32_t count, cudaDataType_t datatype) { + cudaq::ScopedTrace trace(__FUNCTION__); + // cutensornet expects MPI_SUM in this API + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->Allreduce( + &cudaqComm, bufferIn, bufferOut, count, + convertCudaToMpiDataType(datatype), SUM); +} + +/// MPI_Allreduce IN_PLACE wrapper +int cutensornetMpiAllreduceInPlace( + const cutensornetDistributedCommunicator_t *comm, void *buffer, + int32_t count, cudaDataType_t datatype) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + // cutensornet expects MPI_SUM in this API + return 
getMpiPluginInterface()->AllreduceInPlace( + &cudaqComm, buffer, count, convertCudaToMpiDataType(datatype), SUM); +} + +/// MPI_Allreduce IN_PLACE MIN wrapper +int cutensornetMpiAllreduceInPlaceMin( + const cutensornetDistributedCommunicator_t *comm, void *buffer, + int32_t count, cudaDataType_t datatype) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + // cutensornet expects MPI_SUM in this API + return getMpiPluginInterface()->AllreduceInPlace( + &cudaqComm, buffer, count, convertCudaToMpiDataType(datatype), MIN); +} + +/// MPI_Allreduce DOUBLE_INT MINLOC wrapper +int cutensornetMpiAllreduceDoubleIntMinloc( + const cutensornetDistributedCommunicator_t *comm, + const void *bufferIn, // *struct {double; int;} + void *bufferOut) // *struct {double; int;} +{ + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->Allreduce(&cudaqComm, bufferIn, bufferOut, 1, + FLOAT_64, MIN_LOC); +} + +/// MPI_Allgather wrapper +int cutensornetMpiAllgather(const cutensornetDistributedCommunicator_t *comm, + const void *bufferIn, void *bufferOut, + int32_t count, cudaDataType_t datatype) { + cudaq::ScopedTrace trace(__FUNCTION__); + auto cudaqComm = convertMpiCommunicator(comm); + return getMpiPluginInterface()->Allgather(&cudaqComm, bufferIn, bufferOut, + count, + convertCudaToMpiDataType(datatype)); +} + +/// Distributed communication service API wrapper binding table (imported by +/// cuTensorNet). The exposed C symbol must be named as +/// "cutensornetCommInterface". +cutensornetDistributedInterface_t cutensornetCommInterface = { + CUTENSORNET_DISTRIBUTED_INTERFACE_VERSION, + cutensornetMpiCommSize, + cutensornetMpiCommSizeShared, + cutensornetMpiCommRank, + cutensornetMpiBarrier, + cutensornetMpiBcast, + cutensornetMpiAllreduce, + cutensornetMpiAllreduceInPlace, + cutensornetMpiAllreduceInPlaceMin, + cutensornetMpiAllreduceDoubleIntMinloc, + cutensornetMpiAllgather}; + +#ifdef __cplusplus +} // extern "C" +#endif diff --git a/unittests/CMakeLists.txt b/unittests/CMakeLists.txt index 94c62456af..f0f357121c 100644 --- a/unittests/CMakeLists.txt +++ b/unittests/CMakeLists.txt @@ -178,6 +178,32 @@ target_link_libraries(test_photonics gtest_main) gtest_discover_tests(test_photonics) +# Create an executable for MPI UnitTests +# (only if MPI was found, i.e., the builtin plugin is available) +if (MPI_CXX_FOUND) + set(CUDAQ_MPI_TEST_SOURCES + mpi/mpi_tester.cpp + ) + add_executable(test_mpi_plugin ${CUDAQ_MPI_TEST_SOURCES}) + set(NUM_PROCS 4) + target_compile_definitions(test_mpi_plugin PRIVATE -DNUM_PROCS=${NUM_PROCS}) + target_link_libraries(test_mpi_plugin + PRIVATE + cudaq + cudaq-platform-default + nvqir-qpp + gtest + ) + target_link_options(test_mpi_plugin PRIVATE -Wl,--no-as-needed) + # Check if `--allow-run-as-root` is supported (OpenMPI) + # Note: MPICH doesn't need `--allow-run-as-root`. 
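  # The probe runs `${MPIEXEC} --allow-run-as-root -np ${NUM_PROCS} hostname`;
  # an empty ERROR_VARIABLE means the launcher accepted the flag, so it gets
  # appended to MPI_EXEC_CMD_ARGS for the MPIApiTest invocation below.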
+ execute_process(COMMAND ${MPIEXEC} --allow-run-as-root -np ${NUM_PROCS} hostname ERROR_VARIABLE CHECK_ALLOW_RUN_AS_ROOT_RESULTS ERROR_STRIP_TRAILING_WHITESPACE OUTPUT_QUIET) + if ("${CHECK_ALLOW_RUN_AS_ROOT_RESULTS}" STREQUAL "") + set(MPI_EXEC_CMD_ARGS "--allow-run-as-root") + endif() + + add_test(NAME MPIApiTest COMMAND ${MPIEXEC} ${MPI_EXEC_CMD_ARGS} -np ${NUM_PROCS} ${CMAKE_BINARY_DIR}/unittests/test_mpi_plugin) +endif() add_subdirectory(backends) add_subdirectory(Optimizer) diff --git a/unittests/mpi/mpi_tester.cpp b/unittests/mpi/mpi_tester.cpp new file mode 100644 index 0000000000..713e8abb5d --- /dev/null +++ b/unittests/mpi/mpi_tester.cpp @@ -0,0 +1,253 @@ +/******************************************************************************* + * Copyright (c) 2022 - 2023 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ +#include "cudaq/distributed/mpi_plugin.h" +#include +#include +#include + +TEST(MPITester, checkInit) { + EXPECT_TRUE(cudaq::mpi::is_initialized()); + std::cout << "Rank = " << cudaq::mpi::rank() << "\n"; +} + +TEST(MPITester, checkBroadcast) { + constexpr std::size_t numElements = 100; + const std::vector expectedData = + cudaq::random_vector(-M_PI, M_PI, numElements, /*seed = */ 1); + // Only rank 0 has the data + auto bcastVec = cudaq::mpi::rank() == 0 + ? expectedData + : std::vector(numElements, 0.0); + // Broadcast + cudaq::mpi::broadcast(bcastVec, 0); + + // All ranks have the same data + for (std::size_t i = 0; i < bcastVec.size(); ++i) { + EXPECT_EQ(bcastVec[i], expectedData[i]) + << "Broadcast data is corrupted at index " << i; + } +} + +TEST(MPITester, checkAllReduce) { + { + // Double type + const std::vector rankData = cudaq::random_vector( + -M_PI, M_PI, cudaq::mpi::num_ranks(), /*seed = */ 1); + const double localVal = rankData[cudaq::mpi::rank()]; + const double expectedSum = std::reduce(rankData.begin(), rankData.end()); + const double expectedProd = std::reduce(rankData.begin(), rankData.end(), + 1.0, std::multiplies()); + const double mpiSumReduce = + cudaq::mpi::all_reduce(localVal, std::plus()); + const double mpiProdReduce = + cudaq::mpi::all_reduce(localVal, std::multiplies()); + EXPECT_NEAR(expectedSum, mpiSumReduce, 1e-12) + << "All reduce SUM result does not match."; + EXPECT_NEAR(expectedProd, mpiProdReduce, 1e-12) + << "All reduce PROD result does not match."; + } + + { + // Float type + const std::vector rankData = cudaq::random_vector( + -M_PI, M_PI, cudaq::mpi::num_ranks(), /*seed = */ 2); + const double expectedSum = std::reduce(rankData.begin(), rankData.end()); + const double expectedProd = std::reduce(rankData.begin(), rankData.end(), + 1.0, std::multiplies()); + const float localVal = rankData[cudaq::mpi::rank()]; + const float mpiSumReduce = + cudaq::mpi::all_reduce(localVal, std::plus()); + const float mpiProdReduce = + cudaq::mpi::all_reduce(localVal, std::multiplies()); + EXPECT_NEAR(expectedSum, mpiSumReduce, 1e-6) + << "All reduce SUM result does not match."; + EXPECT_NEAR(expectedProd, mpiProdReduce, 1e-6) + << "All reduce PROD result does not match."; + } +} + +TEST(MPITester, checkAllGather) { + { + // double type + constexpr std::size_t numElements = 10; + const std::vector expectedGatherData = cudaq::random_vector( + -M_PI, M_PI, numElements * cudaq::mpi::num_ranks(), 
/*seed = */ 1); + // Slice this vector to each rank + const std::vector rankData( + expectedGatherData.begin() + numElements * cudaq::mpi::rank(), + expectedGatherData.begin() + numElements * cudaq::mpi::rank() + + numElements); + EXPECT_EQ(rankData.size(), numElements); + // Reconstruct the data vector with all_gather + std::vector gatherData(cudaq::mpi::num_ranks() * numElements); + cudaq::mpi::all_gather(gatherData, rankData); + for (std::size_t i = 0; i < gatherData.size(); ++i) { + EXPECT_EQ(gatherData[i], expectedGatherData[i]) + << "AllGather data is corrupted at index " << i; + } + } + { + // int type + const std::vector rankData{cudaq::mpi::rank()}; + std::vector expectedGatherData(cudaq::mpi::num_ranks()); + std::iota(expectedGatherData.begin(), expectedGatherData.end(), + 0); // Fill with 0, 1, ... + std::vector gatherData(cudaq::mpi::num_ranks()); + cudaq::mpi::all_gather(gatherData, rankData); + for (std::size_t i = 0; i < gatherData.size(); ++i) { + EXPECT_EQ(gatherData[i], expectedGatherData[i]) + << "AllGather data is corrupted at index " << i; + } + } +} + +TEST(MPITester, checkAllGatherV) { + const auto rank = cudaq::mpi::rank(); + const auto numRanks = cudaq::mpi::num_ranks(); + const int mySize = rank + 1; + const int refSize = numRanks * (numRanks + 1) / 2; + std::vector sizes(numRanks); + std::vector offsets(numRanks); + for (int iProc = 0; iProc < numRanks; iProc++) { + sizes[iProc] = iProc + 1; + offsets[iProc] = iProc * (iProc + 1) / 2; + } + const auto getSerialVector = [](int size, int offset) { + std::vector vector(size); + for (int i = 0; i < size; ++i) { + vector[i] = static_cast(i + offset); + } + return vector; + }; + const auto refVector = getSerialVector(refSize, 0); + const int offset = offsets[rank]; + const auto myVector = getSerialVector(mySize, offset); + std::vector vector(refSize); + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + EXPECT_TRUE(mpiPlugin != nullptr); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + EXPECT_TRUE(mpiInterface != nullptr); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + EXPECT_TRUE(comm != nullptr); + int initialized = 0; + EXPECT_EQ(mpiInterface->initialized(&initialized), 0); + EXPECT_EQ(initialized, 1); + EXPECT_EQ(mpiInterface->AllgatherV(comm, myVector.data(), mySize, + vector.data(), sizes.data(), + offsets.data(), FLOAT_64), + 0); + + for (std::size_t i = 0; i < vector.size(); ++i) { + EXPECT_EQ(vector[i], refVector[i]) + << "AllGatherV data is corrupted at index " << i; + } +} + +TEST(MPITester, checkSendAndRecv) { + constexpr int nElems = 1; + const auto rank = cudaq::mpi::rank(); + std::vector sendBuffer(nElems); + std::vector recvBuffer(nElems); + std::vector refBuffer(nElems); + const int sendRank = rank ^ 1; + const int recvRank = sendRank; + sendBuffer[0] = rank; + refBuffer[0] = sendRank; + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + EXPECT_TRUE(mpiPlugin != nullptr); + std::cout << "MPI plugin file: " << mpiPlugin->getPluginPath() << "\n"; + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + EXPECT_TRUE(mpiInterface != nullptr); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + EXPECT_TRUE(comm != nullptr); + EXPECT_EQ(mpiInterface->RecvAsync(comm, recvBuffer.data(), nElems, FLOAT_64, + recvRank, 0), + 0); + EXPECT_EQ(mpiInterface->SendAsync(comm, sendBuffer.data(), nElems, FLOAT_64, + sendRank, 0), + 0); + EXPECT_EQ(mpiInterface->Synchronize(comm), 0); + for (std::size_t i = 0; i < refBuffer.size(); ++i) { + EXPECT_EQ(refBuffer[i], 
recvBuffer[i]) + << "Send-Receive data is corrupted at index " << i; + } +} + +TEST(MPITester, checkSendRecv) { + constexpr int nElems = 1; + const auto rank = cudaq::mpi::rank(); + std::vector sendBuffer(nElems); + std::vector recvBuffer(nElems); + std::vector refBuffer(nElems); + const int sendRecvRank = rank ^ 1; + sendBuffer[0] = rank; + refBuffer[0] = sendRecvRank; + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + EXPECT_TRUE(mpiPlugin != nullptr); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + EXPECT_TRUE(mpiInterface != nullptr); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + EXPECT_TRUE(comm != nullptr); + EXPECT_EQ(mpiInterface->SendRecvAsync(comm, sendBuffer.data(), + recvBuffer.data(), nElems, FLOAT_64, + sendRecvRank, 0), + 0); + EXPECT_EQ(mpiInterface->Synchronize(comm), 0); + for (std::size_t i = 0; i < refBuffer.size(); ++i) { + EXPECT_EQ(refBuffer[i], recvBuffer[i]) + << "Send-Receive data is corrupted at index " << i; + } +} + +TEST(MPITester, checkCommDup) { + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + EXPECT_TRUE(mpiPlugin != nullptr); + const int origSize = cudaq::mpi::num_ranks(); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + EXPECT_TRUE(mpiInterface != nullptr); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + EXPECT_TRUE(comm != nullptr); + int initialized = 0; + EXPECT_EQ(mpiInterface->initialized(&initialized), 0); + EXPECT_EQ(initialized, 1); + cudaqDistributedCommunicator_t *dupComm = nullptr; + EXPECT_EQ(mpiInterface->CommDup(comm, &dupComm), 0); + EXPECT_TRUE(dupComm != nullptr); + int size = 0; + EXPECT_EQ(mpiInterface->getNumRanks(dupComm, &size), 0); + EXPECT_GT(size, 0); + EXPECT_EQ(size, origSize); +} + +TEST(MPITester, checkCommSplit) { + auto *mpiPlugin = cudaq::mpi::getMpiPlugin(); + EXPECT_TRUE(mpiPlugin != nullptr); + cudaqDistributedInterface_t *mpiInterface = mpiPlugin->get(); + EXPECT_TRUE(mpiInterface != nullptr); + cudaqDistributedCommunicator_t *comm = mpiPlugin->getComm(); + EXPECT_TRUE(comm != nullptr); + int initialized = 0; + EXPECT_EQ(mpiInterface->initialized(&initialized), 0); + EXPECT_EQ(initialized, 1); + cudaqDistributedCommunicator_t *dupComm = nullptr; + EXPECT_EQ(mpiInterface->CommSplit(comm, /*color=*/cudaq::mpi::rank(), + /*key=*/0, &dupComm), + 0); + EXPECT_TRUE(dupComm != nullptr); + int size = 0; + EXPECT_EQ(mpiInterface->getNumRanks(dupComm, &size), 0); + EXPECT_EQ(size, 1); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + cudaq::mpi::initialize(); + const auto testResult = RUN_ALL_TESTS(); + cudaq::mpi::finalize(); + return testResult; +}
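For context, the plugin interface exercised by these unit tests backs the user-facing cudaq::mpi calls used above (initialize, rank, num_ranks, all_reduce, broadcast, finalize). A minimal application-level sketch of that API follows; the cudaq.h include and the printed output are illustrative assumptions, not part of this change.

#include <cudaq.h> // assumed to expose the cudaq::mpi namespace used in the tests
#include <cstdio>
#include <functional>
#include <vector>

int main() {
  cudaq::mpi::initialize();

  // Every rank contributes its rank id; all ranks receive the same sum,
  // i.e., N*(N-1)/2 for N ranks.
  const double local = static_cast<double>(cudaq::mpi::rank());
  const double total = cudaq::mpi::all_reduce(local, std::plus<double>());

  // Rank 0 owns the payload; after the broadcast every rank holds a copy.
  std::vector<double> payload(4, cudaq::mpi::rank() == 0 ? total : 0.0);
  cudaq::mpi::broadcast(payload, 0);

  if (cudaq::mpi::rank() == 0)
    std::printf("ranks=%d reduced=%g\n", cudaq::mpi::num_ranks(), total);

  cudaq::mpi::finalize();
  return 0;
}

Launched through the same mpiexec command registered for MPIApiTest above (four ranks), every rank observes reduced = 6.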