Skip to content

Latest commit

 

History

History
504 lines (395 loc) · 19.4 KB

C++并发编程基础:案例实战解析.md

File metadata and controls

504 lines (395 loc) · 19.4 KB

生产者-消费者模型

代码示例

下面是一个综合的C++并发编程示例,展示了生产者和消费者模型。代码中包含了线程同步、互斥锁、条件变量、原子操作和线程局部存储(TLS)的使用。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <chrono>

// 定义缓冲区大小
const int BUFFER_SIZE = 10;

// 定义全局变量
std::queue<int> buffer; // 缓冲区
std::mutex mtx; // 互斥锁,用于保护对缓冲区的访问
std::condition_variable cv_producer, cv_consumer; // 条件变量,用于线程间的同步
std::atomic<int> item_count(0); // 原子变量,用于记录生产的总数量
thread_local int thread_local_data = 0; // 线程局部存储,每个线程有自己的副本

// 生产者函数
void producer() {
    while (true) {
        // 模拟生产过程
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        // 获取互斥锁
        std::unique_lock<std::mutex> lock(mtx);

        // 等待缓冲区有空闲空间
        cv_producer.wait(lock, [] { return buffer.size() < BUFFER_SIZE; });

        // 生产一个物品
        int item = ++item_count;
        buffer.push(item);
        std::cout << "Producer produced item: " << item << std::endl;

        // 通知消费者可以消费
        cv_consumer.notify_one();

        // 释放互斥锁
        lock.unlock();
    }
}

// 消费者函数
void consumer() {
    while (true) {
        // 模拟消费过程
        std::this_thread::sleep_for(std::chrono::milliseconds(700));

        // 获取互斥锁
        std::unique_lock<std::mutex> lock(mtx);

        // 等待缓冲区有物品
        cv_consumer.wait(lock, [] { return !buffer.empty(); });

        // 消费一个物品
        int item = buffer.front();
        buffer.pop();
        std::cout << "Consumer consumed item: " << item << std::endl;

        // 通知生产者可以生产
        cv_producer.notify_one();

        // 释放互斥锁
        lock.unlock();

        // 使用线程局部存储
        thread_local_data++;
        std::cout << "Thread local data for thread " << std::this_thread::get_id() << ": " << thread_local_data << std::endl;
    }
}

int main() {
    // 创建生产者和消费者线程
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    // 等待线程结束(实际上不会结束,因为生产者和消费者是无限循环)
    producer_thread.join();
    consumer_thread.join();

    return 0;
}

代码解释

  1. 缓冲区 (buffer):
    • 使用std::queue<int>作为缓冲区,生产者将物品放入缓冲区,消费者从缓冲区取出物品。
  2. 互斥锁 (mtx):
    • std::mutex用于保护对缓冲区的访问,确保同一时间只有一个线程可以访问缓冲区。
  3. 条件变量 (cv_producer, cv_consumer):
    • std::condition_variable用于线程间的同步。
    • cv_producer用于通知生产者缓冲区有空闲空间。
    • cv_consumer用于通知消费者缓冲区有物品可以消费。
  4. 原子变量 (item_count):
    • std::atomic<int>用于记录生产的总数量,确保线程安全。
  5. 线程局部存储 (thread_local_data):
    • thread_local关键字用于声明线程局部存储变量,每个线程都有自己的副本。
  6. 生产者函数 (producer):
    • 生产者线程不断生产物品,并将其放入缓冲区。
    • 使用std::unique_lock<std::mutex>获取互斥锁,确保对缓冲区的访问是线程安全的。
    • 使用cv_producer.wait等待缓冲区有空闲空间。
    • 生产物品后,使用cv_consumer.notify_one通知消费者可以消费。
  7. 消费者函数 (consumer):
    • 消费者线程不断从缓冲区取出物品进行消费。
    • 使用std::unique_lock<std::mutex>获取互斥锁,确保对缓冲区的访问是线程安全的。
    • 使用cv_consumer.wait等待缓冲区有物品。
    • 消费物品后,使用cv_producer.notify_one通知生产者可以生产。
    • 使用线程局部存储变量thread_local_data记录每个线程的消费次数。
  8. 主函数 (main):
    • 创建生产者和消费者线程,并使用join等待线程结束。

运行结果

运行程序后,你会看到生产者和消费者交替工作,生产者生产物品并放入缓冲区,消费者从缓冲区取出物品进行消费。每个线程的消费次数会记录在线程局部存储变量thread_local_data中。

总结

