参考 https://www.jianshu.com/u/59ef38a1d84b
并行处理框架主要有MPI、OpenMP和MapReduce(Hadoop)三个(CUDA属于GPU并行编程,这里不提及)。MPI和Hadoop都可以在集群中运行,而OpenMP因为共享存储结构的关系,不能在集群上运行,只能单机。另外,MPI可以让数据保留在内存中,可以为节点间的通信和数据交互保存上下文,所以能执行迭代算法,而Hadoop却不具有这个特性。因此,需要迭代的机器学习算法大多使用MPI来实现。当然了,部分机器学习算法也是可以通过设计使用Hadoop来完成的。
MPI是Message Passing Interface的简称,也就是消息传递。消息传递指的是并行执行的各个进程具有自己独立的堆栈和代码段,作为互不相关的多个程序独立执行,进程之间的信息交互完全通过显示地调用通信函数来完成。
Mpi4py是构建在mpi之上的python库,使得python的数据结构可以在进程(或者多个cpu)之间进行传递。MPI的每个进程都有一个ID,也就是rank来标记我是谁。什么意思呢?假设一个CPU是你请的一个工人,共有10个工人。你有100块砖头要搬,然后很公平,让每个工人搬10块。这时候,你把任务写到一个任务卡里面,让10个工人都执行这个任务卡中的任务,也就是搬砖!这个任务卡中的“搬砖”就是你写的代码。然后10个CPU执行同一段代码。需要注意的是,代码里面的所有变量都是每个进程独有的,虽然名字相同。

1 基础通信

首先是启动进程,并获取进程编号。如下脚本test.py,里面包含以下代码:

from mpi4py import MPI
print("hello world")
print("my rank is: %d" %MPI.COMM_WORLD.Get_rank())

然后页面上运行:mpirun -np 3 python test.py
返回

hello world
hello world
hello world
my rank is: 0
my rank is: 1
my rank is: 2

点对点通信(Point-to-PointCommunication)的能力是信息传递系统最基本的要求。意思就是让两个进程直接可以传输数据,也就是一个发送数据,另一个接收数据。接口就两个,send和recv,来个例子:

import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
 
# point to point communication
data_send = [comm_rank]*5 
comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv =comm.recv(source=(comm_rank-1)%comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv)

运行mpirun -np 4 python test.py

my rank is 0, and Ireceived:
[3, 3, 3, 3, 3]
my rank is 1, and Ireceived:
[0, 0, 0, 0, 0]
my rank is 2, and Ireceived:
[1, 1, 1, 1, 1]
my rank is 3, and Ireceived:
[2, 2, 2, 2, 2]

如果我们要发送的数据比较小的话,mpi会缓存我们的数据,也就是说执行到send这个代码的时候,会缓存被send的数据,然后继续执行后面的指令,而不会等待对方进程执行recv指令接收完这个数据。但是,如果要发送的数据很大,那么进程就是挂起等待,直到接收进程执行了recv指令接收了这个数据,进程才继续往下执行。所以上述的代码发送[rank]*5没啥问题,如果发送[rank]*500程序就会半死不活的样子了。因为所有的进程都会卡在发送这条指令,等待下一个进程发起接收的这个指令,但是进程是执行完发送的指令才能执行接收的指令,这就和死锁差不多了。所以一般,我们将其修改成以下的方式:

import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
 
data_send = [comm_rank]*5
if comm_rank == 0:
   comm.send(data_send, dest=(comm_rank+1)%comm_size)
if comm_rank > 0:
   data_recv = comm.recv(source=(comm_rank-1)%comm_size)
   comm.send(data_send, dest=(comm_rank+1)%comm_size)
if comm_rank == 0:
   data_recv = comm.recv(source=(comm_rank-1)%comm_size)
print("my rank is %d, and Ireceived:" % comm_rank)
print(data_recv)

最开始0进程发送数据,避免了死锁。
比较常用的方法是封一个组长,也就是一个主进程,一般是进程0作为主进程leader。主进程将数据发送给其他的进程,其他的进程处理数据,然后返回结果给进程0。换句话说,就是进程0来控制整个数据处理流程。

群体通信(Collective Communications)分发送和接收两类,一个是一次性把数据发给所有人,另一个是一次性从所有人那里回收结果。
广播bcast将一份数据发送给所有的进程。例如我有200份数据,有10个进程,那么每个进程都会得到这200份数据。

mport mpi4py.MPI as MPI
  
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
   data = range(comm_size) # 这份数据仅进程0拥有
data = comm.bcast(data if comm_rank == 0 else None, root=0) # 这里把它广播到了所有的进程
print ('rank %d, got:' % (comm_rank))
print (data)

散播scatter将一份数据平分给所有的进程。例如我有200份数据,有10个进程,那么每个进程会分别得到20份数据。
代码如下。注意和上面不同,每个进程都需要定义data变量,即使为None。


