Using MPI with C

并行程序使用户能够充分利用超级计算集群的多节点结构。消息传递接口 (MPI) 是一种标准,用于允许集群上的多个不同处理器相互通信。在本教程中,我们将使用英特尔 C++ 编译器、GCC、IntelMPI 和 OpenMPI 用 C++ 创建多处理器“hello world”程序。本教程假设用户具有 Linux 终端和 C++ 经验。

Setup and “Hello, World”

这应该为您的环境准备好编译和运行 MPI 代码所需的所有工具。现在让我们开始构建我们的 C++ 文件。在本教程中,我们将命名我们的代码文件:hello_world_mpi.cpp
打开 hello_world_mpi.cpp 并首先包含 C 标准库 <stdio.h> 和 MPI 库 <mpi.h> ,并构建 C++ 代码的 main 函数:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    return 0;
}

现在让我们设置几个 MPI 指令来并行化我们的代码。在这个“Hello World”教程中,我们将使用以下四个指令:

  • MPI_Init():

该函数初始化 MPI 环境。它接收 C++ 命令行参数 argc 和 argv 的地址。

  • MPI_Comm_size():

此函数通过进程数量返回环境的总大小。该函数接受 MPI 环境和整型变量的内存地址。

  • MPI_Comm_rank():

该函数返回调用该函数的处理器的进程ID。该函数接受 MPI 环境和整型变量的内存地址。

  • MPI_Finalize():

该函数清理 MPI 环境并结束 MPI 通信。

这四个指令应该足以让我们的并行“hello world”运行。我们将首先创建两个变量 process_Rank 和 size_Of_Cluster,分别存储每个并行进程的标识符和集群中运行的进程数。我们还将实现 MPI_Init 函数,该函数将初始化 mpi 通信器:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster

    MPI_Init(&argc, &argv);

    return 0;
}

现在让我们获取有关处理器集群的一些信息并为用户打印该信息。我们将使用函数 MPI_Comm_size() 和 MPI_Comm_rank() 分别获取进程计数和进程排名:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    printf("Hello World from process %d of %d\n", process_Rank, size_Of_Cluster);

    return 0;
}

最后让我们使用 MPI_Finalize() 关闭环境:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    printf("Hello World from process %d of %d\n", process_Rank, size_Of_Cluster);

    MPI_Finalize();
    return 0;
}

现在代码已经完成并可以编译了。因为这是一个MPI程序,所以我们必须使用专门的编译器。请务必根据您加载的编译器使用正确的命令。

OpenMPI

mpic++ hello_world_mpi.cpp -o hello_world_mpi.exe

这将生成一个可执行文件,我们可以将其作为作业传递给集群。为了执行 MPI 编译代码,必须使用特殊命令:

mpirun -np 4 ./hello_world_mpi.exe

标志 -np 指定在执行程序时要使用的处理器数量。
在作业脚本中,加载上面用于编译程序的相同编译器和 OpenMPI 选择,并使用 Slurm 运行作业以执行应用程序。您的作业脚本应如下所示:

OpenMPI

#!/bin/bash
#SBATCH -N 1
#SBATCH --ntasks 4
#SBATCH --job-name parallel_hello
#SBATCH --constraint ib
#SBATCH --partition atesting
#SBATCH --time 0:01:00
#SBATCH --output parallel_hello_world.out

module purge

module load gcc
module load openmpi

mpirun -np 4 ./hello_world_mpi.exe

重要的是要注意,在AIpine,每个节点共有64个核心。对于需要超过64个流程的应用程序,您需要在工作中要求多个节点。我们的输出文件应该看起来像这样:

Hello World from process 3 of 4
Hello World from process 2 of 4
Hello World from process 1 of 4
Hello World from process 0 of 4

MPI Barriers and Synchronization

与许多其他并行编程实用程序一样,同步是线程安全和确保代码的某些部分在某些点得到处理的重要工具。 MPI_Barrier 是一个进程锁,它将每个进程保持在某一代码行,直到所有进程都到达该代码行。 MPI_Barrier 可以这样调用:

MPI_Barrier(MPI_Comm comm);

