约简运算符的概念是将数组的许多或所有元素约简为一个结果,它与并发和并行编程密切相关。具体地说,由于运算符的关联性和通信性,可以应用并发性和并行性来大大提高它们的执行时间。
本章从程序员和开发人员的角度讨论设计和编写约简运算符的理论并行方法。从这里开始,本章还将连接到类似的问题,这些问题可以通过类似的方式使用并发来解决。
本章将介绍以下主题:
- 计算机科学中的约化算子概念
- 还原运算符的通信和关联特性,以及可以应用并发的原因
- 如何识别等价于约简运算符的问题,以及如何在这种情况下应用并发编程
以下是本章的先决条件列表:
- 您的计算机上必须安装 Python 3
- 从下载 GitHub 存储库 https://github.com/PacktPublishing/Mastering-Concurrency-in-Python
- 在本章中,我们将使用名为
Chapter07
的子文件夹 - 查看以下视频以查看代码的运行:http://bit.ly/2TD5odl
作为经验丰富的程序员,您无疑遇到过这样的情况:您需要计算数组中所有数字的和或积,或者计算将AND
运算符应用于数组中所有布尔元素的结果,以查看该数组中是否存在任何假值。这些被称为归约运算符,它接受一组或一组元素,并执行某种形式的计算以仅返回一个结果。
并不是每一个数学或计算机科学操作员都是约化运算符。事实上,即使运算符能够将元素数组缩减为单个值,也不能保证它是一个缩减运算符。如果运算符满足以下条件,则为约化运算符:
- 运算符可以将元素数组缩减为一个标量值
- 最终结果(标量值)必须通过创建和计算部分任务来获得
第一个条件表示短语“归约运算符”,因为必须将输入数组的所有元素组合并归约为一个值。然而,第二个条件本质上是并发性和并行性。它要求任何约化算子的计算能够被划分为更小的部分计算。
首先,让我们考虑最常见的约化算子之一:加法。例如,考虑输入数组 AuthT0} -该数组中元素的和如下:
1 + 4 + 8 + 3 + 2 + 5
= ((((1 + 4) + 8) + 3) + 2) + 5
= (((5 + 8) + 3) + 2) + 5
= ((13 + 3) + 2) + 5
= (16 + 2) + 5
= 18 + 5
= 23
在前面的计算中,我们按顺序将数组中的数字减少为它们的和23
。换句话说,我们从头到尾检查了数组中的每个元素,并将当前的总和相加。现在我们知道加法是一个交换的结合算子,意思是:a+b=b+a和*(a+b)+c=a+(b+c)*。
因此,我们可以通过将求和分解为更小的求和,以更有效的方式执行前面的计算:
1 + 4 + 8 + 3 + 2 + 5
= ((1 + 4) + (8 + 3)) + (2 + 5)
= (5 + 11) + 7
= 16 + 7
= 23
这项技术是将并发性和并行性(特别是多处理)应用于 reduce 操作符的核心。通过将整个任务分解为更小的子任务,多个进程可以同时执行这些小计算,整个系统可以更快地得出结果。
出于同样的原因,交际性和联想性被认为等同于我们前面讨论的约化算子的要求。换句话说,操作符是一个交际和联想的归约操作符。具体如下:
这里a、b和c是输入数组的元素。
因此,如果一个操作符是一个归约操作符,那么它必须是交流的和关联的,因此它能够将一个大任务分解成更小、更易于管理的子任务,这可以通过使用多处理以更有效的方式进行计算。
到目前为止,我们已经看到加法是约化算子的一个例子。为了将加法作为归约运算符来执行,我们首先将输入数组中的元素分成两个组,每个组是我们的一个子任务。然后,我们对每组执行加法,从每组中获取加法结果,并再次将它们分成两组。
这个过程一直持续到我们得到一个数字。此过程遵循一种称为二叉树约简的模型,该模型利用两个组组成子任务:
Diagram of binary tree reduction for addition
在前面的示例中,使用数组[1,4,8,3,2,5],在将数字分成两个数字(1 和 4,8 和 3,2 和 5)的三个不同组后,我们使用三个单独的过程将数字对相加。然后我们得到了数组[5,11,7],我们在一个过程中使用它来获得[16,7],然后在另一个过程中最终获得 23。因此,使用三个或更多 CPU,六个元素的加法运算符可以在日志26=3 步中完成,而不是顺序加法中的五步。
还原运算符的其他常见示例是乘法和逻辑 and。例如,使用乘法作为缩减运算符来缩减相同的数字数组[1、4、8、3、2、5],操作如下:
1 x 4 x 8 x 3 x 2 x 5
= ((1 x 4) x (8 x 3)) x (2 x 5)
= (4 x 24) x 10
= 96 x 10
= 960
要减少布尔值数组,例如(True
、False
、False
、True
,使用逻辑AND
运算符,我们可以执行以下操作:
True AND False AND False AND True
= (True AND False) AND (False AND True)
= False AND False
= False
还原运算符的一个非示例是幂函数,因为改变计算顺序会改变最终结果(也就是说,函数不是可通信的)。例如,按顺序减少数组[2, 1, 2]
将得到以下结果:
2 ^ 1 ^ 2 = 2 ^ (1 ^ 2) = 2 ^ 1 = 2
如果我们改变操作顺序如下:
(2 ^ 1) ^ 2 = 2 ^ 2 = 4
我们将获得不同的值。因此,功率函数不是还原操作。
如前所述,由于还原运算符的通信和关联特性,它们可以独立创建和处理部分任务,这就是可以应用并发性的地方。为了真正了解还原运算符如何利用并发性,让我们尝试实现一个并发,从头开始的多处理缩减运算符—特别是添加运算符。
与我们在上一章中看到的类似,在本例中,我们将使用任务队列和结果队列来促进进程间通信。具体来说,程序将把任务队列中输入数组中的所有数字存储为单个任务。当我们的每个消费者(单个进程)执行时,它将调用任务队列上的get()
两次,以获得两个任务编号(除了一些边缘情况,任务队列中没有或只有一个编号),将它们加在一起,并将结果放入结果队列中。
与将成对的数字相加类似,就像我们在上一节中所做的那样,在我们的流程迭代任务队列一次并将添加的成对任务编号放入结果队列后,输入数组中的元素数量将减少一半。例如,[1, 4, 8, 3, 2, 5]
的输入数组将变成[5, 11, 7]
。
现在,我们的程序将新的任务队列指定为结果队列(因此,在本例中,[5, 11, 7]
现在是新的任务队列),我们的进程将继续遍历它,并将成对的数字相加,以生成一个新的结果队列,它将成为下一个任务队列。这个过程会不断重复,直到结果队列只包含一个元素,因为我们知道这个数字是原始输入数组中数字的总和。
下图显示了在处理输入数组[1, 4, 8, 3, 2, 5]
的每次迭代中任务队列和结果队列的变化;当结果队列仅包含一个编号(23
时,进程停止:
Sample diagram of the multiprocessing add operator
让我们看一看{ To.t1}文件中的{To0t0}类:
# Chapter07/example1.py
class ReductionConsumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
pname = self.name
print('Using process %s...' % pname)
while True:
num1 = self.task_queue.get()
if num1 is None:
print('Exiting process %s.' % pname)
self.task_queue.task_done()
break
self.task_queue.task_done()
num2 = self.task_queue.get()
if num2 is None:
print('Reaching the end with process %s and number
%i.' % (pname, num1))
self.task_queue.task_done()
self.result_queue.put(num1)
break
print('Running process %s on numbers %i and %i.' % (
pname, num1, num2))
self.task_queue.task_done()
self.result_queue.put(num1 + num2)
我们通过重写multiprocessing.Process
类来实现ReductionConsumer
类。此使用者类在初始化时接收一个任务队列和一个结果队列,并处理程序的使用者进程逻辑,该程序在任务队列上调用两次get()
以从队列中获取两个数字,并将它们的总和添加到结果队列中。
在执行此操作时,ReductionConsumer
类还处理任务队列中没有或只有一个数字的情况(即,num1
或num2
变量为None
,正如我们在上一章中所知道的,这是我们用来指示毒药丸的)。
另外,回想一下multiprocessing
模块的JoinableQueue
类是用来实现我们的任务队列的,它要求每次调用get()
函数后都要调用task_done()
函数,否则我们以后在任务队列上调用的后续join()
函数将无限期阻塞。因此,在使用者进程调用get()
两次的情况下,在当前任务队列上调用task_done()
两次很重要,当我们只调用get()
一次时(当第一个号码是毒药丸时),那么我们应该只调用task_done()
一次。这是在使用多处理程序以促进进程间通信时更复杂的考虑因素之一。
为了处理和协调不同的使用者流程,以及在每次迭代后操作任务队列和结果队列,我们有一个单独的函数称为reduce_sum()
:
def reduce_sum(array):
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.JoinableQueue()
result_size = len(array)
n_consumers = multiprocessing.cpu_count()
for item in array:
results.put(item)
while result_size > 1:
tasks = results
results = multiprocessing.JoinableQueue()
consumers = [ReductionConsumer(tasks, results)
for i in range(n_consumers)]
for consumer in consumers:
consumer.start()
for i in range(n_consumers):
tasks.put(None)
tasks.join()
result_size = result_size // 2 + (result_size % 2)
#print('-' * 40)
return results.get()
此函数接收 Python 数字列表以计算其元素的总和。除了任务队列和结果队列之外,该函数还跟踪另一个名为result_size
的变量,该变量表示当前结果队列中的元素数。
初始化其基本变量后,函数生成其使用者进程,以减少 while 循环中的当前任务队列。如前所述,在 while 循环的每次迭代中,任务队列中的元素成对添加在一起,添加的结果存储在结果队列中。之后,任务队列将接管该结果队列的元素,并向队列中添加额外的None
值以实现毒丸技术。
在每次迭代中,一个新的空结果队列也被初始化为一个JoinableQueue
对象。这与我们在上一章中用于结果队列的multiprocessing.Queue
类不同,因为我们将在下一次迭代开始时分配tasks = results
,任务队列需要是一个JoinableQueue
对象。
我们还通过result_size = result_size // 2 + (result_size % 2)
在每次迭代结束时更新result_size
的值。这里需要注意的是,虽然来自JoinableQueue
类的qsize()
方法是跟踪其对象长度(即JoinableQueue
对象中的元素数)的潜在方法,但由于各种原因,该方法通常被认为是不可靠的,甚至在 Unix 操作系统中都没有实现。
由于我们可以很容易地预测每次迭代后输入数组中剩余数字的数量将如何变化(如果是偶数,则减半,否则通过整数除法减半,然后将1
添加到该结果中),因此我们可以使用一个名为result_size
的单独变量跟踪该数字。
对于本例的主程序,我们只需将 Python 列表传递给reduce_sum()
函数。在这里,我们将数字从 0 添加到 19:
my_array = [i for i in range(20)]
result = reduce_sum(my_array)
print('Final result: %i.' % result)
运行脚本后,您的输出应类似于以下内容:
> python example1.py
Using process ReductionConsumer-1...
Running process ReductionConsumer-1 on numbers 0 and 1.
Using process ReductionConsumer-2...
Running process ReductionConsumer-2 on numbers 2 and 3.
Using process ReductionConsumer-3...
[...Truncated for readability..]
Exiting process ReductionConsumer-17.
Exiting process ReductionConsumer-18.
Exiting process ReductionConsumer-19.
Using process ReductionConsumer-20...
Exiting process ReductionConsumer-20.
Final result: 190.
约简运算符处理其数据的方式具有通信和关联性质,使运算符的子任务能够独立处理,因此与并发性和并行性高度相关。因此,并发编程中的各种主题可能与归约运算符有关,通过应用归约运算符的相同原理,与这些主题相关的问题可以变得更加直观和高效。
正如我们所看到的,加法和乘法运算符都是约化运算符。更一般地说,通常涉及通信和关联运算符的数字运算问题是应用并发性和并行性的主要候选问题。对于 PythonNumpy 中著名的、可以说是最常用的模块之一,这实际上是一个真实的例子,它的代码被实现为尽可能可并行化。
此外,将逻辑运算符 AND、OR 或 XOR 应用于布尔值数组的方式与归约运算符的方式相同。并发按位缩减运算符的一些实际应用包括:
- 有限状态机,通常在处理逻辑门时利用逻辑运算符。有限状态机可以在硬件结构和软件设计中找到。
- 跨套接字/端口的通信,通常涉及奇偶校验和停止位以检查数据错误,或流控制算法。这些技术利用单个字节的逻辑值,通过使用逻辑运算符来处理信息。
- 压缩和加密技术,这在很大程度上依赖于按位算法。
在 Python 中实现多处理缩减运算符时需要仔细考虑,特别是当程序利用任务队列和结果队列来促进消费者进程之间的通信时。
各种实际问题的操作类似于约简运算符,对这些问题使用并发性和并行性可以极大地提高处理这些问题的程序的效率和生产率。因此,重要的是能够识别这些问题,并与还原操作员的概念联系起来,以实施其解决方案。
在下一章中,我们将讨论 Python 中多处理程序的一个特定的实际应用:图像处理。我们将讨论图像处理背后的基本思想,以及如何将并发(特别是多处理)应用于图像处理应用。
- 什么是还原运算符?必须满足哪些条件才能使操作员成为还原操作员?
- 还原运算符具有哪些与所需条件等效的属性?
- 约简运算符和并发编程之间有什么联系?
- 在使用多处理程序以促进 Python 中的进程间通信时,必须考虑哪些因素?
- 并发约化算子的一些实际应用是什么?
有关更多信息,请参阅以下链接:
- Python 并行编程食谱,Giancarlo Zaccone,Packt 出版有限公司,2015 年
- 学习 Python 中的并发:构建高效、健壮、并发的应用。、艾略特·福布斯(2017)
- OpenMP中的并行编程,摩根·考夫曼,钱德拉,罗希特(2001)
- 并行多核体系结构基础,阎索利欣(2016),华润出版社