Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to Set Thread Num by Cgroups when Users Did Not Do So #188

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions include/knowhere/comp/cgroup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <algorithm>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <iterator>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "knowhere/log.h"

namespace knowhere {

namespace fs = std::filesystem;

/*
 * Try to obtain the number of CPUs from cgroups when the process is limited by a CPU quota and period.
 * Only cgroup v1 on Linux is supported.
 * On failure, fall back to std::thread::hardware_concurrency().
 */
class CgroupCpuReader {
private:
static auto
split(const std::string& s, char delimiter) -> std::vector<std::string> {
std::vector<std::string> tokens;
std::string token;
std::istringstream tokenStream(s);
while (std::getline(tokenStream, token, delimiter)) {
tokens.push_back(token);
}
return tokens;
}

// Returns cgroup path of cpu subsystem. The fields /proc/self/cgroup are separted by columns
// id:subsystem:path
// The subsystem may be a comma-separated list of subsystems.
static auto
getCgroupCpuPath() -> fs::path {
std::ifstream fin("/proc/self/cgroup");
std::string line;
while (std::getline(fin, line)) {
auto fields = split(line, ':');
if (fields.size() >= 3) {
if (auto sub_sys = split(fields[1], ',');
std::find(sub_sys.cbegin(), sub_sys.cend(), "cpu") != sub_sys.cend()) {
return fields[2];
}
}
}
throw std::runtime_error("Unable to get cgroup file");
}

// Finds the line which filesystem_type is cgroup and superReturns the root and mount path of cgroup cpu subsystem
// The fields /proc/self/mountinfo are separted by space
// (0)mount_id (1)parent_id (2)major:minor (3)root (4)mount_point (5)mount_options (6)optional_fields(var length)
// separator(-) filesystem_type mount_source super_options
static auto
getCgroupMountPath() -> std::pair<fs::path, fs::path> {
std::ifstream fin("/proc/self/mountinfo");
std::string line;
while (std::getline(fin, line)) {
auto fields = split(line, ' ');
if (auto it = std::find(fields.cbegin(), fields.cend(), "-"); it != fields.cend()) {
if (*std::next(it) == "cgroup") {
auto sub_systems = split(*std::next(it, 3), ',');
if (std::find(sub_systems.cbegin(), sub_systems.cend(), "cpu") != sub_systems.cend()) {
return {fields[3], fields[4]};
}
}
}
}
throw std::runtime_error("Unable to get mount info");
}

CgroupCpuReader() = default;
~CgroupCpuReader() = default;

public:
CgroupCpuReader(const CgroupCpuReader&) = delete;
CgroupCpuReader(CgroupCpuReader&&) noexcept = delete;
auto
operator=(const CgroupCpuReader&) -> CgroupCpuReader& = delete;
auto
operator=(CgroupCpuReader&&) noexcept -> CgroupCpuReader& = delete;

static auto
GetCpuNum() -> int {
#ifdef __linux__
try {
auto readIntFromFile = [](const fs::path& path) -> int {
std::ifstream fin(path);
int ret;
if (fin >> ret) {
return ret;
}
throw std::runtime_error("Failed to get int value from " + path.generic_string());
};
auto [root, mount_path] = getCgroupMountPath();

// Get cgroup path of cpu subsystem
// The root_path and cgroup_cpu_path sometimes maybe different, for example:
// root_path: /, cgroup_cpu_path: /user.slice
// Note that the base path of fs::relative is the second argument
auto cgroup_path = (mount_path / fs::relative(getCgroupCpuPath(), root));

// The quota and period file contains only one int value shows its cpu quota and period in ms
auto quota_file_path = cgroup_path / "cpu.cfs_quota_us";
auto period_file_path = cgroup_path / "cpu.cfs_period_us";

int quota = readIntFromFile(quota_file_path);
// if no limit on cpu quota
if (quota < 0) {
return std::thread::hardware_concurrency();
} else if (quota == 0) {
throw std::runtime_error("Cpu quota is 0");
}
int period = readIntFromFile(period_file_path);
int cpu_num = quota / period;
return std::max(cpu_num, 1);
} catch (const std::exception& e) {
LOG_KNOWHERE_WARNING_ << "Failed to get cpu num from cgroups: " << e.what()
<< ". Fallback to hardware concurrency";
return std::thread::hardware_concurrency();
}
#else
return std::thread::hardware_concurrency();
#endif
}
};
} // namespace knowhere
8 changes: 4 additions & 4 deletions include/knowhere/comp/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
#include <thread>
#include <utility>

#include "cgroup.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/futures/Future.h"
#include "knowhere/log.h"

namespace knowhere {

class ThreadPool {
#ifdef __linux__
private:
Expand Down Expand Up @@ -138,7 +138,7 @@ class ThreadPool {
static std::shared_ptr<ThreadPool>
GetGlobalBuildThreadPool() {
if (global_build_thread_pool_size_ == 0) {
InitThreadPool(std::thread::hardware_concurrency(), global_build_thread_pool_size_);
InitThreadPool(CgroupCpuReader::GetCpuNum(), global_build_thread_pool_size_);
LOG_KNOWHERE_WARNING_ << "Global Build ThreadPool has not been initialized yet, init it with threads num: "
<< global_build_thread_pool_size_;
}
Expand All @@ -149,7 +149,7 @@ class ThreadPool {
static std::shared_ptr<ThreadPool>
GetGlobalSearchThreadPool() {
if (global_search_thread_pool_size_ == 0) {
InitThreadPool(std::thread::hardware_concurrency(), global_search_thread_pool_size_);
InitThreadPool(CgroupCpuReader::GetCpuNum(), global_search_thread_pool_size_);
LOG_KNOWHERE_WARNING_ << "Global Search ThreadPool has not been initialized yet, init it with threads num: "
<< global_search_thread_pool_size_;
}
Expand All @@ -163,7 +163,7 @@ class ThreadPool {
public:
explicit ScopedOmpSetter(int num_threads = 0) {
if (global_build_thread_pool_size_ == 0) { // this should not happen in prod
omp_before = omp_get_max_threads();
omp_before = CgroupCpuReader::GetCpuNum();
} else {
omp_before = global_build_thread_pool_size_;
}
Expand Down