为了掌握障碍,让我们修改“Hello World”程序,以便它按线程 ID 的顺序打印出每个进程。从上一节中的“Hello World”代码开始,首先将 print 语句嵌套在循环中:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    for(int i = 0; i < size_Of_Cluster; i++){
        printf("Hello World from process %d of %d\n", process_Rank, size_Of_Cluster);
    }

    MPI_Finalize();
    return 0;
}

接下来,让我们在循环中实现一个条件语句,仅当循环迭代与进程等级匹配时才打印。

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    for(int i = 0; i < size_Of_Cluster; i++){
        if(i == process_Rank){
            printf("Hello World from process %d of %d\n", process_Rank, size_Of_Cluster);
        }
    }
    MPI_Finalize();
    return 0;
}

最后,在循环中实现屏障函数。这将确保所有进程在通过循环时都是同步的。

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    for(int i = 0; i < size_Of_Cluster; i++){
        if(i == process_Rank){
            printf("Hello World from process %d of %d\n", process_Rank, size_Of_Cluster);
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}

编译并运行此代码将产生以下输出:

Hello World from process 0 of 4
Hello World from process 1 of 4
Hello World from process 2 of 4
Hello World from process 3 of 4

Message Passing

消息传递是 MPI 应用程序接口中的主要实用程序,允许进程相互通信。在本教程中,我们将学习两个进程之间消息传递的基础知识。
MPI 中的消息传递由相应的函数及其参数处理:

MPI_Send(void* message, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm, communicator);
MPI_Recv(void* data, int count, MPI_Datatype datatype, int from, int tag, MPI_Comm comm, MPI_Status* status);

参数如下:

  • MPI_Send
void* message;          //Address for the message you are sending.
int count;              //Number of elements being sent through the address.
MPI_Datatype datatype;  //The MPI specific data type being passed through the address.
int dest;               //Rank of destination process.
int tag;                //Message tag.
MPI_Comm comm;          //The MPI Communicator handle.
  • MPI_Recv
void* message;          //Address to the message you are receiving.
int count;              //Number of elements being sent through the address.
MPI_Datatype datatype;  //The MPI specific data type being passed through the address.
int from;               //Process rank of sending process.
int tag;                //Message tag.
MPI_Comm comm;          //The MPI Communicator handle.
MPI_Status* status;     //Status object.

让我们通过一个例子来实现消息传递:

Example

我们将创建一个双进程进程,将数字 42 从一个进程传递到另一个进程。我们将使用“Hello World”程序作为该程序的起点。让我们首先创建一个变量来存储一些信息。

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster, message_Item;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    MPI_Finalize();
    return 0;
}

现在创建 if 和 else if 条件,指定调用 MPI_Send() 和 MPI_Recv() 函数的适当过程。在此示例中,我们希望进程 1 向进程 2 发送一条包含整数 42 的消息。

#include <stdio.h>
#include <mpi.h>
int main(int argc, char** argv){
    int process_Rank, size_Of_Cluster, message_Item;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    if(process_Rank == 0){
        message_Item = 42;
        printf(“Sending message containing: %d\n”, message_Item)
    }
    else if(process_Rank == 1){
        printf(“Received message containing: %d\n”, message_Item)
    }
    MPI_Finalize();
    return 0;
}

最后我们必须调用 MPI_Send() 和 MPI_Recv()。我们将以下参数传递给函数:

MPI_Send(
    &message_Item,      //Address of the message we are sending.
    1,                  //Number of elements handled by that address.
    MPI_INT,            //MPI_TYPE of the message we are sending.
    1,                  //Rank of receiving process
    1,                  //Message Tag
    MPI_COMM_WORLD      //MPI Communicator
);

MPI_Recv(
    &message_Item,      //Address of the message we are receiving.
    1,                  //Number of elements handled by that address.
    MPI_INT,            //MPI_TYPE of the message we are sending.
    0,                  //Rank of sending process
    1,                  //Message Tag
    MPI_COMM_WORLD      //MPI Communicator
    MPI_STATUS_IGNORE   //MPI Status Object
);

让我们在代码中实现这些函数:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv) {
    int process_Rank, size_Of_Cluster, message_Item;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Cluster);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    if(process_Rank == 0){
        message_Item = 42;
        MPI_Send(&message_Item, 1, MPI_INT, 1, 1, MPI_COMM_WORLD);
        printf("Message Sent: %d\n", message_Item);
    }

    else if(process_Rank == 1){
        MPI_Recv(&message_Item, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Message Received: %d\n", message_Item);
    }

    MPI_Finalize();
    return 0;
}

