Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prevents potential deadlocks and improves thread safety. #2110

Merged
merged 6 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 58 additions & 39 deletions core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ void CommonConfigProvider::Stop() {

void CommonConfigProvider::LoadConfigFile() {
error_code ec;
lock_guard<mutex> pipelineInfomaplock(mContinuousPipelineInfoMapMux);
lock_guard<mutex> lockPipeline(mContinuousPipelineMux);
for (auto const& entry : filesystem::directory_iterator(mContinuousPipelineConfigDir, ec)) {
Json::Value detail;
if (LoadConfigDetailFromFile(entry, detail)) {
Expand All @@ -148,12 +146,13 @@ void CommonConfigProvider::LoadConfigFile() {
}
info.status = ConfigFeedbackStatus::APPLYING;
info.detail = detail.toStyledString();
mContinuousPipelineConfigInfoMap[info.name] = info;
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mContinuousPipelineConfigInfoMap[info.name] = info;
}
ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(info.name, this);
}
}
lock_guard<mutex> instanceInfomaplock(mInstanceInfoMapMux);
lock_guard<mutex> lockInstance(mInstanceMux);
for (auto const& entry : filesystem::directory_iterator(mInstanceSourceDir, ec)) {
Json::Value detail;
if (LoadConfigDetailFromFile(entry, detail)) {
Expand All @@ -165,7 +164,10 @@ void CommonConfigProvider::LoadConfigFile() {
}
info.status = ConfigFeedbackStatus::APPLYING;
info.detail = detail.toStyledString();
mInstanceConfigInfoMap[info.name] = info;
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mInstanceConfigInfoMap[info.name] = info;
}
ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(info.name, this);
}
}
Expand Down Expand Up @@ -283,19 +285,27 @@ configserver::proto::v2::HeartbeatRequest CommonConfigProvider::PrepareHeartbeat
heartbeatReq.set_running_status("running");
heartbeatReq.set_startup_time(mStartTime);

lock_guard<mutex> pipelineinfomaplock(mContinuousPipelineInfoMapMux);
for (const auto& configInfo : mContinuousPipelineConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_continuous_pipeline_configs());
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
for (const auto& configInfo : mContinuousPipelineConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_continuous_pipeline_configs());
}
}
lock_guard<mutex> instanceinfomaplock(mInstanceInfoMapMux);
for (const auto& configInfo : mInstanceConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_instance_configs());

{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
for (const auto& configInfo : mInstanceConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_instance_configs());
}
}

lock_guard<mutex> onetimeinfomaplock(mOnetimePipelineInfoMapMux);
for (const auto& configInfo : mOnetimePipelineConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_onetime_pipeline_configs());
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
for (const auto& configInfo : mOnetimePipelineConfigInfoMap) {
addConfigInfoToRequest(configInfo, heartbeatReq.add_onetime_pipeline_configs());
}
}

return heartbeatReq;
}

Expand Down Expand Up @@ -410,27 +420,35 @@ void CommonConfigProvider::UpdateRemotePipelineConfig(
"dir", sourceDir.string())("error code", ec.value())("error msg", ec.message()));
return;
}

// 保证每次往磁盘上dump文件的时候,config watcher不会读到一半的内容,相当于是个目录锁
lock_guard<mutex> lock(mContinuousPipelineMux);
lock_guard<mutex> infomaplock(mContinuousPipelineInfoMapMux);
for (const auto& config : configs) {
filesystem::path filePath = sourceDir / (config.name() + ".json");
if (config.version() == -1) {
mContinuousPipelineConfigInfoMap.erase(config.name());
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mContinuousPipelineConfigInfoMap.erase(config.name());
}
filesystem::remove(filePath, ec);
ConfigFeedbackReceiver::GetInstance().UnregisterContinuousPipelineConfig(config.name());
} else {
if (!DumpConfigFile(config, sourceDir)) {
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(),
.version = config.version(),
.status = ConfigFeedbackStatus::FAILED,
.detail = config.detail()};
}
continue;
}
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(),
.version = config.version(),
.status = ConfigFeedbackStatus::FAILED,
.status = ConfigFeedbackStatus::APPLYING,
.detail = config.detail()};
continue;
}
mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(),
.version = config.version(),
.status = ConfigFeedbackStatus::APPLYING,
.detail = config.detail()};
ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(config.name(), this);
}
}
Expand All @@ -448,35 +466,36 @@ void CommonConfigProvider::UpdateRemoteInstanceConfig(
"dir", sourceDir.string())("error code", ec.value())("error msg", ec.message()));
return;
}

