From 22668fa1d9ea5918f463c52bcdcb5ef181e5d1d0 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Fri, 11 Oct 2024 04:52:16 -0400 Subject: [PATCH] Small improvements (#493) This PR makes a couple of small, general improvements. - Improve the use of `std::tolower` to avoid UB. References [cppreference](https://en.cppreference.com/w/cpp/string/byte/tolower). - Improve the readability of POSIX read/write interface. - Improve the use of `std::map` to avoid a repeated search in the self-balancing BST, which takes `O(n)`. Authors: - Tianyu Liu (https://github.com/kingcrimsontianyu) - Mads R. B. Kristensen (https://github.com/madsbk) Approvers: - Mads R. B. Kristensen (https://github.com/madsbk) - MithunR (https://github.com/mythrocks) URL: https://github.com/rapidsai/kvikio/pull/493 --- cpp/include/kvikio/defaults.hpp | 9 +++- cpp/include/kvikio/file_handle.hpp | 16 +++--- cpp/include/kvikio/posix_io.hpp | 83 +++++++++++++++++++----------- 3 files changed, 71 insertions(+), 37 deletions(-) diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 9cb9331bf7..32367686d2 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -57,7 +57,14 @@ inline bool getenv_or(std::string_view env_var_name, bool default_val) } // Convert to lowercase std::string str{env_val}; - std::transform(str.begin(), str.end(), str.begin(), ::tolower); + // Special considerations regarding the case conversion: + // - std::tolower() is not an addressable function. Passing it to std::transform() as + // a function pointer, if the compile turns out successful, causes the program behavior + // "unspecified (possibly ill-formed)", hence the lambda. ::tolower() is addressable + // and does not have this problem, but the following item still applies. + // - To avoid UB in std::tolower() or ::tolower(), the character must be cast to unsigned char. + std::transform( + str.begin(), str.end(), str.begin(), [](unsigned char c) { return std::tolower(c); }); // Trim whitespaces std::stringstream trimmer; trimmer << str; diff --git a/cpp/include/kvikio/file_handle.hpp b/cpp/include/kvikio/file_handle.hpp index f84e792489..19445f1333 100644 --- a/cpp/include/kvikio/file_handle.hpp +++ b/cpp/include/kvikio/file_handle.hpp @@ -330,7 +330,8 @@ class FileHandle { bool sync_default_stream = true) { if (_compat_mode) { - return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); + return detail::posix_device_read( + _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); } if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); } @@ -381,7 +382,8 @@ class FileHandle { _nbytes = 0; // Invalidate the computed file size if (_compat_mode) { - return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); + return detail::posix_device_write( + _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset); } if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); } @@ -438,7 +440,8 @@ class FileHandle { std::size_t file_offset, std::size_t hostPtr_offset) -> std::size_t { char* buf = static_cast(hostPtr_base) + hostPtr_offset; - return posix_host_read(_fd_direct_off, buf, size, file_offset, false); + return detail::posix_host_read( + _fd_direct_off, buf, size, file_offset); }; return parallel_io(op, buf, size, file_offset, task_size, 0); @@ -450,7 +453,7 @@ class FileHandle { if (size < gds_threshold) { auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { PushAndPopContext c(ctx); - return posix_device_read(_fd_direct_off, buf, size, file_offset, 0); + return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0); }; return std::async(std::launch::deferred, task); } @@ -513,7 +516,8 @@ class FileHandle { std::size_t file_offset, std::size_t hostPtr_offset) -> std::size_t { const char* buf = static_cast(hostPtr_base) + hostPtr_offset; - return posix_host_write(_fd_direct_off, buf, size, file_offset, false); + return detail::posix_host_write( + _fd_direct_off, buf, size, file_offset); }; return parallel_io(op, buf, size, file_offset, task_size, 0); @@ -525,7 +529,7 @@ class FileHandle { if (size < gds_threshold) { auto task = [this, ctx, buf, size, file_offset]() -> std::size_t { PushAndPopContext c(ctx); - return posix_device_write(_fd_direct_off, buf, size, file_offset, 0); + return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0); }; return std::async(std::launch::deferred, task); } diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp index e044ab0bca..0437ef69f8 100644 --- a/cpp/include/kvikio/posix_io.hpp +++ b/cpp/include/kvikio/posix_io.hpp @@ -26,9 +26,23 @@ #include #include -namespace kvikio { +namespace kvikio::detail { -namespace detail { +/** + * @brief Type of the IO operation. + */ +enum class IOOperationType : uint8_t { + READ, ///< POSIX read. + WRITE, ///< POSIX write. +}; + +/** + * @brief Specifies whether all requested bytes are to be processed or not. + */ +enum class PartialIO : uint8_t { + YES, ///< POSIX read/write is called only once, which may not process all bytes requested. + NO, ///< POSIX read/write is called repeatedly until all requested bytes are processed. +}; /** * @brief Singleton class to retrieve a CUDA stream for device-host copying @@ -60,12 +74,14 @@ class StreamsByThread { auto key = std::make_pair(ctx, thd_id); // Create a new stream if `ctx` doesn't have one. - if (_instance._streams.find(key) == _instance._streams.end()) { + if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { CUstream stream{}; CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); _instance._streams[key] = stream; + return stream; + } else { + return search->second; } - return _instance._streams.at(key); } static CUstream get() @@ -84,29 +100,30 @@ class StreamsByThread { /** * @brief Read or write host memory to or from disk using POSIX * - * @tparam IsReadOperation Whether the operation is a read or a write + * @tparam Operation Whether the operation is a read or a write. + * @tparam PartialIOStatus Whether all requested data are processed or not. If `FULL`, all of + * `count` bytes are read or written. * @param fd File descriptor * @param buf Buffer to write * @param count Number of bytes to write * @param offset File offset - * @param partial If false, all of `count` bytes are read or written. * @return The number of bytes read or written (always gather than zero) */ -template -ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool partial) +template +ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset) { off_t cur_offset = offset; size_t byte_remaining = count; char* buffer = const_cast(static_cast(buf)); while (byte_remaining > 0) { ssize_t nbytes = 0; - if constexpr (IsReadOperation) { + if constexpr (Operation == IOOperationType::READ) { nbytes = ::pread(fd, buffer, byte_remaining, cur_offset); } else { nbytes = ::pwrite(fd, buffer, byte_remaining, cur_offset); } if (nbytes == -1) { - const std::string name = IsReadOperation ? "pread" : "pwrite"; + const std::string name = Operation == IOOperationType::READ ? "pread" : "pwrite"; if (errno == EBADF) { throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" + KVIKIO_STRINGIFY(__LINE__) + ": Operation not permitted"}; @@ -114,13 +131,13 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" + KVIKIO_STRINGIFY(__LINE__) + ": " + strerror(errno)}; } - if constexpr (IsReadOperation) { + if constexpr (Operation == IOOperationType::READ) { if (nbytes == 0) { throw CUfileException{std::string{"POSIX error on pread at: "} + __FILE__ + ":" + KVIKIO_STRINGIFY(__LINE__) + ": EOF"}; } } - if (partial) { return nbytes; } + if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes; } buffer += nbytes; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic) cur_offset += nbytes; byte_remaining -= nbytes; @@ -131,7 +148,7 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool /** * @brief Read or write device memory to or from disk using POSIX * - * @tparam IsReadOperation Whether the operation is a read or a write + * @tparam Operation Whether the operation is a read or a write. * @param fd File descriptor * @param devPtr_base Device pointer to read or write to. * @param size Number of bytes to read or write. @@ -139,7 +156,7 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool * @param devPtr_offset Byte offset to the start of the device pointer. * @return Number of bytes read or written. */ -template +template std::size_t posix_device_io(int fd, const void* devPtr_base, std::size_t size, @@ -158,15 +175,17 @@ std::size_t posix_device_io(int fd, while (byte_remaining > 0) { const off_t nbytes_requested = std::min(chunk_size2, byte_remaining); ssize_t nbytes_got = nbytes_requested; - if constexpr (IsReadOperation) { - nbytes_got = posix_host_io(fd, alloc.get(), nbytes_requested, cur_file_offset, true); + if constexpr (Operation == IOOperationType::READ) { + nbytes_got = posix_host_io( + fd, alloc.get(), nbytes_requested, cur_file_offset); CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream)); CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); } else { // Is a write operation CUDA_DRIVER_TRY( cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream)); CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream)); - posix_host_io(fd, alloc.get(), nbytes_requested, cur_file_offset, false); + posix_host_io( + fd, alloc.get(), nbytes_requested, cur_file_offset); } cur_file_offset += nbytes_got; devPtr += nbytes_got; @@ -175,26 +194,26 @@ std::size_t posix_device_io(int fd, return size; } -} // namespace detail - /** * @brief Read from disk to host memory using POSIX * * If `size` or `file_offset` isn't aligned with `page_size` then * `fd` cannot have been opened with the `O_DIRECT` flag. * + * @tparam PartialIOStatus Whether all requested data are processed or not. If `FULL`, all of + * `count` bytes are read. * @param fd File descriptor * @param buf Base address of buffer in host memory. * @param size Size in bytes to read. * @param file_offset Offset in the file to read from. - * @param partial If false, all of `size` bytes are read. * @return Size of bytes that were successfully read. */ -inline std::size_t posix_host_read( - int fd, void* buf, std::size_t size, std::size_t file_offset, bool partial) +template +std::size_t posix_host_read(int fd, void* buf, std::size_t size, std::size_t file_offset) { KVIKIO_NVTX_FUNC_RANGE("posix_host_read()", size); - return detail::posix_host_io(fd, buf, size, convert_size2off(file_offset), partial); + return detail::posix_host_io( + fd, buf, size, convert_size2off(file_offset)); } /** @@ -203,18 +222,20 @@ inline std::size_t posix_host_read( * If `size` or `file_offset` isn't aligned with `page_size` then * `fd` cannot have been opened with the `O_DIRECT` flag. * + * @tparam ioDataCompletionLevel Whether all requested data are processed or not. If `FULL`, all of + * `count` bytes are written. * @param fd File descriptor * @param buf Base address of buffer in host memory. * @param size Size in bytes to write. * @param file_offset Offset in the file to write to. - * @param partial If false, all of `size` bytes are written. * @return Size of bytes that were successfully read. */ -inline std::size_t posix_host_write( - int fd, const void* buf, std::size_t size, std::size_t file_offset, bool partial) +template +std::size_t posix_host_write(int fd, const void* buf, std::size_t size, std::size_t file_offset) { KVIKIO_NVTX_FUNC_RANGE("posix_host_write()", size); - return detail::posix_host_io(fd, buf, size, convert_size2off(file_offset), partial); + return detail::posix_host_io( + fd, buf, size, convert_size2off(file_offset)); } /** @@ -237,7 +258,8 @@ inline std::size_t posix_device_read(int fd, std::size_t devPtr_offset) { KVIKIO_NVTX_FUNC_RANGE("posix_device_read()", size); - return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset); + return detail::posix_device_io( + fd, devPtr_base, size, file_offset, devPtr_offset); } /** @@ -260,7 +282,8 @@ inline std::size_t posix_device_write(int fd, std::size_t devPtr_offset) { KVIKIO_NVTX_FUNC_RANGE("posix_device_write()", size); - return detail::posix_device_io(fd, devPtr_base, size, file_offset, devPtr_offset); + return detail::posix_device_io( + fd, devPtr_base, size, file_offset, devPtr_offset); } -} // namespace kvikio +} // namespace kvikio::detail