Skip to content

Commit

Permalink
Small improvements (rapidsai#493)
Browse files Browse the repository at this point in the history
This PR makes a couple of small, general improvements.
- Improve the use of `std::tolower` to avoid UB. References [cppreference](https://en.cppreference.com/w/cpp/string/byte/tolower).
- Improve the readability of POSIX read/write interface.
- Improve the use of `std::map` to avoid a repeated search in the self-balancing BST, which takes `O(n)`.

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - MithunR (https://github.com/mythrocks)

URL: rapidsai#493
  • Loading branch information
kingcrimsontianyu authored Oct 11, 2024
1 parent 1ef4094 commit 22668fa
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 37 deletions.
9 changes: 8 additions & 1 deletion cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,14 @@ inline bool getenv_or(std::string_view env_var_name, bool default_val)
}
// Convert to lowercase
std::string str{env_val};
std::transform(str.begin(), str.end(), str.begin(), ::tolower);
// Special considerations regarding the case conversion:
// - std::tolower() is not an addressable function. Passing it to std::transform() as
// a function pointer, if the compile turns out successful, causes the program behavior
// "unspecified (possibly ill-formed)", hence the lambda. ::tolower() is addressable
// and does not have this problem, but the following item still applies.
// - To avoid UB in std::tolower() or ::tolower(), the character must be cast to unsigned char.
std::transform(
str.begin(), str.end(), str.begin(), [](unsigned char c) { return std::tolower(c); });
// Trim whitespaces
std::stringstream trimmer;
trimmer << str;
Expand Down
16 changes: 10 additions & 6 deletions cpp/include/kvikio/file_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ class FileHandle {
bool sync_default_stream = true)
{
if (_compat_mode) {
return posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
return detail::posix_device_read(
_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
}
if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }

Expand Down Expand Up @@ -381,7 +382,8 @@ class FileHandle {
_nbytes = 0; // Invalidate the computed file size

if (_compat_mode) {
return posix_device_write(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
return detail::posix_device_write(
_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
}
if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }

Expand Down Expand Up @@ -438,7 +440,8 @@ class FileHandle {
std::size_t file_offset,
std::size_t hostPtr_offset) -> std::size_t {
char* buf = static_cast<char*>(hostPtr_base) + hostPtr_offset;
return posix_host_read(_fd_direct_off, buf, size, file_offset, false);
return detail::posix_host_read<detail::PartialIO::NO>(
_fd_direct_off, buf, size, file_offset);
};

return parallel_io(op, buf, size, file_offset, task_size, 0);
Expand All @@ -450,7 +453,7 @@ class FileHandle {
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}
Expand Down Expand Up @@ -513,7 +516,8 @@ class FileHandle {
std::size_t file_offset,
std::size_t hostPtr_offset) -> std::size_t {
const char* buf = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
return posix_host_write(_fd_direct_off, buf, size, file_offset, false);
return detail::posix_host_write<detail::PartialIO::NO>(
_fd_direct_off, buf, size, file_offset);
};

return parallel_io(op, buf, size, file_offset, task_size, 0);
Expand All @@ -525,7 +529,7 @@ class FileHandle {
if (size < gds_threshold) {
auto task = [this, ctx, buf, size, file_offset]() -> std::size_t {
PushAndPopContext c(ctx);
return posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
};
return std::async(std::launch::deferred, task);
}
Expand Down
83 changes: 53 additions & 30 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,23 @@
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {
namespace kvikio::detail {

namespace detail {
/**
* @brief Type of the IO operation.
*/
enum class IOOperationType : uint8_t {
READ, ///< POSIX read.
WRITE, ///< POSIX write.
};

/**
* @brief Specifies whether all requested bytes are to be processed or not.
*/
enum class PartialIO : uint8_t {
YES, ///< POSIX read/write is called only once, which may not process all bytes requested.
NO, ///< POSIX read/write is called repeatedly until all requested bytes are processed.
};

/**
* @brief Singleton class to retrieve a CUDA stream for device-host copying
Expand Down Expand Up @@ -60,12 +74,14 @@ class StreamsByThread {
auto key = std::make_pair(ctx, thd_id);

// Create a new stream if `ctx` doesn't have one.
if (_instance._streams.find(key) == _instance._streams.end()) {
if (auto search = _instance._streams.find(key); search == _instance._streams.end()) {
CUstream stream{};
CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT));
_instance._streams[key] = stream;
return stream;
} else {
return search->second;
}
return _instance._streams.at(key);
}

static CUstream get()
Expand All @@ -84,43 +100,44 @@ class StreamsByThread {
/**
* @brief Read or write host memory to or from disk using POSIX
*
* @tparam IsReadOperation Whether the operation is a read or a write
* @tparam Operation Whether the operation is a read or a write.
* @tparam PartialIOStatus Whether all requested data are processed or not. If `FULL`, all of
* `count` bytes are read or written.
* @param fd File descriptor
* @param buf Buffer to write
* @param count Number of bytes to write
* @param offset File offset
* @param partial If false, all of `count` bytes are read or written.
* @return The number of bytes read or written (always gather than zero)
*/
template <bool IsReadOperation>
ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool partial)
template <IOOperationType Operation, PartialIO PartialIOStatus>
ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset)
{
off_t cur_offset = offset;
size_t byte_remaining = count;
char* buffer = const_cast<char*>(static_cast<const char*>(buf));
while (byte_remaining > 0) {
ssize_t nbytes = 0;
if constexpr (IsReadOperation) {
if constexpr (Operation == IOOperationType::READ) {
nbytes = ::pread(fd, buffer, byte_remaining, cur_offset);
} else {
nbytes = ::pwrite(fd, buffer, byte_remaining, cur_offset);
}
if (nbytes == -1) {
const std::string name = IsReadOperation ? "pread" : "pwrite";
const std::string name = Operation == IOOperationType::READ ? "pread" : "pwrite";
if (errno == EBADF) {
throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" +
KVIKIO_STRINGIFY(__LINE__) + ": Operation not permitted"};
}
throw CUfileException{std::string{"POSIX error on " + name + " at: "} + __FILE__ + ":" +
KVIKIO_STRINGIFY(__LINE__) + ": " + strerror(errno)};
}
if constexpr (IsReadOperation) {
if constexpr (Operation == IOOperationType::READ) {
if (nbytes == 0) {
throw CUfileException{std::string{"POSIX error on pread at: "} + __FILE__ + ":" +
KVIKIO_STRINGIFY(__LINE__) + ": EOF"};
}
}
if (partial) { return nbytes; }
if constexpr (PartialIOStatus == PartialIO::YES) { return nbytes; }
buffer += nbytes; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
cur_offset += nbytes;
byte_remaining -= nbytes;
Expand All @@ -131,15 +148,15 @@ ssize_t posix_host_io(int fd, const void* buf, size_t count, off_t offset, bool
/**
* @brief Read or write device memory to or from disk using POSIX
*
* @tparam IsReadOperation Whether the operation is a read or a write
* @tparam Operation Whether the operation is a read or a write.
* @param fd File descriptor
* @param devPtr_base Device pointer to read or write to.
* @param size Number of bytes to read or write.
* @param file_offset Byte offset to the start of the file.
* @param devPtr_offset Byte offset to the start of the device pointer.
* @return Number of bytes read or written.
*/
template <bool IsReadOperation>
template <IOOperationType Operation>
std::size_t posix_device_io(int fd,
const void* devPtr_base,
std::size_t size,
Expand All @@ -158,15 +175,17 @@ std::size_t posix_device_io(int fd,
while (byte_remaining > 0) {
const off_t nbytes_requested = std::min(chunk_size2, byte_remaining);
ssize_t nbytes_got = nbytes_requested;
if constexpr (IsReadOperation) {
nbytes_got = posix_host_io<true>(fd, alloc.get(), nbytes_requested, cur_file_offset, true);
if constexpr (Operation == IOOperationType::READ) {
nbytes_got = posix_host_io<IOOperationType::READ, PartialIO::YES>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(devPtr, alloc.get(), nbytes_got, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
} else { // Is a write operation
CUDA_DRIVER_TRY(
cudaAPI::instance().MemcpyDtoHAsync(alloc.get(), devPtr, nbytes_requested, stream));
CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
posix_host_io<false>(fd, alloc.get(), nbytes_requested, cur_file_offset, false);
posix_host_io<IOOperationType::WRITE, PartialIO::NO>(
fd, alloc.get(), nbytes_requested, cur_file_offset);
}
cur_file_offset += nbytes_got;
devPtr += nbytes_got;
Expand All @@ -175,26 +194,26 @@ std::size_t posix_device_io(int fd,
return size;
}

} // namespace detail

/**
* @brief Read from disk to host memory using POSIX
*
* If `size` or `file_offset` isn't aligned with `page_size` then
* `fd` cannot have been opened with the `O_DIRECT` flag.
*
* @tparam PartialIOStatus Whether all requested data are processed or not. If `FULL`, all of
* `count` bytes are read.
* @param fd File descriptor
* @param buf Base address of buffer in host memory.
* @param size Size in bytes to read.
* @param file_offset Offset in the file to read from.
* @param partial If false, all of `size` bytes are read.
* @return Size of bytes that were successfully read.
*/
inline std::size_t posix_host_read(
int fd, void* buf, std::size_t size, std::size_t file_offset, bool partial)
template <PartialIO PartialIOStatus>
std::size_t posix_host_read(int fd, void* buf, std::size_t size, std::size_t file_offset)
{
KVIKIO_NVTX_FUNC_RANGE("posix_host_read()", size);
return detail::posix_host_io<true>(fd, buf, size, convert_size2off(file_offset), partial);
return detail::posix_host_io<IOOperationType::READ, PartialIOStatus>(
fd, buf, size, convert_size2off(file_offset));
}

/**
Expand All @@ -203,18 +222,20 @@ inline std::size_t posix_host_read(
* If `size` or `file_offset` isn't aligned with `page_size` then
* `fd` cannot have been opened with the `O_DIRECT` flag.
*
* @tparam ioDataCompletionLevel Whether all requested data are processed or not. If `FULL`, all of
* `count` bytes are written.
* @param fd File descriptor
* @param buf Base address of buffer in host memory.
* @param size Size in bytes to write.
* @param file_offset Offset in the file to write to.
* @param partial If false, all of `size` bytes are written.
* @return Size of bytes that were successfully read.
*/
inline std::size_t posix_host_write(
int fd, const void* buf, std::size_t size, std::size_t file_offset, bool partial)
template <PartialIO PartialIOStatus>
std::size_t posix_host_write(int fd, const void* buf, std::size_t size, std::size_t file_offset)
{
KVIKIO_NVTX_FUNC_RANGE("posix_host_write()", size);
return detail::posix_host_io<false>(fd, buf, size, convert_size2off(file_offset), partial);
return detail::posix_host_io<IOOperationType::WRITE, PartialIOStatus>(
fd, buf, size, convert_size2off(file_offset));
}

/**
Expand All @@ -237,7 +258,8 @@ inline std::size_t posix_device_read(int fd,
std::size_t devPtr_offset)
{
KVIKIO_NVTX_FUNC_RANGE("posix_device_read()", size);
return detail::posix_device_io<true>(fd, devPtr_base, size, file_offset, devPtr_offset);
return detail::posix_device_io<IOOperationType::READ>(
fd, devPtr_base, size, file_offset, devPtr_offset);
}

/**
Expand All @@ -260,7 +282,8 @@ inline std::size_t posix_device_write(int fd,
std::size_t devPtr_offset)
{
KVIKIO_NVTX_FUNC_RANGE("posix_device_write()", size);
return detail::posix_device_io<false>(fd, devPtr_base, size, file_offset, devPtr_offset);
return detail::posix_device_io<IOOperationType::WRITE>(
fd, devPtr_base, size, file_offset, devPtr_offset);
}

} // namespace kvikio
} // namespace kvikio::detail

0 comments on commit 22668fa

Please sign in to comment.