From 694268af8cfb4cad16d3a8a0987ef19d48b49bd3 Mon Sep 17 00:00:00 2001 From: Patrick Weizhi Xu Date: Thu, 9 Nov 2023 11:48:40 +0800 Subject: [PATCH] Fallback to Set Thread Num by Cgroups when Users Did Not Do So Signed-off-by: Patrick Weizhi Xu --- include/knowhere/comp/cgroup.h | 143 ++++++++++++++++++++++++++++ include/knowhere/comp/thread_pool.h | 15 ++- 2 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 include/knowhere/comp/cgroup.h diff --git a/include/knowhere/comp/cgroup.h b/include/knowhere/comp/cgroup.h new file mode 100644 index 000000000..fce296689 --- /dev/null +++ b/include/knowhere/comp/cgroup.h @@ -0,0 +1,143 @@ +// Copyright (C) 2019-2023 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "knowhere/log.h" + +namespace knowhere { + +namespace fs = std::filesystem; + +/* + * Try to obtain the number of cpu from cgroups when limited by cpu quota and period. + * Only support cgroup v1 Linux system. + * If failed, fallback to std::thread::hardware_concurrency(). + */ +class CgroupCpuReader { + private: + static auto + split(const std::string& s, char delimiter) -> std::vector { + std::vector tokens; + std::string token; + std::istringstream tokenStream(s); + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + return tokens; + } + + // Returns cgroup path of cpu subsystem. 
The fields of /proc/self/cgroup are separated by colons + // id:subsystem:path + // The subsystem may be a comma-separated list of subsystems. + static auto + getCgroupCpuPath() -> fs::path { + std::ifstream fin("/proc/self/cgroup"); + std::string line; + while (std::getline(fin, line)) { + auto fields = split(line, ':'); + if (fields.size() >= 3) { + if (auto sub_sys = split(fields[1], ','); + std::find(sub_sys.cbegin(), sub_sys.cend(), "cpu") != sub_sys.cend()) { + return fields[2]; + } + } + } + throw std::runtime_error("Unable to get cgroup file"); + } + + // Finds the first mountinfo line whose filesystem_type is cgroup and whose super_options include cpu, then returns the root and mount path of the cgroup cpu subsystem + // The fields of /proc/self/mountinfo are separated by spaces + // (0)mount_id (1)parent_id (2)major:minor (3)root (4)mount_point (5)mount_options (6)optional_fields(var length) + // separator(-) filesystem_type mount_source super_options + static auto + getCgroupMountPath() -> std::pair { + std::ifstream fin("/proc/self/mountinfo"); + std::string line; + while (std::getline(fin, line)) { + auto fields = split(line, ' '); + if (auto it = std::find(fields.cbegin(), fields.cend(), "-"); it != fields.cend()) { + if (*std::next(it) == "cgroup") { + auto sub_systems = split(*std::next(it, 3), ','); + if (std::find(sub_systems.cbegin(), sub_systems.cend(), "cpu") != sub_systems.cend()) { + return {fields[3], fields[4]}; + } + } + } + } + throw std::runtime_error("Unable to get mount info"); + } + + CgroupCpuReader() = default; + ~CgroupCpuReader() = default; + + public: + CgroupCpuReader(const CgroupCpuReader&) = delete; + CgroupCpuReader(CgroupCpuReader&&) noexcept = delete; + auto + operator=(const CgroupCpuReader&) -> CgroupCpuReader& = delete; + auto + operator=(CgroupCpuReader&&) noexcept -> CgroupCpuReader& = delete; + + static auto + GetCpuNum() -> int { +#ifdef __linux__ + try { + auto readIntFromFile = [](const fs::path& path) -> int { + std::ifstream fin(path); + int ret; + if (fin >> ret) { + 
return ret; + } + throw std::runtime_error("Failed to get int value from " + path.generic_string()); + }; + auto [root, mount_path] = getCgroupMountPath(); + + // Get cgroup path of cpu subsystem + // The mount root and the cgroup cpu path may differ, for example: + // root_path: /, cgroup_cpu_path: /user.slice + // Note that the base path of fs::relative is the second argument + auto cgroup_path = (mount_path / fs::relative(getCgroupCpuPath(), root)); + + // The quota and period files each hold a single int value: the cpu quota and period in microseconds + auto quota_file_path = cgroup_path / "cpu.cfs_quota_us"; + auto period_file_path = cgroup_path / "cpu.cfs_period_us"; + + int quota = readIntFromFile(quota_file_path); + // a negative quota means no limit on cpu quota + if (quota < 0) { + return std::thread::hardware_concurrency(); + } else if (quota == 0) { + throw std::runtime_error("Cpu quota is 0"); + } + int period = readIntFromFile(period_file_path); + int cpu_num = quota / period; + return std::max(cpu_num, 1); + } catch (const std::exception& e) { + LOG_KNOWHERE_WARNING_ << "Failed to get cpu num from cgroups: " << e.what() + << ". 
Fallback to hardware concurrency"; + return std::thread::hardware_concurrency(); + } +#else + return std::thread::hardware_concurrency(); +#endif + } +}; +} // namespace knowhere diff --git a/include/knowhere/comp/thread_pool.h b/include/knowhere/comp/thread_pool.h index a00920de6..e8a232208 100644 --- a/include/knowhere/comp/thread_pool.h +++ b/include/knowhere/comp/thread_pool.h @@ -14,18 +14,27 @@ #include #include +#include #include #include +#include +#include +#include #include +#include +#include +#include #include #include +#include +#include "cgroup.h" #include "folly/executors/CPUThreadPoolExecutor.h" #include "folly/futures/Future.h" #include "knowhere/log.h" namespace knowhere { class ThreadPool { #ifdef __linux__ private: @@ -138,7 +147,7 @@ class ThreadPool { static std::shared_ptr GetGlobalBuildThreadPool() { if (global_build_thread_pool_size_ == 0) { - InitThreadPool(std::thread::hardware_concurrency(), global_build_thread_pool_size_); + InitThreadPool(CgroupCpuReader::GetCpuNum(), global_build_thread_pool_size_); LOG_KNOWHERE_WARNING_ << "Global Build ThreadPool has not been initialized yet, init it with threads num: " << global_build_thread_pool_size_; } @@ -149,7 +158,7 @@ class ThreadPool { static std::shared_ptr GetGlobalSearchThreadPool() { if (global_search_thread_pool_size_ == 0) { - InitThreadPool(std::thread::hardware_concurrency(), global_search_thread_pool_size_); + InitThreadPool(CgroupCpuReader::GetCpuNum(), global_search_thread_pool_size_); LOG_KNOWHERE_WARNING_ << "Global Search ThreadPool has not been initialized yet, init it with threads num: " << global_search_thread_pool_size_; } @@ -163,7 +172,7 @@ class ThreadPool { public: explicit ScopedOmpSetter(int num_threads = 0) { if (global_build_thread_pool_size_ == 0) { // this should not happen in prod - omp_before = omp_get_max_threads(); + omp_before = CgroupCpuReader::GetCpuNum(); } else { omp_before = global_build_thread_pool_size_; }