MPI——进程之间的全局通信
进程之间的全局通信四种基本的MPI原语:广播、收集、归约和全交换阻塞与非阻塞和同步与异步通信阻塞通信产生的死锁并发性:局部计算可以与通信重叠执行单向与双向通信MPI的全局计算归约操作并行前缀采用通信器定义通信组通过在一组机器上执行MPI程序,我们启动一组进程,并且对与每个进程都有本地计算(与通常的串行程序执行类似),同时还可以执行以下操作数据传输:一些数据通过消息传送到其他的进程同步屏障:...
进程之间的全局通信
通过在一组机器上执行MPI程序,我们启动一组进程,并且对与每个进程都有本地计算(与通常的串行程序执行类似),同时还可以执行以下操作
- 数据传输:一些数据通过消息传送到其他的进程
- 同步屏障:导致所有的进程在继续运行之前都需要等待
- 全局计算:全局的操作
全局通信原语由属于同一个通信组的所有进程执行,默认的情况下,MPI初始化后,所有的进程都属于同一个被称为MPI_COMM_WORLD
的通信组。
四种基本的MPI原语:广播、收集、归约和全交换
MPI广播原语(即MPI_Bcast
)用来从根进程(通信组当前调用的进程)向其他所有(同一个进程组)的进程发送消息。
归约操作将变量的所有对应的值汇聚到到一个值,并将其返回给当前调用进程。
当将不同的个性化信息发送到其他每个进程中时,执行了在MPI中称为MPI_Scatter
的散播操作
聚合原语既可以用于通信、也可以用于全局计算:收集是散播操作的逆过程,这个过程中根进程从所有的进程接收个性化信息。
在MPI中,MPI_Reduce
可以通过聚合(归约)一个使用可交换二元运算符的变量来执行全局计算操作。
MPI名称 | 含义 |
---|---|
MPI_MAX | 最大值 |
MPI_MIN | 最小值 |
MPI_SUM | 求和 |
MPI_PROD | 求积 |
MPI_LAND | 逻辑与 |
MPI_BAND | 按位与 |
MPI_LOR | 逻辑或 |
MPI_BOR | 按位与 |
MPI_LXOR | 逻辑异或 |
MPI_BXOR | 按位异或 |
MPI_MAXLOC | 最大值和相应位置 |
MPI_MINLOC | 最小值和相应位置 |
四种基本的集体通信原语:
阻塞与非阻塞和同步与异步通信
MPI有多种发送模式,这取决于数据是否被缓冲,以及是否需要同步。
从send
和receive
这两种基本通信原语的表示开始:
send(&data, n, Pdest)
把从内存地址&data
开始的含有n
个数据的数组发送到进程Pdest
receive(&data, n, Psrc)
从进程Psrc
接收n
个数据,并将它们存储在一个本地内存地址&data
开始的数组中。
阻塞通信(非缓冲)产生了等待时间的情况,也就是空闲时间。发送进程和接收进程需要相互等待对方。这是一种通常称为==握手(hand-shaking)==的通信模式,这种通信模式允许执行同步通信。
发送进程等待接收进程的确认通过指令
重点通用的MPI
函数
函数名 | 含义 |
---|---|
MPI_Init | 初始化MPI库 |
MPI_Finalize | 终止MPI |
MPI_Comm_size | 返回进程数量 |
MPI_Comm_rank | 当前正在运行的进程的标识号 |
MPI_Send | 发送消息(阻塞) |
MPI_Recv | 接收消息(阻塞) |
MPI_Isend | 发送消息(非阻塞) |
MPI_Irecv | 接收消息(非阻塞) |
基本例子
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>
int main(int argc, char * argv[]){
int myid, numprocs;
int tag, source, destination, count;
int buffer;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
tag = 2312;
source = 0;
destination = 1;
count = 1;
if(myid == source) {
buffer = 2015;
MPI_Send(&buffer, count, MPI_INT, destination, tag, MPI_COMM_WORLD);
printf("processor %d received %d \n", myid, buffer);
}
if(myid == destination) {
MPI_Recv(&buffer, count, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
printf("processor %d received %d \n", myid, buffer);
}
MPI_Finalize();
}
mpic++ MPIBlockingCommunication.cpp -o MPIBlockingCommunication
mpirun -np 2 ./MPIBlockingCommunication
processor 0 received 2015
processor 1 received 2015
对于阻塞通信,要尽量的减少总的空闲时间。
描述MPI的send原语的参数
#include <mpi.h>
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm )
send中的tag参数为消息指定一个整型数据(标签),这样进程就可以指定等待那些类型的消息。标签在实际中非常有用,有助于过滤通信。
C语言绑定的MPI基本数据类型
MPI数据类型 | C语言数据类型 |
---|---|
MPI_CHAR | signed char |
MPI_SHORT | signed short int |
MPI_INT | signed int |
MPI_LONG | Signed long int |
MPI_UNSIGNED_CHAR | unsigned char |
MPI_UNSIGNED_SHORT | unsigned short int |
MPI_UNSIGNED | unsigned short |
MPI_UNSIGNED_LONG | unsigned long int |
MPI_FLOAT | float |
MPI_DOUBLE | double |
MPI_LONG_DOUBLE | long double |
MPI_BYTE | |
MPI_PACKED |
阻塞通信产生的死锁
进程 P 0 P_0 P0发送一个消息,然后等待接收进程 P 1 P_1 P1的同意发送的指令,同时 P 1 P_1 P1也在发送数据,等待接收进程 P 0 P_0 P0的同意发送指令。这是一个典型的死锁情况。
实际上,在MPI中,每个发送、接收操作涉及一组通信,并且有一个标签属性。从算法角度中,阻塞通信是确保程序一致性(或语义)的一个非常理想的特性,但是阻塞通信会给检测死锁带来困难。
为了避免(或者至少最小化)这些死锁的情况,我们预先给每个进程分配一个专用的内存空间,用于缓冲数据:数据缓冲区(Data Buffer, DB)。然后
- 首先,发送进程在数据缓冲区发送消息
- 齐次,接收进程在地址
&data
指向的本地内存区域上复制数据缓冲区。
每个进程在发送之前需要等待一个指令,同样这个也是死锁状态。
当我们考虑像广播这样的全局通信以确保消息的正确到达顺序的情况时,阻塞通信是非常有用的。但是在这些通信算法时,必须注意潜在的死锁。
避免死锁的一个解决方案是考虑让send
和receive
原语成为非阻塞的。这些非阻塞通信程序(无缓冲)是由MPI的Isend
和Ireceive
表示,即异步通信。发送进程发布一条发送授权请求消息,并继续执行其程序的执行,当接收进程发布一个同意发送许可指令时,数据传输就启动了。
原语MPI_Wait(&request, &status)
等到数据传输完成之后,使用一个称为status的状态变量来指示数据传输是否已经完成。
#include <mpi.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[]){
int myid, numprocs;
int tag, source,destination, count;
int buffer;
MPI_Status status;
MPI_Request request;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
tag = 2312;
source = 0;
destination = 1;
count = 1;
request = MPI_REQUEST_NULL;
if(myid == source) {
buffer = 2015;
MPI_Isend(&buffer, count, MPI_INT, destination, tag, MPI_COMM_WORLD, &request);
printf("send\n");
}
if(myid == destination) {
MPI_Irecv(&buffer, count, MPI_INT, source, tag, MPI_COMM_WORLD, &request);
printf("received\n");
}
MPI_Wait(&request, &status);
if(myid == source) {
printf("processor %d sent %d \n",myid, buffer);
}
if(myid == destination) {
printf("processor %d received %d \n",myid, buffer);
}
MPI_Finalize();
return 0;
}
➜ 并行计算 mpic++ MPINonBlockingCommunication.cpp -o MPINonBlockingCommunication
➜ 并行计算 mpirun -np 2 ./MPINonBlockingCommunication
received
send
processor 0 sent 2015
processor 1 received 2015
非阻塞Isend
和Irecv
的调用语法
#include <mpi.h>
int MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)
MPI_Request
结构是程序中经常使用的,当*request
完成之后返回1,否则返回0。
#include <mpi.h>
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)
原语MPI_Wait一直等到*repuest
所执行的操作完成。
#include <mpi.h>
int MPI_Wait(MPI_Request *request, MPI_Status *status)
不同发送、接收操作协议的对比
阻塞操作 | 非阻塞操作 | |
---|---|---|
缓冲 | send操作在数据拷贝到数据缓冲区之后完成 | send初始化DMA(直接内存访问)之后,完成send操作,DMA可以将数据转移到数据缓冲区。该操作在函数返回后不一定完成 |
非缓冲 | 阻塞send操作只到碰到一个相应的receive操作 | |
含义 | 配对send和receive操作的语义 | 程序设计者除了需要检查操作状态,还必须明确指出语义 |
并发性:局部计算可以与通信重叠执行
处理器可以在同一时间运行好几个任务,要求这三个操作互相不干涉对方,所在一个阶段中,我们不能把一个运算的结果发送出去,并且我们不能把同一时刻所接收的内容发送。在并行算法中,我们用||
表示这些并发操作
IRecv || ISend ||Local_Computation
单向与双向通信
单向通信:通信信道中的消息仅能在单方向上进行通信,要么发送一个消息,要么接收一个消息。但不能同时进行
双向通信:我们可以同时进行双向通信,可以调用MPI_Sendrecv
来完成。
MPI的全局计算
累加 V = ∑ i = 0 P − 1 v i V = \sum\limits_{i=0}^{P-1} v_i V=i=0∑P−1vi或者 V = ∏ i = 0 P − 1 v i V=\prod\limits_{i=0}^{P-1}v_i V=i=0∏P−1vi 的全局计算,其中 v i v_i vi是存储在进程 P i P_i Pi内存中的局部变量。这个全局计算结果 V V V可以从调用了这个归约(reduce)原语的进程从本地内存中获得,这个进程就是当前调用进程,也称为根进程。
归约操作
#include <mpi.h>
int MPI_Reduce(const void *sendbuf, void *recvbuf, int count,MPI_Datatype datatype, MPI_Op op, int root,MPI_Comm comm)
int MPI_Ireduce(const void *sendbuf, void *recvbuf, int count,MPI_Datatype datatype, MPI_Op op, int root,MPI_Comm comm, MPI_Request *request)
输入参数
- sendbuf :发送缓存的地址
- count:发送缓存的数据个数
- datatype:发送缓存的数据类型
- op:归约操作(句柄)
- root 根进程
- comm:通信组(句柄)
输出参数:
- recvbuf:接收缓存地址,只在根进程有效
- request:请求(句柄)
之前介绍了归约操作的关键词。
并行前缀
也称为扫描,一个扫描操作计算存储在进程中的本地数据的所有部分归约操作。
#include <mpi.h>
int MPI_Scan(const void *sendbuf, void *recvbuf, int count,MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
int MPI_Iscan(const void *sendbuf, void *recvbuf, int count,MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,MPI_Request *request)
调用此函数可以通过内存地址recvbuf中获取的结果对每个进程中位于sendbuf的数据进行前缀归约操作。
采用通信器定义通信组
在MPI中,通信器能够将进程分为不同的通信组,每个进程都包括在一个通信组中,并由通信组内部的进程标识号索引。默认情况下,MPI_COMM_WORLD
包括了所有
P
P
P个标识号从
0
0
0到
P
−
1
P-1
P−1的整型数字的进程。为了获取通信组内部进程数量和进程组内部的进程标识号。
int MPI_Comm_size(MPI_Comm comm, int *size);
int MPI_Comm_rank(MPI_Comm comm. int *size);
通过删除第一个进程来创建一个新的通信域
#include <mpi.h>
int main(int argc, char *argv[]){
MPI_Comm comm_world, comm_worker;
MPI_Group group_world, group_worker;
comm_world = MPI_COMM_WORLD;
MPI_Comm_group(comm_world, &group_world);
MPI_Group_excl(group_world, 1, 0, &group_worker);
MPI_Comm_create(comm_world, group_worker, &comm_worker);
}
使用通信域
#include <mpi.h>
#include <stdio.h>
#define NPROCS 8
int main(int argc, char *argv[]) {
int ranks1[4] = {0,1,2,3};
int ranks2[4] = {4,5,6,7};
int rank,recvbuf = 0,new_rank;
MPI_Group orig_group, new_group;
MPI_Comm new_comm;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int sendbuf=rank;
MPI_Comm_group(MPI_COMM_WORLD, &orig_group);
if(rank < NPROCS/2) {
MPI_Group_incl(orig_group, NPROCS/2, ranks1, &new_group);
}
else {
MPI_Group_incl(orig_group, NPROCS/2, ranks2, &new_group);
MPI_Comm_create(MPI_COMM_WORLD, new_group, &new_comm);
}
MPI_Group_rank(new_group, &new_rank);
printf("rank = %d newrank = %d recvbuf = %d\n",rank,new_rank,recvbuf);
MPI_Allreduce(&sendbuf, &recvbuf,1, MPI_INT,MPI_SUM,new_comm);
MPI_Finalize();
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)