Skip to content

Latest commit

 

History

History
592 lines (470 loc) · 19.8 KB

C++并发编程基础:任务并行.md

File metadata and controls

592 lines (470 loc) · 19.8 KB

C++并发编程中的任务并行

1. 引言

1.1 什么是任务并行?

任务并行(Task Parallelism)是指将一个程序分解为多个独立的任务,这些任务可以并行执行。每个任务通常是独立的,不依赖于其他任务的结果。任务并行的目标是利用多核处理器的优势,通过并行执行多个任务来提高程序的执行效率。

1.2 为什么需要任务并行?

随着多核处理器的普及,单线程程序的性能提升空间有限。通过任务并行,我们可以充分利用多核处理器的计算能力,从而提高程序的执行效率。任务并行还可以减少程序的响应时间,特别是在处理大量数据或复杂计算时。

1.3 任务并行的应用场景

任务并行广泛应用于以下场景:

  • 图像处理:如图像滤波、边缘检测等。
  • 科学计算:如矩阵运算、数值模拟等。
  • 数据处理:如数据清洗、数据分析等。
  • 网络服务器:如并发处理多个客户端请求。

2. std::async 与异步任务

2.1 什么是 std::async?

std::async 是 C++11 引入的一个标准库函数,用于异步执行任务。它允许你将一个函数或可调用对象封装为一个任务,并在后台线程中执行。std::async 返回一个 std::future 对象,通过这个对象可以获取任务的执行结果。

2.2 为什么使用 std::async?

std::async 提供了一种简单的方式来实现任务并行。它隐藏了线程管理的复杂性,开发者只需关注任务的执行逻辑。std::async 还支持延迟执行和立即执行两种模式,可以根据需要选择合适的执行策略。

2.3 如何使用 std::async?

代码示例

#include <thread>
#include <iostream>
#include <future>
#include <chrono>

// 定义一个简单的任务函数
int taskFunction(int x) {
    std::cout << "Task is running in thread: " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
    return x * 2;
}

int main() {
    // 使用 std::async 异步执行任务
    std::future<int> futureResult = std::async(std::launch::async, taskFunction, 10);

    std::cout << "Main thread is running in thread: " << std::this_thread::get_id() << std::endl;

    // 获取任务的执行结果
    int result = futureResult.get();

    std::cout << "Task result: " << result << std::endl;

    return 0;
}

代码逐行解析

  1. 包含必要的头文件

    #include <thread>
    #include <iostream>
    #include <future>
    #include <chrono>
    • iostream:用于标准输入输出。
    • future:包含 std::asyncstd::future
    • chrono:用于线程睡眠操作。
  2. 定义任务函数

    int taskFunction(int x) {
        std::cout << "Task is running in thread: " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
        return x * 2;
    }
    • taskFunction 是一个简单的函数,接受一个整数参数 x,并返回 x * 2
    • std::this_thread::get_id() 用于获取当前线程的 ID。
    • std::this_thread::sleep_for(std::chrono::seconds(2)) 用于模拟一个耗时操作,线程会睡眠 2 秒。
  3. 使用 std::async 异步执行任务

    std::future<int> futureResult = std::async(std::launch::async, taskFunction, 10);
    • std::async(std::launch::async, taskFunction, 10)std::async 的第一个参数 std::launch::async 表示立即启动一个新线程来执行任务。
    • taskFunction 是要执行的任务函数,10 是传递给 taskFunction 的参数。
    • std::future<int> 是一个模板类,用于存储任务的返回值。
  4. 主线程继续执行

    std::cout << "Main thread is running in thread: " << std::this_thread::get_id() << std::endl;
    • 主线程继续执行,输出当前线程的 ID。
  5. 获取任务的执行结果

    int result = futureResult.get();
    • futureResult.get() 会阻塞当前线程,直到任务执行完毕并返回结果。
    • result 存储了任务的返回值。
  6. 输出任务结果

    std::cout << "Task result: " << result << std::endl;
    • 输出任务的执行结果。

2.4 std::async 的优缺点

优点

  • 简单易用std::async 隐藏了线程管理的复杂性,开发者只需关注任务的执行逻辑。
  • 自动线程管理std::async 会自动管理线程的创建和销毁,减少了手动管理线程的负担。
  • 支持延迟执行std::async 可以选择延迟执行任务,适合需要按需执行的场景。

缺点

  • 无法精确控制线程池std::async 不提供线程池的配置选项,无法精确控制线程的数量和行为。
  • 可能引发线程爆炸:如果频繁调用 std::async,可能会创建大量线程,导致系统资源耗尽。
  • 无法取消任务std::async 不支持任务的取消操作,任务一旦启动就无法中途停止。