import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
 
if comm_rank == 0:
   data = range(comm_size)
   print data
else:
   data = None
local_data = comm.scatter(data, root=0)
print 'rank %d, got:' % comm_rank
print local_data

返回

[0, 1, 2, 3]
rank 0, got:
0
rank 1, got:
1
rank 2, got:
2
rank 3, got:

收集gather。那有发送,就有一起回收的函数。Gather是将所有进程的数据收集回来,合并成一个列表。下面联合scatter和gather组成一个完成的分发和收回过程:

import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
 
if comm_rank == 0:
   data = range(comm_size)
   print data
else:
   data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print ('rank %d, got and do:' % comm_rank)
print (local_data)
combine_data = comm.gather(local_data,root=0)
if comm_rank == 0:
	print(combine_data)

Gather还有一个变体就是allgather,可以理解为它在gather的基础上将gather的结果再bcast了一次。啥意思?意思是root进程将所有进程的结果都回收统计完后,再把整个统计结果告诉大家。这样,不仅root可以访问combine_data,所有的进程都可以访问combine_data了。
规约reduce是指不但将所有的数据收集回来,收集回来的过程中还进行了简单的计算,例如求和,求最大值等等。为什么要有这个呢?我们不是可以直接用gather全部收集回来了,再对列表求个sum或者max就可以了吗?这样不是累死组长吗?为什么不充分使用每个工人呢?规约实际上是使用规约树来实现的。例如求max,完成可以让工人两两pk后,再返回两两pk的最大值,然后再对第二层的最大值两两pk,直到返回一个最终的max给组长。组长就非常聪明的将工作分配下工人高效的完成了。这是O(n)的复杂度,下降到O(log n)(底数为2)的复杂度。

import mpi4py.MPI as MPI
 
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if comm_rank == 0:
   data = range(comm_size)
   print data
else:
   data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print ('rank %d, got and do:' % comm_rank)
print (local_data)
all_sum = comm.reduce(local_data, root=0,op=MPI.SUM)
if comm_rank == 0:
	print ('sumis:%d' % all_sum)
   结果如下:

[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 3, got and do:
6
rank 4, got and do:
8
sum is:20

2 读取大文件/大数组

Mpi4py支持numpy:

import os, sys, time
import numpy as np
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if __name__ == "__main__":
   if comm_rank == 0:
       all_data = np.arange(20).reshape(4, 5)
   all_data = comm.bcast(all_data if comm_rank == 0 else None, root = 0)
   num_samples = all_data.shape[0]
   local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')
   local_data = all_data[local_data_offset[comm_rank] :local_data_offset[comm_rank + 1]]
    #reduce to get sum of elements
   local_sum = local_data.sum()
   all_sum = comm.reduce(local_sum, root = 0, op = MPI.SUM)
   local_result = local_data ** 2
   result = comm.allgather(local_result)
   result = np.vstack(result)
   
   if comm_rank == 0:
       print ("*** sum: ", all_sum)
       print ("************ result ******************")
       print (result)

返回

*** sum:  190
************ result ******************
[[  0   1   4   9  16]
 [ 25  36  49  64  81]
 [100 121 144 169 196]
 [225 256 289 324 361]]
import sys
import os
import mpi4py.MPI as MPI
import numpy as np
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()
if __name__ == '__main__':
   if len(sys.argv) != 2:
       sys.stderr.write("Usage: python *.py directoty_with_files\n")
       sys.exit(1)
   path = sys.argv[1]
   if comm_rank == 0:
       file_list = os.listdir(path)
       sys.stderr.write("%d files\n" % len(file_list))
   file_list = comm.bcast(file_list if comm_rank == 0 else None, root = 0)
   num_files = len(file_list)
   local_files_offset = np.linspace(0, num_files, comm_size +1).astype('int')
   local_files = file_list[local_files_offset[comm_rank] :local_files_offset[comm_rank + 1]]
   sys.stderr.write("%d/%d processor gets %d/%d data \n" %(comm_rank, comm_size, len(local_files), num_files))
   cnt = 0
   for file_name in local_files:
       hd = open(os.path.join(path, file_name))
       for line in hd:
           output = line.strip() + ' process every line here'
           print output
       cnt += 1
       sys.stderr.write("processor %d has processed %d/%d files \n" %(comm_rank, cnt, len(local_files)))
       hd.close()

3 多机执行

集群机器之间免密登录是必须的,并且所有的机器上都要有相同的程序和文件,在当前机器上建立hosts文件,枚举所有机器的ip地址,执行mpirun -f ./hosts -n 2 text.py即可

4 MPIArray

Logo

汇聚原天河团队并行计算工程师、中科院计算所专家以及头部AI名企HPC专家,助力解决“卡脖子”问题

更多推荐