使用 2 个进程编译并运行我们的代码将产生以下输出:

Message Sent: 42
Message Received: 42

Group Operators: Scatter and Gather

组运算符对于 MPI 非常有用。它们允许将大量数据从根进程分发到所有其他可用进程,或者可以在一个进程中收集来自所有进程的数据。这些运算符可以通过使用两个函数来消除对大量样板代码的需求:

  • MPI_Scatter:
void* send_Var;         //Address of the variable that will be scattered.
int send_Count;         //Number of elements that will be scattered.
MPI_Datatype send_Type; //MPI Datatype of the data that is scattered.
void* recv_Var;         //Address of the variable that will store the scattered data.
int recv_Count;         //Number of data elements that will be received per process.
MPI_Datatype recv_Type; //MPI Datatype of the data that will be received.
int root_Process;       //The rank of the process that will scatter the information.
MPI_Comm comm;          //The MPI_Communicator.
  • MPI_Gather:
void* send_Var;         //Address of the variable that will be sent.
int send_Count;         //Number of data elements that will sent .
MPI_Datatype send_Type; //MPI Datatype of the data that is sent.
void* recv_Var;         //Address of the variable that will store the received data.
int recv_Count;         //Number of data elements per process that will be received.
MPI_Datatype recv_Type; //MPI Datatype of the data that will be received.
int root_Process;       //The rank of the process rank that will gather the information.
MPI_Comm comm;          //The MPI_Communicator.

为了更好地掌握这些函数,让我们继续创建一个使用分散函数的程序。请注意,聚集函数(示例中未显示)的工作原理类似,本质上是分散函数的逆函数。使用收集函数的更多示例可以在本文档开头作为资源列出的 MPI 教程中找到。

Example

我们将创建一个程序,将数据数组的一个元素分散到每个进程。具体来说,此代码会将数组的四个元素分散到四个不同的进程。我们将从基本的 C++ 主函数以及用于存储进程等级和进程数量的变量开始。

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Comm;

    return 0;
}

现在让我们使用 MPI_Init 、 MPI_Comm_size 、 MPI_Comm_rank 和

  • MPI_Finaize:
#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Comm);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    MPI_Finalize();
    return 0;
}

接下来,我们生成一个名为 distro_Array 的数组来存储四个数字。我们还将创建一个名为分散数据的变量,我们将把数据分散到其中。

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){

    int process_Rank, size_Of_Comm;
    int distro_Array[4] = {39, 72, 129, 42};
    int scattered_Data;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Comm);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    MPI_Finalize();
    return 0;
}

现在我们将开始使用组运算符。我们将使用 scatter 运算符将 distro_Array 分发到分散的_Data 中。让我们看一下该函数中将使用的参数:

MPI_Scatter(
    &distro_Array,      //Address of array we are scattering from.
    1,                  //Number of items we are sending each processor
    MPI_INT,            //MPI Datatype of scattering array.
    &scattered_Data,    //Address of array we are receiving scattered data.
    1,                  //Amount of data each process will receive.
    MPI_INT,            //MPI Datatype of receiver array.
    0,                  //Process ID that will distribute the data.
    MPI_COMM_WORLD      //MPI Communicator.
)

让我们看看它在代码中的实现。我们还将在 scatter 调用之后编写一条 print 语句:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv){
    int process_Rank, size_Of_Comm;
    int distro_Array[4] = {39, 72, 129, 42};
    int scattered_Data;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size_Of_Comm);
    MPI_Comm_rank(MPI_COMM_WORLD, &process_Rank);

    MPI_Scatter(&distro_Array, 1, MPI_INT, &scattered_Data, 1, MPI_INT, 0, MPI_COMM_WORLD);

    printf("Process has received: %d \n", scattered_Data);
MPI_Finalize();
return 0;
}

运行此代码会将发行版数组中的四个数字打印为四个单独的数字,每个数字来自不同的处理器(请注意,排名顺序不一定是连续的):

Process has received: 39
Process has received: 72
Process has received: 129
Process has received: 42

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