任务并行(Task Parallelism)是指将一个程序分解为多个独立的任务,这些任务可以并行执行。每个任务通常是独立的,不依赖于其他任务的结果。任务并行的目标是利用多核处理器的优势,通过并行执行多个任务来提高程序的执行效率。
随着多核处理器的普及,单线程程序的性能提升空间有限。通过任务并行,我们可以充分利用多核处理器的计算能力,从而提高程序的执行效率。任务并行还可以减少程序的响应时间,特别是在处理大量数据或复杂计算时。
任务并行广泛应用于以下场景:
- 图像处理:如图像滤波、边缘检测等。
- 科学计算:如矩阵运算、数值模拟等。
- 数据处理:如数据清洗、数据分析等。
- 网络服务器:如并发处理多个客户端请求。
std::async
是 C++11 引入的一个标准库函数,用于异步执行任务。它允许你将一个函数或可调用对象封装为一个任务,并在后台线程中执行。std::async
返回一个 std::future
对象,通过这个对象可以获取任务的执行结果。
std::async
提供了一种简单的方式来实现任务并行。它隐藏了线程管理的复杂性,开发者只需关注任务的执行逻辑。std::async
还支持延迟执行和立即执行两种模式,可以根据需要选择合适的执行策略。
#include <thread>
#include <iostream>
#include <future>
#include <chrono>
// 定义一个简单的任务函数
int taskFunction(int x) {
std::cout << "Task is running in thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
return x * 2;
}
int main() {
// 使用 std::async 异步执行任务
std::future<int> futureResult = std::async(std::launch::async, taskFunction, 10);
std::cout << "Main thread is running in thread: " << std::this_thread::get_id() << std::endl;
// 获取任务的执行结果
int result = futureResult.get();
std::cout << "Task result: " << result << std::endl;
return 0;
}
-
包含必要的头文件:
#include <thread> #include <iostream> #include <future> #include <chrono>
iostream
:用于标准输入输出。future
:包含std::async
和std::future
。chrono
:用于线程睡眠操作。
-
定义任务函数:
int taskFunction(int x) { std::cout << "Task is running in thread: " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作 return x * 2; }
taskFunction
是一个简单的函数,接受一个整数参数x
,并返回x * 2
。std::this_thread::get_id()
用于获取当前线程的 ID。std::this_thread::sleep_for(std::chrono::seconds(2))
用于模拟一个耗时操作,线程会睡眠 2 秒。
-
使用 std::async 异步执行任务:
std::future<int> futureResult = std::async(std::launch::async, taskFunction, 10);
std::async(std::launch::async, taskFunction, 10)
:std::async
的第一个参数std::launch::async
表示立即启动一个新线程来执行任务。taskFunction
是要执行的任务函数,10
是传递给taskFunction
的参数。std::future<int>
是一个模板类,用于存储任务的返回值。
-
主线程继续执行:
std::cout << "Main thread is running in thread: " << std::this_thread::get_id() << std::endl;
- 主线程继续执行,输出当前线程的 ID。
-
获取任务的执行结果:
int result = futureResult.get();
futureResult.get()
会阻塞当前线程,直到任务执行完毕并返回结果。result
存储了任务的返回值。
-
输出任务结果:
std::cout << "Task result: " << result << std::endl;
- 输出任务的执行结果。
- 简单易用:
std::async
隐藏了线程管理的复杂性,开发者只需关注任务的执行逻辑。 - 自动线程管理:
std::async
会自动管理线程的创建和销毁,减少了手动管理线程的负担。 - 支持延迟执行:
std::async
可以选择延迟执行任务,适合需要按需执行的场景。
- 无法精确控制线程池:
std::async
不提供线程池的配置选项,无法精确控制线程的数量和行为。 - 可能引发线程爆炸:如果频繁调用
std::async
,可能会创建大量线程,导致系统资源耗尽。 - 无法取消任务:
std::async
不支持任务的取消操作,任务一旦启动就无法中途停止。
std::future
是 C++11 引入的一个模板类,用于表示一个异步操作的结果。它提供了一种机制,允许你在一个线程中启动一个任务,并在另一个线程中获取该任务的结果。std::future
通常与 std::async
、std::promise
或 std::packaged_task
一起使用。
std::future
的主要作用是简化异步编程。通过 std::future
,你可以轻松地获取异步任务的结果,而不需要手动管理线程和同步机制。它提供了一种干净的方式来处理异步操作的结果,避免了复杂的回调机制。
下面是一个更复杂的例子,展示了如何使用 std::future
和 std::async
来实现一个并行计算任务,并处理可能的异常。
#include <iostream>
#include <future>
#include <vector>
#include <stdexcept>
#include <chrono>
#include <thread>
// 模拟一个耗时的计算任务
int computeTask(int id, int delay) {
std::cout << "Task " << id << " is running in thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(delay)); // 模拟耗时操作
// 模拟任务可能抛出异常
if (id == 3) {
throw std::runtime_error("Task 3 encountered an error!");
}
return id * 10; // 返回任务的计算结果
}
int main() {
// 创建多个异步任务
std::vector<std::future<int>> futures;
for (int i = 1; i <= 5; ++i) {
futures.push_back(std::async(std::launch::async, computeTask, i, i));
}
// 获取并处理每个任务的结果
for (int i = 0; i < futures.size(); ++i) {
try {
int result = futures[i].get(); // 获取任务的结果
std::cout << "Task " << (i + 1) << " result: " << result << std::endl;
} catch (const std::exception& e) {
std::cerr << "Exception from task " << (i + 1) << ": " << e.what() << std::endl;
}
}
return 0;
}
-
包含必要的头文件:
#include <iostream> #include <future> #include <vector> #include <stdexcept> #include <chrono> #include <thread>
iostream
:用于标准输入输出。future
:包含std::future
和std::async
。vector
:用于存储多个std::future
对象。stdexcept
:用于处理异常。chrono
:用于线程睡眠操作。
-
定义任务函数:
int computeTask(int id, int delay) { std::cout << "Task " << id << " is running in thread: " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(delay)); // 模拟耗时操作 // 模拟任务可能抛出异常 if (id == 3) { throw std::runtime_error("Task 3 encountered an error!"); } return id * 10; // 返回任务的计算结果 }
computeTask
是一个模拟的计算任务,接受两个参数:id
表示任务的标识符,delay
表示任务的延迟时间。std::this_thread::sleep_for(std::chrono::seconds(delay))
:模拟任务的耗时操作。- 如果
id
为 3,任务会抛出一个std::runtime_error
异常。 - 任务的返回值是
id * 10
。
-
创建多个异步任务:
std::vector<std::future<int>> futures; for (int i = 1; i <= 5; ++i) { futures.push_back(std::async(std::launch::async, computeTask, i, i)); }
- 创建一个
std::vector<std::future<int>>
来存储多个异步任务的结果。 - 使用
std::async
启动 5 个异步任务,每个任务的id
和delay
相同。
- 创建一个
-
获取并处理每个任务的结果:
for (int i = 0; i < futures.size(); ++i) { try { int result = futures[i].get(); // 获取任务的结果 std::cout << "Task " << (i + 1) << " result: " << result << std::endl; } catch (const std::exception& e) { std::cerr << "Exception from task " << (i + 1) << ": " << e.what() << std::endl; } }
- 遍历
futures
向量,使用get()
方法获取每个任务的结果。 get()
方法会阻塞当前线程,直到任务完成并返回结果。- 如果任务抛出异常,
get()
会重新抛出该异常,因此使用try-catch
块来捕获并处理异常。
- 遍历
-
输出结果:
- 如果任务成功完成,输出任务的计算结果。
- 如果任务抛出异常,输出异常信息。
- 简化异步编程:
std::future
提供了一种简单的方式来获取异步任务的结果,避免了复杂的回调机制。 - 线程安全:
std::future
的get()
方法是线程安全的,可以安全地在多个线程中调用。 - 支持异常处理:
std::future
可以捕获异步任务抛出的异常,并在调用get()
时重新抛出。
- 阻塞调用:
get()
方法会阻塞调用线程,直到任务完成。如果任务耗时较长,可能会导致性能问题。 - 单次使用:
std::future
只能调用一次get()
方法,之后就无法再次获取结果。 - 无法取消任务:
std::future
不支持任务的取消操作,任务一旦启动就无法中途停止。
std::future
是 C++ 中实现异步编程的重要工具。通过 std::future
,你可以轻松地获取异步任务的结果,并处理可能的异常。然而,std::future
也有其局限性,特别是在需要频繁获取结果或取消任务的场景下。在实际应用中,开发者需要根据具体需求选择合适的并发编程工具。
任务的组合与依赖是指将多个任务按照一定的顺序或依赖关系组合在一起,形成一个任务链。在前一个任务完成之后,后一个任务才能开始执行。这种机制可以确保任务之间的数据一致性和执行顺序。
在实际应用中,任务之间往往存在依赖关系。例如,一个任务的输出可能是另一个任务的输入。通过任务的组合与依赖,可以确保任务按照正确的顺序执行,避免数据竞争和逻辑错误。
#include <iostream>
#include <future>
#include <vector>
#include <thread>
// 任务1:生成一组数据
std::vector<int> generateData(int size) {
std::vector<int> data(size);
for (int i = 0; i < size; ++i) {
data[i] = i;
}
return data;
}
// 任务2:处理数据
int processData(const std::vector<int>& data) {
int sum = 0;
for (int value : data) {
sum += value;
}
return sum;
}
int main() {
// 异步执行任务1
std::future<std::vector<int>> dataFuture = std::async(std::launch::async, generateData, 10);
// 等待任务1完成并获取结果
std::vector<int> data = dataFuture.get();
// 异步执行任务2
std::future<int> sumFuture = std::async(std::launch::async, processData, std::ref(data));
// 等待任务2完成并获取结果
int sum = sumFuture.get();
std::cout << "Sum of data: " << sum << std::endl;
return 0;
}
-
定义任务函数:
std::vector<int> generateData(int size) { std::vector<int> data(size); for (int i = 0; i < size; ++i) { data[i] = i; } return data; }
generateData
函数生成一个包含size
个整数的向量。
int processData(const std::vector<int>& data) { int sum = 0; for (int value : data) { sum += value; } return sum; }
processData
函数计算向量中所有元素的和。
-
异步执行任务1:
std::future<std::vector<int>> dataFuture = std::async(std::launch::async, generateData, 10);
std::async
启动一个新线程来执行generateData
任务,并返回一个std::future
对象。
-
等待任务1完成并获取结果:
std::vector<int> data = dataFuture.get();
dataFuture.get()
阻塞当前线程,直到generateData
任务完成并返回结果。
-
异步执行任务2:
std::future<int> sumFuture = std::async(std::launch::async, processData, std::ref(data));
std::async
启动一个新线程来执行processData
任务,并传递data
作为参数。
-
等待任务2完成并获取结果:
int sum = sumFuture.get();
sumFuture.get()
阻塞当前线程,直到processData
任务完成并返回结果。
-
输出结果:
std::cout << "Sum of data: " << sum << std::endl;
- 输出任务2的结果。
- 确保任务顺序:通过任务的组合与依赖,可以确保任务按照正确的顺序执行。
- 简化复杂逻辑:将复杂的任务分解为多个小任务,并通过依赖关系组合在一起,简化了代码逻辑。
- 提高并行度:在任务之间没有依赖关系的情况下,可以并行执行多个任务,提高程序的执行效率。
- 复杂性增加:任务的组合与依赖可能会增加代码的复杂性,特别是在任务数量较多时。
- 性能开销:频繁的任务切换和同步操作可能会带来一定的性能开销。
#include <iostream>
#include <vector>
#include <future>
#include <numeric>
// 并行计算向量的和
int parallelSum(const std::vector<int>& data, int start, int end) {
if (end - start <= 1000) {
return std::accumulate(data.begin() + start, data.begin() + end, 0);
}
int mid = (start + end) / 2;
std::future<int> leftSum = std::async(std::launch::async, parallelSum, std::ref(data), start, mid);
int rightSum = parallelSum(data, mid, end);
return leftSum.get() + rightSum;
}
int main() {
std::vector<int> data(10000, 1); // 10000个1
int sum = parallelSum(data, 0, data.size());
std::cout << "Sum of data: " << sum << std::endl;
return 0;
}
-
定义并行计算函数:
int parallelSum(const std::vector<int>& data, int start, int end) { if (end - start <= 1000) { return std::accumulate(data.begin() + start, data.begin() + end, 0); } int mid = (start + end) / 2; std::future<int> leftSum = std::async(std::launch::async, parallelSum, std::ref(data), start, mid); int rightSum = parallelSum(data, mid, end); return leftSum.get() + rightSum; }
parallelSum
函数递归地将向量分成两部分,并行计算每一部分的和。- 如果子向量的长度小于等于 1000,则直接计算其和。
-
生成数据:
std::vector<int> data(10000, 1); // 10000个1
- 生成一个包含 10000 个 1 的向量。
-
并行计算向量的和:
int sum = parallelSum(data, 0, data.size());
- 调用
parallelSum
函数计算向量的和。
- 调用
-
输出结果:
std::cout << "Sum of data: " << sum << std::endl;
- 输出计算结果。
#include <iostream>
#include <fstream>
#include <future>
#include <string>
// 异步读取文件内容
std::string readFileAsync(const std::string& filename) {
std::ifstream file(filename);
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
return content;
}
int main() {
std::future<std::string> contentFuture = std::async(std::launch::async, readFileAsync, "example.txt");
std::cout << "Reading file asynchronously..." << std::endl;
std::string content = contentFuture.get();
std::cout << "File content: " << content << std::endl;
return 0;
}
-
定义异步读取文件函数:
std::string readFileAsync(const std::string& filename) { std::ifstream file(filename); std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()); return content; }
readFileAsync
函数异步读取文件内容,并返回文件内容字符串。
-
异步读取文件:
std::future<std::string> contentFuture = std::async(std::launch::async, readFileAsync, "example.txt");
std::async
启动一个新线程来执行readFileAsync
任务。
-
等待文件读取完成并获取结果:
std::string content = contentFuture.get();
contentFuture.get()
阻塞当前线程,直到文件读取完成并返回结果。
-
输出文件内容:
std::cout << "File content: " << content << std::endl;
- 输出文件内容。
#include <iostream>
#include <vector>
#include <future>
#include <algorithm>
// 并发处理数据
void processDataConcurrently(std::vector<int>& data) {
std::vector<std::future<void>> futures;
for (int& value : data) {
futures.push_back(std::async(std::launch::async, [&value]() {
value *= 2; // 每个元素乘以2
}));
}
for (auto& future : futures) {
future.get();
}
}
int main() {
std::vector<int> data = {1, 2, 3, 4, 5};
processDataConcurrently(data);
std::cout << "Processed data: ";
for (int value : data) {
std::cout << value << " ";
}
std::cout << std::endl;
return 0;
}
-
定义并发处理函数:
void processDataConcurrently(std::vector<int>& data) { std::vector<std::future<void>> futures; for (int& value : data) { futures.push_back(std::async(std::launch::async, [&value]() { value *= 2; // 每个元素乘以2 })); } for (auto& future : futures) { future.get(); } }
processDataConcurrently
函数并发处理向量中的每个元素,将每个元素乘以 2。
-
生成数据:
std::vector<int> data = {1, 2, 3, 4, 5};
- 生成一个包含 5 个整数的向量。
-
并发处理数据:
processDataConcurrently(data);
- 调用
processDataConcurrently
函数并发处理数据。
- 调用
-
输出处理后的数据:
std::cout << "Processed data: "; for (int value : data) { std::cout << value << " "; } std::cout << std::endl;
- 输出处理后的数据。