这个示例展示了C++并发编程中的多个重要概念,包括线程同步、互斥锁、条件变量、原子操作和线程局部存储。通过这个示例,你可以更好地理解如何在多线程环境中安全地共享数据和进行线程间的通信。

基于线程池的生产者-消费者模型

代码示例

下面是一个更复杂的并发编程示例,结合了生产者-消费者模型、线程池、线程同步、互斥锁、条件变量、原子操作和线程局部存储。这个示例模拟了一个任务调度系统,其中多个生产者生成任务,多个消费者(线程池)处理任务。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <atomic>
#include <chrono>
#include <functional>
#include <random>

// 定义任务队列的最大容量
const int TASK_QUEUE_SIZE = 10;

// 定义线程池的大小
const int THREAD_POOL_SIZE = 4;

// 定义全局变量
std::queue<std::function<void()>> task_queue; // 任务队列
std::mutex mtx; // 互斥锁,用于保护对任务队列的访问
std::condition_variable cv_producer, cv_consumer; // 条件变量,用于线程间的同步
std::atomic<bool> stop_flag(false); // 原子变量,用于控制线程池的停止
std::atomic<int> task_count(0); // 原子变量,用于记录任务的总数量
thread_local int thread_local_data = 0; // 线程局部存储,每个线程有自己的副本

// 生产者函数
void producer() {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dist(100, 500);

    while (!stop_flag) {
        // 模拟生产任务的过程
        std::this_thread::sleep_for(std::chrono::milliseconds(dist(gen)));

        // 获取互斥锁
        std::unique_lock<std::mutex> lock(mtx);

        // 等待任务队列有空闲空间
        cv_producer.wait(lock, [] { return task_queue.size() < TASK_QUEUE_SIZE; });

        // 生成一个任务
        auto task = [task_id = ++task_count]() {
            std::cout << "Task " << task_id << " is being processed by thread " << std::this_thread::get_id() << std::endl;
            // 模拟任务处理过程
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
            std::cout << "Task " << task_id << " is completed by thread " << std::this_thread::get_id() << std::endl;
        };

        // 将任务放入任务队列
        task_queue.push(task);
        std::cout << "Producer produced task: " << task_count << std::endl;

        // 通知消费者可以消费
        cv_consumer.notify_one();

        // 释放互斥锁
        lock.unlock();
    }
}

// 消费者函数(线程池中的工作线程)
void consumer() {
    while (!stop_flag) {
        // 获取互斥锁
        std::unique_lock<std::mutex> lock(mtx);

        // 等待任务队列有任务
        cv_consumer.wait(lock, [] { return !task_queue.empty() || stop_flag; });

        // 如果任务队列为空且停止标志被设置,退出循环
        if (task_queue.empty() && stop_flag) {
            lock.unlock();
            break;
        }

        // 从任务队列中取出任务
        auto task = task_queue.front();
        task_queue.pop();

        // 通知生产者可以生产
        cv_producer.notify_one();

        // 释放互斥锁
        lock.unlock();

        // 执行任务
        task();

        // 使用线程局部存储
        thread_local_data++;
        std::cout << "Thread " << std::this_thread::get_id() << " has processed " << thread_local_data << " tasks." << std::endl;
    }
}

int main() {
    // 创建生产者线程
    std::thread producer_thread(producer);

    // 创建消费者线程池
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < THREAD_POOL_SIZE; ++i) {
        thread_pool.emplace_back(consumer);
    }

    // 模拟运行一段时间后停止
    std::this_thread::sleep_for(std::chrono::seconds(10));

    // 设置停止标志
    stop_flag = true;

    // 通知所有消费者线程
    cv_consumer.notify_all();

    // 等待生产者线程结束
    producer_thread.join();

    // 等待消费者线程结束
    for (auto& thread : thread_pool) {
        thread.join();
    }

    std::cout << "All tasks have been processed. Exiting." << std::endl;

    return 0;
}

