在并发编程中,死锁是一个常见且棘手的问题。它会导致程序陷入无限等待状态,无法继续执行,甚至可能导致系统崩溃。理解死锁的成因及其解决方案是编写健壮并发程序的关键。
死锁是指两个或多个线程在执行过程中,因争夺资源而造成的一种互相等待的现象。此时,如果没有外力干预,这些线程将永远无法继续执行。
死锁的发生必须同时满足以下四个条件:
- 互斥条件:资源一次只能被一个线程占用。
- 请求与保持条件:线程已经持有一个资源,但同时又在请求其他资源。
- 不可剥夺条件:资源在被线程占用期间,不能被其他线程强行剥夺。
- 循环等待条件:多个线程之间形成一个循环链,每个线程都在等待下一个线程所占用的资源。
在多线程编程中,多个线程需要同时访问多个共享资源时,如果锁的获取顺序不一致,就可能导致死锁。例如,线程A先获取锁1再获取锁2,而线程B先获取锁2再获取锁1,这种情况下就可能发生死锁。
当多个线程以不同的顺序获取锁时,可能会形成循环等待条件,从而导致死锁。例如,线程A持有锁1并等待锁2,而线程B持有锁2并等待锁1,这样就形成了一个循环等待链,导致死锁。
资源竞争是指多个线程同时访问共享资源,而共享资源通常需要通过锁来保护,以确保线程安全。
锁的获取顺序是避免死锁的关键。如果所有线程都以相同的顺序获取锁,就不会形成循环等待条件,从而避免死锁。
线程间的资源依赖关系是指线程在执行过程中需要获取多个资源,而这些资源可能已经被其他线程占用。如果线程之间的资源依赖关系形成了一个环,就可能导致死锁。
循环等待条件是指多个线程之间形成了一个循环链,每个线程都在等待下一个线程所占用的资源。例如,线程A等待线程B,线程B等待线程C,线程C等待线程A,这样就形成了一个循环等待链,导致死锁。
假设我们有两个共享资源,分别由两个互斥锁保护。两个线程需要同时访问这两个资源,但它们获取锁的顺序不同,从而导致死锁。
以下是一个简单的C++代码示例,展示了多锁顺序导致的死锁问题:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
void threadA() {
// 线程A先获取mutex1,再获取mutex2
mutex1.lock();
std::cout << "Thread A acquired mutex1" << std::endl;
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex2.lock();
std::cout << "Thread A acquired mutex2" << std::endl;
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
// 线程B先获取mutex2,再获取mutex1
mutex2.lock();
std::cout << "Thread B acquired mutex2" << std::endl;
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex1.lock();
std::cout << "Thread B acquired mutex1" << std::endl;
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex1.unlock();
mutex2.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
- 线程A:先获取
mutex1
,再获取mutex2
。 - 线程B:先获取
mutex2
,再获取mutex1
。
当两个线程同时运行时,可能会出现以下情况:
- 线程A获取了
mutex1
,线程B获取了mutex2
。 - 线程A尝试获取
mutex2
,但mutex2
已经被线程B占用,因此线程A等待。 - 线程B尝试获取
mutex1
,但mutex1
已经被线程A占用,因此线程B等待。
这样就形成了循环等待条件,导致死锁。
在并发编程中,多锁的使用场景非常广泛,尤其是在需要保护多个共享资源的场景下。以下是一些常见的多锁应用场景:
在数据库事务中,多个表的操作通常需要同时锁定多个资源。例如,转账操作需要同时锁定两个账户的记录,以确保数据的一致性。
#include <iostream>
#include <thread>
#include <mutex>
struct Account {
std::mutex mtx;
double balance;
};
void transfer(Account& from, Account& to, double amount) {
std::lock(from.mtx, to.mtx); // 一次性获取所有锁
std::lock_guard<std::mutex> lock1(from.mtx, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.mtx, std::adopt_lock);
from.balance -= amount;
to.balance += amount;
std::cout << "Transferred " << amount << " from Account A to Account B" << std::endl;
}
int main() {
Account accountA = {std::mutex(), 1000};
Account accountB = {std::mutex(), 2000};
std::thread t1([&]() { transfer(accountA, accountB, 100); });
std::thread t2([&]() { transfer(accountB, accountA, 50); });
t1.join();
t2.join();
return 0;
}
-
std::lock(from.mtx, to.mtx)
:- 一次性获取两个账户的锁,避免死锁。
-
std::lock_guard<std::mutex> lock1(from.mtx, std::adopt_lock)
:- 使用
std::lock_guard
管理锁的生命周期。
- 使用
-
转账操作:
- 从
from
账户扣除金额,增加到to
账户。
- 从
在并发数据结构中,多个锁通常用于保护不同的数据区域。例如,一个并发哈希表可能需要为每个桶使用一个独立的锁,以提高并发性能。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <unordered_map>
class ConcurrentHashMap {
public:
ConcurrentHashMap(size_t bucket_count) : buckets_(bucket_count) {}
void insert(int key, int value) {
size_t bucket_index = key % buckets_.size();
std::lock_guard<std::mutex> lock(buckets_[bucket_index].mtx);
buckets_[bucket_index].map[key] = value;
}
int get(int key) {
size_t bucket_index = key % buckets_.size();
std::lock_guard<std::mutex> lock(buckets_[bucket_index].mtx);
return buckets_[bucket_index].map[key];
}
private:
struct Bucket {
std::mutex mtx;
std::unordered_map<int, int> map;
};
std::vector<Bucket> buckets_;
};
int main() {
ConcurrentHashMap map(10);
std::thread t1([&]() { map.insert(1, 100); });
std::thread t2([&]() { map.insert(2, 200); });
t1.join();
t2.join();
std::cout << "Value of key 1: " << map.get(1) << std::endl;
std::cout << "Value of key 2: " << map.get(2) << std::endl;
return 0;
}
-
ConcurrentHashMap
类:- 使用一个
std::vector<Bucket>
来存储多个桶,每个桶包含一个std::mutex
和一个std::unordered_map
。
- 使用一个
-
insert()
方法:- 根据键计算桶的索引,并锁定对应的桶。
- 在锁的保护下,将键值对插入到桶的
std::unordered_map
中。
-
get()
方法:- 根据键计算桶的索引,并锁定对应的桶。
- 在锁的保护下,从桶的
std::unordered_map
中获取值。
在并发队列中,多个锁通常用于保护队列的不同部分。例如,一个双端队列可能需要为队列的头部和尾部使用独立的锁,以提高并发性能。
#include <iostream>
#include <thread>
#include <mutex>
#include <deque>
class ConcurrentDeque {
public:
void push_front(int value) {
std::lock_guard<std::mutex> lock(head_mtx_);
deque_.push_front(value);
}
void push_back(int value) {
std::lock_guard<std::mutex> lock(tail_mtx_);
deque_.push_back(value);
}
int pop_front() {
std::lock_guard<std::mutex> lock(head_mtx_);
if (deque_.empty()) {
throw std::runtime_error("Deque is empty");
}
int value = deque_.front();
deque_.pop_front();
return value;
}
int pop_back() {
std::lock_guard<std::mutex> lock(tail_mtx_);
if (deque_.empty()) {
throw std::runtime_error("Deque is empty");
}
int value = deque_.back();
deque_.pop_back();
return value;
}
private:
std::mutex head_mtx_;
std::mutex tail_mtx_;
std::deque<int> deque_;
};
int main() {
ConcurrentDeque deque;
std::thread t1([&]() { deque.push_front(100); });
std::thread t2([&]() { deque.push_back(200); });
t1.join();
t2.join();
std::cout << "Pop front: " << deque.pop_front() << std::endl;
std::cout << "Pop back: " << deque.pop_back() << std::endl;
return 0;
}
-
ConcurrentDeque
类:- 使用两个
std::mutex
分别保护队列的头部和尾部。
- 使用两个
-
push_front()
方法:- 锁定
head_mtx_
,在队列的头部插入元素。
- 锁定
-
push_back()
方法:- 锁定
tail_mtx_
,在队列的尾部插入元素。
- 锁定
-
pop_front()
方法:- 锁定
head_mtx_
,从队列的头部弹出元素。
- 锁定
-
pop_back()
方法:- 锁定
tail_mtx_
,从队列的尾部弹出元素。
- 锁定
在并发文件系统中,多个锁通常用于保护文件的不同部分。例如,一个文件可能需要为文件的头部和尾部使用独立的锁,以提高并发性能。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
class ConcurrentFile {
public:
void write_head(const std::vector<char>& data) {
std::lock_guard<std::mutex> lock(head_mtx_);
// 模拟写入文件头部
std::cout << "Writing to file head: " << data.size() << " bytes" << std::endl;
}
void write_tail(const std::vector<char>& data) {
std::lock_guard<std::mutex> lock(tail_mtx_);
// 模拟写入文件尾部
std::cout << "Writing to file tail: " << data.size() << " bytes" << std::endl;
}
private:
std::mutex head_mtx_;
std::mutex tail_mtx_;
};
int main() {
ConcurrentFile file;
std::thread t1([&]() { file.write_head({'H', 'E', 'A', 'D'}); });
std::thread t2([&]() { file.write_tail({'T', 'A', 'I', 'L'}); });
t1.join();
t2.join();
return 0;
}
-
ConcurrentFile
类:- 使用两个
std::mutex
分别保护文件的头部和尾部。
- 使用两个
-
write_head()
方法:- 锁定
head_mtx_
,模拟写入文件头部。
- 锁定
-
write_tail()
方法:- 锁定
tail_mtx_
,模拟写入文件尾部。
- 锁定
以下是一个简单的多锁顺序死锁的代码示例:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
void threadA() {
mutex1.lock(); // 线程A先获取mutex1
std::cout << "Thread A acquired mutex1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
mutex2.lock(); // 线程A尝试获取mutex2
std::cout << "Thread A acquired mutex2" << std::endl;
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
mutex2.lock(); // 线程B先获取mutex2
std::cout << "Thread B acquired mutex2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
mutex1.lock(); // 线程B尝试获取mutex1
std::cout << "Thread B acquired mutex1" << std::endl;
mutex1.unlock();
mutex2.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
- 线程A:先获取
mutex1
,再尝试获取mutex2
。 - 线程B:先获取
mutex2
,再尝试获取mutex1
。
当两个线程同时运行时,可能会出现以下情况:
- 线程A获取了
mutex1
,线程B获取了mutex2
。 - 线程A尝试获取
mutex2
,但mutex2
已经被线程B占用,因此线程A等待。 - 线程B尝试获取
mutex1
,但mutex1
已经被线程A占用,因此线程B等待。
这样就形成了循环等待条件,导致死锁。
以下是一个复杂场景下的多锁顺序死锁的代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mutex1;
std::mutex mutex2;
std::mutex mutex3;
void threadA() {
mutex1.lock(); // 线程A先获取mutex1
std::cout << "Thread A acquired mutex1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
mutex2.lock(); // 线程A尝试获取mutex2
std::cout << "Thread A acquired mutex2" << std::endl;
mutex3.lock(); // 线程A尝试获取mutex3
std::cout << "Thread A acquired mutex3" << std::endl;
mutex3.unlock();
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
mutex2.lock(); // 线程B先获取mutex2
std::cout << "Thread B acquired mutex2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
mutex3.lock(); // 线程B尝试获取mutex3
std::cout << "Thread B acquired mutex3" << std::endl;
mutex1.lock(); // 线程B尝试获取mutex1
std::cout << "Thread B acquired mutex1" << std::endl;
mutex1.unlock();
mutex3.unlock();
mutex2.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
- 线程A:先获取
mutex1
,再获取mutex2
,最后获取mutex3
。 - 线程B:先获取
mutex2
,再获取mutex3
,最后获取mutex1
。
当两个线程同时运行时,可能会出现以下情况:
- 线程A获取了
mutex1
,线程B获取了mutex2
。 - 线程A尝试获取
mutex2
,但mutex2
已经被线程B占用,因此线程A等待。 - 线程B尝试获取
mutex3
,但mutex3
已经被线程A占用,因此线程B等待。
这样就形成了循环等待条件,导致死锁。
为了避免死锁,可以为所有线程定义一个固定的锁获取顺序。所有线程都必须按照相同的顺序获取锁,从而避免循环等待条件。
通过强制所有线程按照相同的顺序获取锁,可以避免死锁的发生。例如,所有线程都先获取mutex1
,再获取mutex2
,最后获取mutex3
。
通过定义统一的锁获取顺序,所有线程都按照相同的顺序获取锁,避免了循环等待条件,从而避免了死锁。
以下是一个使用统一锁获取顺序的代码示例:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
std::mutex mutex3;
void threadA() {
mutex1.lock(); // 线程A先获取mutex1
mutex2.lock(); // 线程A再获取mutex2
mutex3.lock(); // 线程A最后获取mutex3
std::cout << "Thread A acquired all locks" << std::endl;
mutex3.unlock();
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
mutex1.lock(); // 线程B先获取mutex1
mutex2.lock(); // 线程B再获取mutex2
mutex3.lock(); // 线程B最后获取mutex3
std::cout << "Thread B acquired all locks" << std::endl;
mutex3.unlock();
mutex2.unlock();
mutex1.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
-
线程A:
- 先获取
mutex1
。 - 再获取
mutex2
。 - 最后获取
mutex3
。
- 先获取
-
线程B:
- 先获取
mutex1
。 - 再获取
mutex2
。 - 最后获取
mutex3
。
- 先获取
通过这种方式,所有线程都按照相同的顺序获取锁,避免了循环等待条件,从而避免了死锁。
层次锁是一种将锁分层管理的策略。每个锁都有一个层次编号,线程只能获取比当前持有锁层次更高的锁。
以下是一个简单的层次锁实现:
#include <iostream>
#include <thread>
#include <mutex>
#include <stdexcept>
class HierarchicalMutex {
public:
explicit HierarchicalMutex(int level) : level_(level), previous_level_(0) {}
void lock() {
check_for_hierarchy_violation();
internal_mutex_.lock();
update_hierarchy_value();
}
void unlock() {
if (this_thread_hierarchy_value_ != level_) {
throw std::logic_error("Mutex hierarchy violated");
}
this_thread_hierarchy_value_ = previous_level_;
internal_mutex_.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex_.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
private:
void check_for_hierarchy_violation() {
if (this_thread_hierarchy_value_ <= level_) {
throw std::logic_error("Mutex hierarchy violated");
}
}
void update_hierarchy_value() {
previous_level_ = this_thread_hierarchy_value_;
this_thread_hierarchy_value_ = level_;
}
std::mutex internal_mutex_;
const int level_;
int previous_level_;
static thread_local int this_thread_hierarchy_value_;
};
thread_local int HierarchicalMutex::this_thread_hierarchy_value_ = INT_MAX;
HierarchicalMutex hmutex1(1000);
HierarchicalMutex hmutex2(2000);
void threadA() {
hmutex1.lock(); // 线程A先获取hmutex1
std::cout << "Thread A acquired hmutex1" << std::endl;
hmutex2.lock(); // 线程A再获取hmutex2
std::cout << "Thread A acquired hmutex2" << std::endl;
hmutex2.unlock();
hmutex1.unlock();
}
void threadB() {
hmutex1.lock(); // 线程B先获取hmutex1
std::cout << "Thread B acquired hmutex1" << std::endl;
hmutex2.lock(); // 线程B再获取hmutex2
std::cout << "Thread B acquired hmutex2" << std::endl;
hmutex2.unlock();
hmutex1.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
class HierarchicalMutex {
public:
explicit HierarchicalMutex(int level) : level_(level), previous_level_(0) {}
void lock();
void unlock();
bool try_lock();
private:
void check_for_hierarchy_violation();
void update_hierarchy_value();
std::mutex internal_mutex_;
const int level_;
int previous_level_;
static thread_local int this_thread_hierarchy_value_;
};
HierarchicalMutex
类:这是一个层次锁的实现类,包含以下成员:internal_mutex_
:一个内部互斥锁,用于实际的锁操作。level_
:锁的层次级别,用于标识锁的优先级。previous_level_
:用于保存线程之前持有的锁的层次级别。this_thread_hierarchy_value_
:一个线程局部的静态变量,用于记录当前线程的层次级别。
void lock() {
check_for_hierarchy_violation();
internal_mutex_.lock();
update_hierarchy_value();
}
lock()
方法:check_for_hierarchy_violation()
:在获取锁之前,检查当前线程的层次级别是否违反了层次锁的规则。internal_mutex_.lock()
:如果层次检查通过,则获取内部互斥锁。update_hierarchy_value()
:更新当前线程的层次级别。
void unlock() {
if (this_thread_hierarchy_value_ != level_) {
throw std::logic_error("Mutex hierarchy violated");
}
this_thread_hierarchy_value_ = previous_level_;
internal_mutex_.unlock();
}
unlock()
方法:if (this_thread_hierarchy_value_ != level_)
:在释放锁之前,检查当前线程的层次级别是否与锁的层次级别一致。如果不一致,说明层次锁的使用顺序被破坏,抛出异常。this_thread_hierarchy_value_ = previous_level_
:恢复线程的层次级别为之前持有的锁的层次级别。internal_mutex_.unlock()
:释放内部互斥锁。
bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex_.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
try_lock()
方法:check_for_hierarchy_violation()
:在尝试获取锁之前,检查当前线程的层次级别是否违反了层次锁的规则。internal_mutex_.try_lock()
:尝试获取内部互斥锁,如果获取失败,返回false
。update_hierarchy_value()
:如果成功获取锁,更新当前线程的层次级别。return true
:如果成功获取锁,返回true
。
void check_for_hierarchy_violation() {
if (this_thread_hierarchy_value_ <= level_) {
throw std::logic_error("Mutex hierarchy violated");
}
}
check_for_hierarchy_violation()
方法:if (this_thread_hierarchy_value_ <= level_)
:检查当前线程的层次级别是否小于或等于要获取的锁的层次级别。如果是,说明层次锁的使用顺序被破坏,抛出异常。
void update_hierarchy_value() {
previous_level_ = this_thread_hierarchy_value_;
this_thread_hierarchy_value_ = level_;
}
update_hierarchy_value()
方法:previous_level_ = this_thread_hierarchy_value_
:保存当前线程的层次级别到previous_level_
。this_thread_hierarchy_value_ = level_
:更新当前线程的层次级别为当前锁的层次级别。
thread_local int HierarchicalMutex::this_thread_hierarchy_value_ = INT_MAX;
thread_local
变量:this_thread_hierarchy_value_
:这是一个线程局部的静态变量,用于记录当前线程的层次级别。初始值为INT_MAX
,表示线程尚未持有任何锁。
void threadA() {
hmutex1.lock(); // 线程A先获取hmutex1
std::cout << "Thread A acquired hmutex1" << std::endl;
hmutex2.lock(); // 线程A再获取hmutex2
std::cout << "Thread A acquired hmutex2" << std::endl;
hmutex2.unlock();
hmutex1.unlock();
}
void threadB() {
hmutex1.lock(); // 线程B先获取hmutex1
std::cout << "Thread B acquired hmutex1" << std::endl;
hmutex2.lock(); // 线程B再获取hmutex2
std::cout << "Thread B acquired hmutex2" << std::endl;
hmutex2.unlock();
hmutex1.unlock();
}
threadA()
和threadB()
:- 两个线程都按照相同的顺序获取锁,先获取
hmutex1
,再获取hmutex2
。 - 由于层次锁的实现,线程在获取锁时会检查层次级别,确保不会违反层次锁的规则。
- 两个线程都按照相同的顺序获取锁,先获取
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
main()
函数:- 创建两个线程
t1
和t2
,分别执行threadA
和threadB
。 - 通过
join()
等待线程执行完成。
- 创建两个线程
层次锁通过强制线程按照锁的层次顺序获取锁,避免了循环等待条件,从而避免了死锁。通过check_for_hierarchy_violation()
方法,层次锁确保线程不会违反锁的层次规则,从而保证了线程安全。
定时锁是一种在获取锁时设置超时时间的策略。如果线程在指定时间内无法获取锁,则放弃获取锁并释放已持有的锁。
- 优点:可以避免死锁,因为线程在超时后会放弃获取锁。
- 缺点:可能会导致资源竞争加剧,因为线程在超时后可能会重新尝试获取锁。
定时锁通过设置超时时间,避免了线程无限等待锁的情况,从而避免了死锁。
以下是一个使用定时锁的代码示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <atomic>
std::timed_mutex timed_mutex1;
std::timed_mutex timed_mutex2;
std::atomic<bool> stop_flag(false); // 添加退出标志
void threadA() {
while (!stop_flag) { // 检查退出标志
if (timed_mutex1.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread A acquired timed_mutex1" << std::endl;
if (timed_mutex2.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread A acquired timed_mutex2" << std::endl;
// 模拟工作
std::this_thread::sleep_for(std::chrono::milliseconds(50));
timed_mutex2.unlock();
}
timed_mutex1.unlock();
}
}
}
void threadB() {
while (!stop_flag) { // 检查退出标志
if (timed_mutex2.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread B acquired timed_mutex2" << std::endl;
if (timed_mutex1.try_lock_for(std::chrono::milliseconds(100))) {
std::cout << "Thread B acquired timed_mutex1" << std::endl;
// 模拟工作
std::this_thread::sleep_for(std::chrono::milliseconds(50));
timed_mutex1.unlock();
}
timed_mutex2.unlock();
}
}
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
// 模拟运行一段时间后退出
std::this_thread::sleep_for(std::chrono::seconds(2));
stop_flag = true; // 设置退出标志
t1.join();
t2.join();
std::cout << "Program terminated." << std::endl;
return 0;
}
-
线程A:
- 使用
try_lock_for
尝试获取timed_mutex1
,超时时间为100毫秒。 - 如果成功获取
timed_mutex1
,再尝试获取timed_mutex2
,超时时间为100毫秒。 - 如果成功获取
timed_mutex2
,执行工作并释放锁。
- 使用
-
线程B:
- 使用
try_lock_for
尝试获取timed_mutex2
,超时时间为100毫秒。 - 如果成功获取
timed_mutex2
,再尝试获取timed_mutex1
,超时时间为100毫秒。 - 如果成功获取
timed_mutex1
,执行工作并释放锁。
- 使用
-
std::atomic<bool> stop_flag(false)
:
- 添加一个原子布尔变量
stop_flag
,用于控制线程的退出。
while (!stop_flag)
:
- 在
threadA
和threadB
中,检查stop_flag
的值,如果为true
,则退出循环。
std::this_thread::sleep_for(std::chrono::seconds(2))
:
- 主线程模拟运行 2 秒后设置
stop_flag
为true
,通知线程退出。
stop_flag = true
:
- 主线程设置
stop_flag
为true
,通知线程退出。
t1.join()
和t2.join()
:
- 主线程等待
t1
和t2
线程完成。
无锁编程是一种不使用锁的并发编程技术,通过原子操作和内存顺序控制来实现线程安全。
无锁编程适用于对性能要求极高且锁竞争激烈的场景。
无锁编程不使用锁,因此不会出现死锁问题。
以下是一个简单的无锁编程示例,使用std::atomic
实现线程安全的计数器:
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 100000; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter value: " << counter.load() << std::endl;
return 0;
}
-
std::atomic<int> counter(0)
:- 定义一个原子整数
counter
,初始值为0。
- 定义一个原子整数
-
fetch_add(1, std::memory_order_relaxed)
:- 使用原子操作
fetch_add
对counter
进行加1操作,std::memory_order_relaxed
表示不使用内存屏障。
- 使用原子操作
-
counter.load()
:- 使用
load
方法读取counter
的值。
- 使用
通过使用原子操作,避免了锁的使用,从而避免了死锁问题。
std::lock
是C++标准库提供的一个函数,可以一次性获取多个互斥锁,避免死锁。
以下是一个使用std::lock
的代码示例:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
std::mutex mutex3;
void threadA() {
std::lock(mutex1, mutex2, mutex3); // 一次性获取所有锁
std::cout << "Thread A acquired all locks" << std::endl;
mutex3.unlock();
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
std::lock(mutex1, mutex2, mutex3); // 一次性获取所有锁
std::cout << "Thread B acquired all locks" << std::endl;
mutex3.unlock();
mutex2.unlock();
mutex1.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
std::lock
通过一次性获取所有锁,避免了线程之间的循环等待条件,从而避免了死锁。
-
std::lock(mutex1, mutex2, mutex3)
:- 一次性获取
mutex1
、mutex2
和mutex3
,避免了循环等待条件。
- 一次性获取
-
mutex3.unlock()
:- 释放
mutex3
。
- 释放
-
mutex2.unlock()
:- 释放
mutex2
。
- 释放
-
mutex1.unlock()
:- 释放
mutex1
。
- 释放
通过这种方式,所有线程都一次性获取所有锁,避免了循环等待条件,从而避免了死锁。
在并发编程中,死锁是一个常见的问题,尤其是在多线程或多进程环境中。为了帮助开发者检测和诊断死锁问题,许多工具和库被开发出来。以下是一些常见的死锁检测工具:
-
Valgrind:
- 介绍:Valgrind 是一个开源的内存调试和性能分析工具,支持多种平台。它可以通过
helgrind
工具来检测多线程程序中的死锁问题。 - 使用场景:适用于 C/C++ 程序的死锁检测。
- 介绍:Valgrind 是一个开源的内存调试和性能分析工具,支持多种平台。它可以通过
-
ThreadSanitizer:
- 介绍:ThreadSanitizer 是 Google 开发的一个工具,用于检测多线程程序中的数据竞争和死锁问题。它集成在 LLVM 和 GCC 编译器中。
- 使用场景:适用于 C/C++ 和 Go 语言的死锁检测。
-
Intel Inspector:
- 介绍:Intel Inspector 是一个性能分析工具,专门用于检测多线程程序中的死锁、数据竞争和内存泄漏问题。
- 使用场景:适用于 C/C++ 和 Fortran 程序的死锁检测。
-
GDB (GNU Debugger):
- 介绍:GDB 是一个强大的调试工具,可以通过设置断点和观察线程状态来手动检测死锁问题。
- 使用场景:适用于所有支持 GDB 的编程语言。
以下是使用 ThreadSanitizer
检测多锁顺序死锁的示例:
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mutex1;
std::mutex mutex2;
void threadA() {
mutex1.lock();
std::cout << "Thread A acquired mutex1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex2.lock();
std::cout << "Thread A acquired mutex2" << std::endl;
mutex2.unlock();
mutex1.unlock();
}
void threadB() {
mutex2.lock();
std::cout << "Thread B acquired mutex2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mutex1.lock();
std::cout << "Thread B acquired mutex1" << std::endl;
mutex1.unlock();
mutex2.unlock();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}
-
编译代码:
- 使用
clang++
或g++
编译代码,并启用ThreadSanitizer
:
clang++ -fsanitize=thread -g -o deadlock_example deadlock_example.cpp
- 使用
-
运行程序:
- 运行生成的可执行文件:
./deadlock_example
-
查看检测结果:
ThreadSanitizer
会输出死锁的详细信息,包括死锁发生的线程、锁的获取顺序等。
==================
WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock)
Cycle in lock order graph: M1 (0x7b08) -> M2 (0x7b10) -> M1
Thread 1 (Thread 0x7f9b8c0b3700 (LWP 12345)):
#0 mutex1.lock()
#1 threadA()
#2 void* std::__invoke_impl<void*, void (*)(void*)>(std::__invoke_other, void*&&, void*&&)
#3 std::__invoke_result<void*>::type std::__invoke<void*>(void*&&)
#4 void std::thread::_Invoker<std::tuple<void*>>::_M_invoke<0ul>(std::_Index_tuple<0ul>)
#5 std::thread::_Invoker<std::tuple<void*>>::operator()()
#6 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void*>>>::_M_run()
#7 execute_native_thread_routine
Thread 2 (Thread 0x7f9b8c0b3700 (LWP 12346)):
#0 mutex2.lock()
#1 threadB()
#2 void* std::__invoke_impl<void*, void (*)(void*)>(std::__invoke_other, void*&&, void*&&)
#3 std::__invoke_result<void*>::type std::__invoke<void*>(void*&&)
#4 void std::thread::_Invoker<std::tuple<void*>>::_M_invoke<0ul>(std::_Index_tuple<0ul>)
#5 std::thread::_Invoker<std::tuple<void*>>::operator()()
#6 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void*>>>::_M_run()
#7 execute_native_thread_routine
SUMMARY: ThreadSanitizer: lock-order-inversion (potential deadlock)
==================
死锁恢复是指在检测到死锁后,采取措施使系统恢复正常运行的过程。常见的死锁恢复方法包括:
-
终止线程:
- 强制终止一个或多个参与死锁的线程,释放它们持有的资源,从而打破死锁。
-
回滚事务:
- 在数据库或分布式系统中,回滚事务可以释放资源,打破死锁。
-
资源抢占:
- 强制抢占一个线程持有的资源,并将其分配给另一个线程,从而打破死锁。
以下是一个简单的死锁恢复示例,通过终止线程来打破死锁:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <condition_variable>
std::mutex mutex1;
std::mutex mutex2;
std::condition_variable cv;
bool deadlock_detected = false;
void threadA() {
std::unique_lock<std::mutex> lock1(mutex1);
std::cout << "Thread A acquired mutex1" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock2(mutex2);
std::cout << "Thread A acquired mutex2" << std::endl;
lock2.unlock();
lock1.unlock();
}
void threadB() {
std::unique_lock<std::mutex> lock2(mutex2);
std::cout << "Thread B acquired mutex2" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock1(mutex1);
std::cout << "Thread B acquired mutex1" << std::endl;
lock1.unlock();
lock2.unlock();
}
void deadlock_monitor() {
std::this_thread::sleep_for(std::chrono::seconds(2));
deadlock_detected = true;
cv.notify_all();
}
int main() {
std::thread t1(threadA);
std::thread t2(threadB);
std::thread monitor(deadlock_monitor);
std::unique_lock<std::mutex> lock(mutex1, std::defer_lock);
cv.wait(lock, [] { return deadlock_detected; });
if (deadlock_detected) {
std::cout << "Deadlock detected! Terminating threads." << std::endl;
t1.detach();
t2.detach();
}
monitor.join();
return 0;
}
-
deadlock_monitor()
函数:- 模拟一个死锁检测器,在 2 秒后检测到死锁,并设置
deadlock_detected
标志。
- 模拟一个死锁检测器,在 2 秒后检测到死锁,并设置
-
cv.wait()
:- 主线程等待
deadlock_monitor()
通知死锁检测结果。
- 主线程等待
-
t1.detach()
和t2.detach()
:- 如果检测到死锁,主线程强制终止
t1
和t2
线程,打破死锁。
- 如果检测到死锁,主线程强制终止
在银行转账系统中,两个账户之间的转账操作需要同时锁定两个账户的资源。如果两个线程分别尝试从账户 A 转账到账户 B 和从账户 B 转账到账户 A,可能会导致死锁。
假设有两个线程:
- 线程 1:从账户 A 转账到账户 B。
- 线程 2:从账户 B 转账到账户 A。
如果线程 1 先锁定账户 A,线程 2 先锁定账户 B,然后线程 1 尝试锁定账户 B,线程 2 尝试锁定账户 A,就会形成死锁。
通过定义统一的锁获取顺序,所有线程都先锁定账户 A,再锁定账户 B,可以避免死锁。
#include <iostream>
#include <thread>
#include <mutex>
struct Account {
std::mutex mtx;
double balance;
};
void transfer(Account& from, Account& to, double amount) {
std::lock(from.mtx, to.mtx); // 一次性获取所有锁
std::lock_guard<std::mutex> lock1(from.mtx, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.mtx, std::adopt_lock);
from.balance -= amount;
to.balance += amount;
std::cout << "Transferred " << amount << " from Account A to Account B" << std::endl;
}
int main() {
Account accountA = {std::mutex(), 1000};
Account accountB = {std::mutex(), 2000};
std::thread t1([&]() { transfer(accountA, accountB, 100); });
std::thread t2([&]() { transfer(accountB, accountA, 50); });
t1.join();
t2.join();
return 0;
}
-
std::lock(from.mtx, to.mtx)
:- 一次性获取两个账户的锁,避免死锁。
-
std::lock_guard<std::mutex> lock1(from.mtx, std::adopt_lock)
:- 使用
std::lock_guard
管理锁的生命周期。
- 使用
-
转账操作:
- 从
from
账户扣除金额,增加到to
账户。
- 从
在分布式系统中,多个节点可能需要同时访问共享资源。如果节点之间的锁获取顺序不一致,可能会导致死锁。
假设有两个节点:
- 节点 1:先锁定资源 A,再锁定资源 B。
- 节点 2:先锁定资源 B,再锁定资源 A。
如果节点 1 锁定资源 A,节点 2 锁定资源 B,然后节点 1 尝试锁定资源 B,节点 2 尝试锁定资源 A,就会形成死锁。
通过定义统一的锁获取顺序,所有节点都先锁定资源 A,再锁定资源 B,可以避免死锁。
#include <iostream>
#include <thread>
#include <mutex>
std::mutex resourceA;
std::mutex resourceB;
void node1() {
std::lock(resourceA, resourceB); // 一次性获取所有锁
std::cout << "Node 1 acquired resourceA and resourceB" << std::endl;
resourceB.unlock();
resourceA.unlock();
}
void node2() {
std::lock(resourceA, resourceB); // 一次性获取所有锁
std::cout << "Node 2 acquired resourceA and resourceB" << std::endl;
resourceB.unlock();
resourceA.unlock();
}
int main() {
std::thread t1(node1);
std::thread t2(node2);
t1.join();
t2.join();
return 0;
}
-
std::lock(resourceA, resourceB)
:- 一次性获取两个资源的锁,避免死锁。
-
解锁操作:
- 在操作完成后,释放锁。
-
锁的获取顺序不一致:
- 多个线程以不同的顺序获取锁,容易导致死锁。
-
锁的粒度过大:
- 锁的粒度过大会导致不必要的资源竞争,增加死锁的风险。
-
忽略死锁检测工具:
- 忽略死锁检测工具的使用,可能导致死锁问题被忽视。
-
定义统一的锁获取顺序:
- 所有线程都按照相同的顺序获取锁,避免循环等待条件。
-
使用层次锁:
- 通过层次锁强制线程按照锁的层次顺序获取锁。
-
使用定时锁:
- 在获取锁时设置超时时间,避免线程无限等待。
-
使用无锁编程:
- 通过原子操作和内存顺序控制实现线程安全,避免锁的使用。
-
使用
std::lock
:- 一次性获取多个锁,避免循环等待条件。
-
无锁编程:
- 无锁编程通过原子操作和内存顺序控制实现线程安全,避免了锁的使用,是未来并发编程的重要方向。
-
分布式锁:
- 在分布式系统中,分布式锁可以确保多个节点之间的资源访问顺序,避免死锁问题。常见的分布式锁实现包括 ZooKeeper、Redis 等。
通过以上内容,我们详细探讨了多锁顺序导致的死锁问题及其解决方案。希望这些内容能够帮助你在实际开发中更好地理解和避免死锁问题。