3. std::future 与异步结果获取

3.1 什么是 std::future?

std::future 是 C++11 引入的一个模板类,用于表示一个异步操作的结果。它提供了一种机制,允许你在一个线程中启动一个任务,并在另一个线程中获取该任务的结果。std::future 通常与 std::asyncstd::promisestd::packaged_task 一起使用。

3.2 为什么使用 std::future?

std::future 的主要作用是简化异步编程。通过 std::future,你可以轻松地获取异步任务的结果,而不需要手动管理线程和同步机制。它提供了一种干净的方式来处理异步操作的结果,避免了复杂的回调机制。

3.3 如何使用 std::future?

3.3.1 代码示例

下面是一个更复杂的例子,展示了如何使用 std::futurestd::async 来实现一个并行计算任务,并处理可能的异常。

#include <iostream>
#include <future>
#include <vector>
#include <stdexcept>
#include <chrono>
#include <thread>
// 模拟一个耗时的计算任务
int computeTask(int id, int delay) {
    std::cout << "Task " << id << " is running in thread: " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(delay)); // 模拟耗时操作

    // 模拟任务可能抛出异常
    if (id == 3) {
        throw std::runtime_error("Task 3 encountered an error!");
    }

    return id * 10; // 返回任务的计算结果
}

int main() {
    // 创建多个异步任务
    std::vector<std::future<int>> futures;
    for (int i = 1; i <= 5; ++i) {
        futures.push_back(std::async(std::launch::async, computeTask, i, i));
    }

    // 获取并处理每个任务的结果
    for (int i = 0; i < futures.size(); ++i) {
        try {
            int result = futures[i].get(); // 获取任务的结果
            std::cout << "Task " << (i + 1) << " result: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Exception from task " << (i + 1) << ": " << e.what() << std::endl;
        }
    }

    return 0;
}

3.3.2 代码逐行解析

  1. 包含必要的头文件

    #include <iostream>
    #include <future>
    #include <vector>
    #include <stdexcept>
    #include <chrono>
    #include <thread>
    • iostream:用于标准输入输出。
    • future:包含 std::futurestd::async
    • vector:用于存储多个 std::future 对象。
    • stdexcept:用于处理异常。
    • chrono:用于线程睡眠操作。
  2. 定义任务函数

    int computeTask(int id, int delay) {
        std::cout << "Task " << id << " is running in thread: " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(delay)); // 模拟耗时操作
    
        // 模拟任务可能抛出异常
        if (id == 3) {
            throw std::runtime_error("Task 3 encountered an error!");
        }
    
        return id * 10; // 返回任务的计算结果
    }
    • computeTask 是一个模拟的计算任务,接受两个参数:id 表示任务的标识符,delay 表示任务的延迟时间。
    • std::this_thread::sleep_for(std::chrono::seconds(delay)):模拟任务的耗时操作。
    • 如果 id 为 3,任务会抛出一个 std::runtime_error 异常。
    • 任务的返回值是 id * 10
  3. 创建多个异步任务

    std::vector<std::future<int>> futures;
    for (int i = 1; i <= 5; ++i) {
        futures.push_back(std::async(std::launch::async, computeTask, i, i));
    }
    • 创建一个 std::vector<std::future<int>> 来存储多个异步任务的结果。
    • 使用 std::async 启动 5 个异步任务,每个任务的 iddelay 相同。
  4. 获取并处理每个任务的结果

    for (int i = 0; i < futures.size(); ++i) {
        try {
            int result = futures[i].get(); // 获取任务的结果
            std::cout << "Task " << (i + 1) << " result: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Exception from task " << (i + 1) << ": " << e.what() << std::endl;
        }
    }
    • 遍历 futures 向量,使用 get() 方法获取每个任务的结果。
    • get() 方法会阻塞当前线程,直到任务完成并返回结果。
    • 如果任务抛出异常,get() 会重新抛出该异常,因此使用 try-catch 块来捕获并处理异常。
  5. 输出结果

    • 如果任务成功完成,输出任务的计算结果。
    • 如果任务抛出异常,输出异常信息。

3.4 std::future 的优缺点

优点

  • 简化异步编程std::future 提供了一种简单的方式来获取异步任务的结果,避免了复杂的回调机制。
  • 线程安全std::futureget() 方法是线程安全的,可以安全地在多个线程中调用。
  • 支持异常处理std::future 可以捕获异步任务抛出的异常,并在调用 get() 时重新抛出。