代码解释

  1. 任务队列 (task_queue):
    • 使用std::queue<std::function<void()>>作为任务队列,生产者将任务放入队列,消费者从队列取出任务并执行。
  2. 互斥锁 (mtx):
    • std::mutex用于保护对任务队列的访问,确保同一时间只有一个线程可以访问任务队列。
  3. 条件变量 (cv_producer, cv_consumer):
    • std::condition_variable用于线程间的同步。
    • cv_producer用于通知生产者任务队列有空闲空间。
    • cv_consumer用于通知消费者任务队列有任务可以处理。
  4. 原子变量 (stop_flag, task_count):
    • std::atomic<bool>用于控制线程池的停止标志。
    • std::atomic<int>用于记录任务的总数量。
  5. 线程局部存储 (thread_local_data):
    • thread_local关键字用于声明线程局部存储变量,每个线程都有自己的副本,用于记录每个线程处理的任务数量。
  6. 生产者函数 (producer):
    • 生产者线程不断生成任务,并将其放入任务队列。
    • 使用std::unique_lock<std::mutex>获取互斥锁,确保对任务队列的访问是线程安全的。
    • 使用cv_producer.wait等待任务队列有空闲空间。
    • 生产任务后,使用cv_consumer.notify_one通知消费者可以处理任务。
  7. 消费者函数 (consumer):
    • 消费者线程(线程池中的工作线程)不断从任务队列取出任务并执行。
    • 使用std::unique_lock<std::mutex>获取互斥锁,确保对任务队列的访问是线程安全的。
    • 使用cv_consumer.wait等待任务队列有任务。
    • 处理任务后,使用cv_producer.notify_one通知生产者可以生产新任务。
    • 使用线程局部存储变量thread_local_data记录每个线程处理的任务数量。
  8. 主函数 (main):
    • 创建一个生产者线程和多个消费者线程(线程池)。
    • 模拟运行一段时间后,设置停止标志并通知所有消费者线程。
    • 等待生产者和消费者线程结束。

运行结果

运行程序后,你会看到生产者不断生成任务,消费者(线程池)从任务队列中取出任务并处理。每个线程处理的任务数量会记录在线程局部存储变量thread_local_data中。程序运行一段时间后,停止标志被设置,所有线程退出。


总结

这个示例展示了更复杂的并发编程场景,包括生产者-消费者模型、线程池、线程同步、互斥锁、条件变量、原子操作和线程局部存储。通过这个示例,你可以更好地理解如何在多线程环境中实现任务调度、线程池管理和线程间的通信。

工作窃取模型

代码示例

除了生产者-消费者模型,另一个常见的复杂并发任务模型是 工作窃取(Work Stealing)模型。工作窃取模型通常用于任务调度系统,其中多个线程(工作线程)从任务队列中获取任务并执行。如果某个线程的任务队列为空,它可以“窃取”其他线程的任务队列中的任务。

下面是一个结合了工作窃取模型的并发编程示例,包含线程同步、互斥锁、条件变量、原子操作和线程局部存储。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <atomic>
#include <chrono>
#include <random>
#include <deque>
#include <functional>

// 定义线程池的大小
const int THREAD_POOL_SIZE = 4;

// 定义全局变量
std::vector<std::deque<std::function<void()>>> task_queues(THREAD_POOL_SIZE); // 每个线程的任务队列
std::vector<std::mutex> mtx_queues(THREAD_POOL_SIZE); // 每个任务队列的互斥锁
std::vector<std::condition_variable> cv_queues(THREAD_POOL_SIZE); // 每个任务队列的条件变量
std::atomic<bool> stop_flag(false); // 原子变量,用于控制线程池的停止
std::atomic<int> task_count(0); // 原子变量,用于记录任务的总数量
thread_local int thread_local_data = 0; // 线程局部存储,每个线程有自己的副本

// 工作线程函数
void worker(int thread_id) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dist(0, THREAD_POOL_SIZE - 1);

    while (!stop_flag) {
        bool task_found = false;

        // 尝试从自己的任务队列中获取任务
        {
            std::unique_lock<std::mutex> lock(mtx_queues[thread_id]);
            cv_queues[thread_id].wait(lock, [&] { return !task_queues[thread_id].empty() || stop_flag; });

            if (stop_flag) {
                lock.unlock();
                break;
            }

            if (!task_queues[thread_id].empty()) {
                auto task = task_queues[thread_id].front();
                task_queues[thread_id].pop_front();
                lock.unlock();

                // 执行任务
                task();
                task_found = true;

                // 使用线程局部存储
                thread_local_data++;
                std::cout << "Thread " << std::this_thread::get_id() << " (ID: " << thread_id << ") has processed " << thread_local_data << " tasks." << std::endl;
            }
        }

        // 如果自己的任务队列为空,尝试窃取其他线程的任务
        if (!task_found) {
            int steal_target = dist(gen); // 随机选择一个线程窃取任务
            if (steal_target != thread_id) {
                std::unique_lock<std::mutex> lock(mtx_queues[steal_target]);
                if (!task_queues[steal_target].empty()) {
                    auto task = task_queues[steal_target].back();
                    task_queues[steal_target].pop_back();
                    lock.unlock();

                    // 执行任务
                    task();
                    task_found = true;

                    // 使用线程局部存储
                    thread_local_data++;
                    std::cout << "Thread " << std::this_thread::get_id() << " (ID: " << thread_id << ") stole a task from thread " << steal_target << " and processed " << thread_local_data << " tasks." << std::endl;
                }
            }
        }

        // 如果所有任务队列都为空,短暂等待
        if (!task_found) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
}