// 保证每次往磁盘上dump文件的时候,config watcher不会读到一半的内容,相当于是个目录锁
lock_guard<mutex> lock(mInstanceMux);
lock_guard<mutex> infomaplock(mInstanceInfoMapMux);
for (const auto& config : configs) {
filesystem::path filePath = sourceDir / (config.name() + ".json");
if (config.version() == -1) {
mInstanceConfigInfoMap.erase(config.name());
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mInstanceConfigInfoMap.erase(config.name());
}
filesystem::remove(filePath, ec);
ConfigFeedbackReceiver::GetInstance().UnregisterInstanceConfig(config.name());
} else {
filesystem::path filePath = sourceDir / (config.name() + ".json");
if (config.version() == -1) {
mInstanceConfigInfoMap.erase(config.name());
filesystem::remove(filePath, ec);
ConfigFeedbackReceiver::GetInstance().UnregisterInstanceConfig(config.name());
} else {
if (!DumpConfigFile(config, sourceDir)) {
if (!DumpConfigFile(config, sourceDir)) {
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(),
.version = config.version(),
.status = ConfigFeedbackStatus::FAILED,
.detail = config.detail()};
continue;
}
continue;
}
{
lock_guard<mutex> lockInfoMap(mInfoMapMux);
mInstanceConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(),
.version = config.version(),
.status = ConfigFeedbackStatus::APPLYING,
.detail = config.detail()};
ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(config.name(), this);
}
ConfigFeedbackReceiver::GetInstance().RegisterInstanceConfig(config.name(), this);
}
}
}
Expand Down Expand Up @@ -537,7 +556,7 @@ bool CommonConfigProvider::FetchPipelineConfigFromServer(

void CommonConfigProvider::FeedbackContinuousPipelineConfigStatus(const std::string& name,
ConfigFeedbackStatus status) {
lock_guard<mutex> infomaplock(mContinuousPipelineInfoMapMux);
lock_guard<mutex> lockInfoMap(mInfoMapMux);
auto info = mContinuousPipelineConfigInfoMap.find(name);
if (info != mContinuousPipelineConfigInfoMap.end()) {
info->second.status = status;
Expand All @@ -547,7 +566,7 @@ void CommonConfigProvider::FeedbackContinuousPipelineConfigStatus(const std::str
ToStringView(status)));
}
void CommonConfigProvider::FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) {
lock_guard<mutex> infomaplock(mInstanceInfoMapMux);
lock_guard<mutex> lockInfoMap(mInfoMapMux);
auto info = mInstanceConfigInfoMap.find(name);
if (info != mInstanceConfigInfoMap.end()) {
info->second.status = status;
Expand All @@ -558,7 +577,7 @@ void CommonConfigProvider::FeedbackInstanceConfigStatus(const std::string& name,
void CommonConfigProvider::FeedbackOnetimePipelineConfigStatus(const std::string& type,
const std::string& name,
ConfigFeedbackStatus status) {
lock_guard<mutex> infomaplock(mOnetimePipelineInfoMapMux);
lock_guard<mutex> lockInfoMap(mInfoMapMux);
auto info = mOnetimePipelineConfigInfoMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name));
if (info != mOnetimePipelineConfigInfoMap.end()) {
info->second.status = status;
Expand Down
4 changes: 1 addition & 3 deletions core/config/common_provider/CommonConfigProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable {
mutable std::condition_variable mStopCV;
bool mConfigServerAvailable = false;

mutable std::mutex mInstanceInfoMapMux;
mutable std::mutex mContinuousPipelineInfoMapMux;
mutable std::mutex mOnetimePipelineInfoMapMux;
mutable std::mutex mInfoMapMux;

std::unordered_map<std::string, ConfigInfo> mContinuousPipelineConfigInfoMap;
std::unordered_map<std::string, ConfigInfo> mInstanceConfigInfoMap;
Expand Down
42 changes: 30 additions & 12 deletions core/config/feedbacker/ConfigFeedbackReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,46 @@ void ConfigFeedbackReceiver::UnregisterOnetimePipelineConfig(const std::string&

void ConfigFeedbackReceiver::FeedbackContinuousPipelineConfigStatus(const std::string& name,
ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mContinuousPipelineConfigFeedbackableMap.find(name);
if (iter != mContinuousPipelineConfigFeedbackableMap.end()) {
iter->second->FeedbackContinuousPipelineConfigStatus(name, status);
ConfigFeedbackable* feedbackable = nullptr;
{
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mContinuousPipelineConfigFeedbackableMap.find(name);
if (iter != mContinuousPipelineConfigFeedbackableMap.end()) {
feedbackable = iter->second;
}
}
if (feedbackable) {
feedbackable->FeedbackContinuousPipelineConfigStatus(name, status);
}
}

void ConfigFeedbackReceiver::FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mInstanceConfigFeedbackableMap.find(name);
if (iter != mInstanceConfigFeedbackableMap.end()) {
iter->second->FeedbackInstanceConfigStatus(name, status);
ConfigFeedbackable* feedbackable = nullptr;
{
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mInstanceConfigFeedbackableMap.find(name);
if (iter != mInstanceConfigFeedbackableMap.end()) {
feedbackable = iter->second;
}
}
if (feedbackable) {
feedbackable->FeedbackInstanceConfigStatus(name, status);
}
}

void ConfigFeedbackReceiver::FeedbackOnetimePipelineConfigStatus(const std::string& type,
const std::string& name,
ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mOnetimePipelineConfigFeedbackableMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name));
if (iter != mOnetimePipelineConfigFeedbackableMap.end()) {
iter->second->FeedbackOnetimePipelineConfigStatus(type, name, status);
ConfigFeedbackable* feedbackable = nullptr;
{
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mOnetimePipelineConfigFeedbackableMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name));
if (iter != mOnetimePipelineConfigFeedbackableMap.end()) {
feedbackable = iter->second;
}
}
if (feedbackable) {
feedbackable->FeedbackOnetimePipelineConfigStatus(type, name, status);
}
}

Expand Down
1 change: 1 addition & 0 deletions core/config/provider/ConfigProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ConfigProvider {
std::filesystem::path mInstanceSourceDir;
mutable std::mutex mContinuousPipelineMux;
mutable std::mutex mInstanceMux;
mutable std::mutex mOnetimePipelineMux;
};

} // namespace logtail
Loading