缺点

  • 阻塞调用get() 方法会阻塞调用线程,直到任务完成。如果任务耗时较长,可能会导致性能问题。
  • 单次使用std::future 只能调用一次 get() 方法,之后就无法再次获取结果。
  • 无法取消任务std::future 不支持任务的取消操作,任务一旦启动就无法中途停止。

总结

std::future 是 C++ 中实现异步编程的重要工具。通过 std::future,你可以轻松地获取异步任务的结果,并处理可能的异常。然而,std::future 也有其局限性,特别是在需要频繁获取结果或取消任务的场景下。在实际应用中,开发者需要根据具体需求选择合适的并发编程工具。

4. 任务的组合与依赖

4.1 什么是任务的组合与依赖?

任务的组合与依赖是指将多个任务按照一定的顺序或依赖关系组合在一起,形成一个任务链。在前一个任务完成之后,后一个任务才能开始执行。这种机制可以确保任务之间的数据一致性和执行顺序。

4.2 为什么需要任务的组合与依赖?

在实际应用中,任务之间往往存在依赖关系。例如,一个任务的输出可能是另一个任务的输入。通过任务的组合与依赖,可以确保任务按照正确的顺序执行,避免数据竞争和逻辑错误。

4.3 如何实现任务的组合与依赖?

4.3.1 代码示例

#include <iostream>
#include <future>
#include <vector>
#include <thread>
// 任务1:生成一组数据
std::vector<int> generateData(int size) {
    std::vector<int> data(size);
    for (int i = 0; i < size; ++i) {
        data[i] = i;
    }
    return data;
}

// 任务2:处理数据
int processData(const std::vector<int>& data) {
    int sum = 0;
    for (int value : data) {
        sum += value;
    }
    return sum;
}

int main() {
    // 异步执行任务1
    std::future<std::vector<int>> dataFuture = std::async(std::launch::async, generateData, 10);

    // 等待任务1完成并获取结果
    std::vector<int> data = dataFuture.get();

    // 异步执行任务2
    std::future<int> sumFuture = std::async(std::launch::async, processData, std::ref(data));

    // 等待任务2完成并获取结果
    int sum = sumFuture.get();

    std::cout << "Sum of data: " << sum << std::endl;

    return 0;
}

4.3.2 代码逐行解析

  1. 定义任务函数

    std::vector<int> generateData(int size) {
        std::vector<int> data(size);
        for (int i = 0; i < size; ++i) {
            data[i] = i;
        }
        return data;
    }
    • generateData 函数生成一个包含 size 个整数的向量。
    int processData(const std::vector<int>& data) {
        int sum = 0;
        for (int value : data) {
            sum += value;
        }
        return sum;
    }
    • processData 函数计算向量中所有元素的和。
  2. 异步执行任务1

    std::future<std::vector<int>> dataFuture = std::async(std::launch::async, generateData, 10);
    • std::async 启动一个新线程来执行 generateData 任务,并返回一个 std::future 对象。
  3. 等待任务1完成并获取结果

    std::vector<int> data = dataFuture.get();
    • dataFuture.get() 阻塞当前线程,直到 generateData 任务完成并返回结果。
  4. 异步执行任务2

    std::future<int> sumFuture = std::async(std::launch::async, processData, std::ref(data));
    • std::async 启动一个新线程来执行 processData 任务,并传递 data 作为参数。
  5. 等待任务2完成并获取结果

    int sum = sumFuture.get();
    • sumFuture.get() 阻塞当前线程,直到 processData 任务完成并返回结果。
  6. 输出结果

    std::cout << "Sum of data: " << sum << std::endl;
    • 输出任务2的结果。

4.4 任务组合与依赖的优缺点

优点

  • 确保任务顺序:通过任务的组合与依赖,可以确保任务按照正确的顺序执行。
  • 简化复杂逻辑:将复杂的任务分解为多个小任务,并通过依赖关系组合在一起,简化了代码逻辑。
  • 提高并行度:在任务之间没有依赖关系的情况下,可以并行执行多个任务,提高程序的执行效率。

缺点

  • 复杂性增加:任务的组合与依赖可能会增加代码的复杂性,特别是在任务数量较多时。
  • 性能开销:频繁的任务切换和同步操作可能会带来一定的性能开销。

5. 实际应用场景

5.1 并行计算

5.1.1 代码示例

#include <iostream>
#include <vector>
#include <future>
#include <numeric>