// 任务生成函数
void generate_tasks() {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dist(0, THREAD_POOL_SIZE - 1);

    while (!stop_flag) {
        // 模拟任务生成过程
        std::this_thread::sleep_for(std::chrono::milliseconds(200));

        // 生成一个任务
        auto task = [task_id = ++task_count]() {
            std::cout << "Task " << task_id << " is being processed by thread " << std::this_thread::get_id() << std::endl;
            // 模拟任务处理过程
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
            std::cout << "Task " << task_id << " is completed by thread " << std::this_thread::get_id() << std::endl;
        };

        // 随机选择一个线程的任务队列
        int target_thread = dist(gen);
        std::unique_lock<std::mutex> lock(mtx_queues[target_thread]);
        task_queues[target_thread].push_back(task);
        cv_queues[target_thread].notify_one(); // 通知目标线程有新任务
        lock.unlock();

        std::cout << "Task " << task_count << " is added to thread " << target_thread << "'s queue." << std::endl;
    }
}

int main() {
    // 创建线程池
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < THREAD_POOL_SIZE; ++i) {
        thread_pool.emplace_back(worker, i);
    }

    // 创建任务生成线程
    std::thread task_generator(generate_tasks);

    // 模拟运行一段时间后停止
    std::this_thread::sleep_for(std::chrono::seconds(10));

    // 设置停止标志
    stop_flag = true;

    // 通知所有线程停止
    for (auto& cv : cv_queues) {
        cv.notify_all();
    }

    // 等待任务生成线程结束
    task_generator.join();

    // 等待线程池中的所有线程结束
    for (auto& thread : thread_pool) {
        thread.join();
    }

    std::cout << "All tasks have been processed. Exiting." << std::endl;

    return 0;
}

代码解释

  1. 任务队列 (task_queues):
    • 使用std::vector<std::deque<std::function<void()>>>表示每个线程的任务队列。每个线程都有自己的任务队列。
  2. 互斥锁 (mtx_queues):
    • 使用std::vector<std::mutex>为每个任务队列提供互斥锁,确保对任务队列的访问是线程安全的。
  3. 条件变量 (cv_queues):
    • 为每个任务队列添加一个条件变量,用于通知线程有新任务。
    • 当任务生成线程向某个任务队列添加任务时,使用cv_queues[target_thread].notify_one()通知对应的线程。
  4. 原子变量 (stop_flag, task_count):
    • std::atomic<bool>用于控制线程池的停止标志。
    • std::atomic<int>用于记录任务的总数量。
  5. 线程局部存储 (thread_local_data):
    • thread_local关键字用于声明线程局部存储变量,每个线程都有自己的副本,用于记录每个线程处理的任务数量。
  6. 工作线程函数 (worker):
    • 每个工作线程首先尝试从自己的任务队列中获取任务。
    • 如果自己的任务队列为空,尝试随机窃取其他线程的任务队列中的任务。
    • 使用互斥锁保护对任务队列的访问。
    • 使用线程局部存储变量thread_local_data记录每个线程处理的任务数量。
  7. 任务生成函数 (generate_tasks):
    • 任务生成线程不断生成任务,并随机选择一个线程的任务队列添加任务。
    • 使用互斥锁保护对任务队列的访问。
  8. 主函数 (main):
    • 创建线程池和工作线程。
    • 创建任务生成线程。
    • 模拟运行一段时间后,设置停止标志并等待所有线程结束。

运行结果

运行程序后,你会看到任务生成线程不断生成任务,并随机分配到某个线程的任务队列中。工作线程从自己的任务队列中获取任务并执行。如果某个线程的任务队列为空,它会尝试窃取其他线程的任务队列中的任务。每个线程处理的任务数量会记录在线程局部存储变量thread_local_data中。


总结

这个示例展示了工作窃取模型的并发编程场景,结合了线程同步、互斥锁、条件变量、原子操作和线程局部存储。通过这个示例,你可以更好地理解如何在多线程环境中实现任务调度、负载均衡和线程间的通信。