// 并行计算向量的和
int parallelSum(const std::vector<int>& data, int start, int end) {
    if (end - start <= 1000) {
        return std::accumulate(data.begin() + start, data.begin() + end, 0);
    }
    int mid = (start + end) / 2;
    std::future<int> leftSum = std::async(std::launch::async, parallelSum, std::ref(data), start, mid);
    int rightSum = parallelSum(data, mid, end);
    return leftSum.get() + rightSum;
}

int main() {
    std::vector<int> data(10000, 1); // 10000个1
    int sum = parallelSum(data, 0, data.size());
    std::cout << "Sum of data: " << sum << std::endl;
    return 0;
}

5.1.2 代码逐行解析

  1. 定义并行计算函数

    int parallelSum(const std::vector<int>& data, int start, int end) {
        if (end - start <= 1000) {
            return std::accumulate(data.begin() + start, data.begin() + end, 0);
        }
        int mid = (start + end) / 2;
        std::future<int> leftSum = std::async(std::launch::async, parallelSum, std::ref(data), start, mid);
        int rightSum = parallelSum(data, mid, end);
        return leftSum.get() + rightSum;
    }
    • parallelSum 函数递归地将向量分成两部分,并行计算每一部分的和。
    • 如果子向量的长度小于等于 1000,则直接计算其和。
  2. 生成数据

    std::vector<int> data(10000, 1); // 10000个1
    • 生成一个包含 10000 个 1 的向量。
  3. 并行计算向量的和

    int sum = parallelSum(data, 0, data.size());
    • 调用 parallelSum 函数计算向量的和。
  4. 输出结果

    std::cout << "Sum of data: " << sum << std::endl;
    • 输出计算结果。

5.2 异步 I/O

5.2.1 代码示例

#include <iostream>
#include <fstream>
#include <future>
#include <string>

// 异步读取文件内容
std::string readFileAsync(const std::string& filename) {
    std::ifstream file(filename);
    std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
    return content;
}

int main() {
    std::future<std::string> contentFuture = std::async(std::launch::async, readFileAsync, "example.txt");

    std::cout << "Reading file asynchronously..." << std::endl;

    std::string content = contentFuture.get();

    std::cout << "File content: " << content << std::endl;

    return 0;
}

5.2.2 代码逐行解析

  1. 定义异步读取文件函数

    std::string readFileAsync(const std::string& filename) {
        std::ifstream file(filename);
        std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
        return content;
    }
    • readFileAsync 函数异步读取文件内容,并返回文件内容字符串。
  2. 异步读取文件

    std::future<std::string> contentFuture = std::async(std::launch::async, readFileAsync, "example.txt");
    • std::async 启动一个新线程来执行 readFileAsync 任务。
  3. 等待文件读取完成并获取结果

    std::string content = contentFuture.get();
    • contentFuture.get() 阻塞当前线程,直到文件读取完成并返回结果。
  4. 输出文件内容

    std::cout << "File content: " << content << std::endl;
    • 输出文件内容。

5.3 并发数据处理

5.3.1 代码示例

#include <iostream>
#include <vector>
#include <future>
#include <algorithm>

// 并发处理数据
void processDataConcurrently(std::vector<int>& data) {
    std::vector<std::future<void>> futures;
    for (int& value : data) {
        futures.push_back(std::async(std::launch::async, [&value]() {
            value *= 2; // 每个元素乘以2
        }));
    }
    for (auto& future : futures) {
        future.get();
    }
}

int main() {
    std::vector<int> data = {1, 2, 3, 4, 5};
    processDataConcurrently(data);
    std::cout << "Processed data: ";
    for (int value : data) {
        std::cout << value << " ";
    }
    std::cout << std::endl;
    return 0;
}

5.3.2 代码逐行解析

  1. 定义并发处理函数

    void processDataConcurrently(std::vector<int>& data) {
        std::vector<std::future<void>> futures;
        for (int& value : data) {
            futures.push_back(std::async(std::launch::async, [&value]() {
                value *= 2; // 每个元素乘以2
            }));
        }
        for (auto& future : futures) {
            future.get();
        }
    }
    • processDataConcurrently 函数并发处理向量中的每个元素,将每个元素乘以 2。
  2. 生成数据

    std::vector<int> data = {1, 2, 3, 4, 5};
    • 生成一个包含 5 个整数的向量。
  3. 并发处理数据

    processDataConcurrently(data);
    • 调用 processDataConcurrently 函数并发处理数据。
  4. 输出处理后的数据

    std::cout << "Processed data: ";
    for (int value : data) {
        std::cout << value << " ";
    }
    std::cout << std::endl;
    • 输出处理后的数据。