学习笔记之MPI通信
包括mpi的简述、常用语句、通信的小例子、一些Bug
一、MPI简述
MPI是一个跨语言的通讯协议,用于编写并行计算机。支持点对点和广播。MPI是一个信息传递应用程序接口,包括协议和和语义说明,他们指明其如何在各种实现中发挥其特性。
阻塞通信,
二、常用语句
1、初始化程序
这两个参数目前并没有什么用

MPI_Init(int* argc,char*** argv);

一般情况下这样使用

MPI_Init(NULL, NULL);

2、定点发送和接收
发送定义和函数:

MPI_Send(
    void* data,								// 数据头部地址
    int count,								// 数据个数(注意不是bit数)
    MPI_Datatype datatype,	            	// 数据类型
    int destination,					     // 目的线程号rank
    int tag,							     // 标签,此send的标示,只有recv的tag一致才能收到
    MPI_Comm communicator)                   //通信域

接收定义和函数:

MPI_Recv(
    void* data,								// 数据头部地址
    int count,								// 要传输的数据个数
    MPI_Datatype datatype,		            // 数据类型
    int source,								// 从哪个进程接收的进程号
    int tag,								// 数据标示,和send的tag要一致,才能接收
    MPI_Comm communicator,                  //通信域
    MPI_Status* status)				        // 数据状态

一些预先定义的MPI数据类型

MPI数据类型C语言数据类型
MPI_CHARsigned char
MPI_INTsigned int
MPI_SHORTshort int
MPI_LONGlong int
MPI_LONG_LONGlong long int
MPI_UNSIGNED_CHARunsigned char
MPI_UNSIGNED_SHORTunsigned short int
MPI_UNSIGNEDunsigned int
MPI_UNSIGNED_LONGunsigned long int
MPI_UNSIGNED_LONG_LONGunsigned long long int
MPI_FLOATfloat
MPI_DOUBLEdouble
MPI_LONG_DOUBLElong double

3、把数据发出去要收回来,聚合数据函数:

MPI_Gather(
    void* send_data,							// 节点聚合数据的首地址
    int send_count,								// 每个节点向根节点发送数据个数
    MPI_Datatype send_datatype,		            // 子节点发送数据类型
    void* recv_data,							// 根节点接受数据数组首地址
    int recv_count,								// 根节点从其他节点收到的数据个数(注意不是总个数)
    MPI_Datatype recv_datatype,		           // 根节点接受数据类型
    int root,									// 根节点rank值
    MPI_Comm communicator)

4、广播函数
在集体通信中我们经常用到同步点,也就是需要所有的节点都运行到同一个位置然后再继续运行,当然我们可以通过Recv方法来实现这一点,但是更便捷并且更高效的方式是调用下面的函数:

MPI_Bcast(
    void* data,							// 要广播的数据起始地址
    int count,							// 要广播的数据个数
    MPI_Datatype datatype,	            // 要广播的数据类型
    int root,							// 广播发起者
    MPI_Comm communicator)

三、实例
1、C++环境下编译执行的命令
编译:mpic++ -o csdn csdn_test.cpp
csdn指编译产生的可执行文件,csdn_test.cpp指要编译的cpp
运行:mpirun -n 3 ./csdn
2、一个简单的数据发送接收小例子

#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main()        
{
    int numprocs, myid, source;
    MPI_Status status;
    char message[100];
    MPI_Init(NULL, NULL);//初始化
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);//进程号
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);//总进程数
    if (myid != 0) {  //非0号进程发送消息
        strcpy(message, "Hello World!");
        MPI_Send(message, strlen(message) + 1, MPI_CHAR, 0, 99,
            MPI_COMM_WORLD);
    }
    else {   // myid == 0,即0号进程接收消息
        for (source = 1; source < numprocs; source++) {//从1号进程开始接收数据
            MPI_Recv(message, 100, MPI_CHAR, source, 99,
                MPI_COMM_WORLD, &status);
            printf("接收到第%d号进程发送的消息:%s\n", source, message);
        }
    }
    MPI_Finalize();
    return 0;
}

运行结果:

接收到第1号进程发送的消息:Hello World!
接收到第2号进程发送的消息:Hello World!

2、mpi传输的只能是整型或者是字符型
如果要传输vector,就需要先转成字符串,再进行发送,接收后再解码
下面通过Json来进行字符串的转换
例子如下:

#include <iostream>
#include <string>
#include <cmath>
#include <vector>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
#include "xxxxxx/json.hpp"//这里是json头文件
using namespace std;
int sum(int a)//一个简单函数,
{
    return a + 2;
}
vector<int> array_m(int n, int sum)//对所有要计算的数,进行排列组合,然后给每个进程分发该进程要计算的数
{
    vector<int> res(n, sum / n);
    for (int i = 0; i < sum % n; i++)
    {
        res[i]++;
    }
    return res;
}
int main()
{
    int numprocs, myid, source;
    MPI_Status status;
    MPI_Init(NULL, NULL);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Barrier(MPI_COMM_WORLD);

    vector<int> test1 = {1, 2, 3};
    vector<int> test2 = {4, 5, 6};
    vector<int> test3 = {7, 8, 9};
    vector<vector<int>> test;
    test.push_back(test1);
    test.push_back(test2);
    test.push_back(test3);
    vector<int> cur = test1;
    nlohmann::json json{{"zzyzzy", cur}};
    string max_str = json.dump();
    char max_buf [max_str.size()];
    int max_length = max_str.size();
    vector<vector<int>> ans;
    int flag = 0;
    map<int, int> maptest;
    // maptest[1] = 11;
    // maptest[2] = 22;
    // maptest[3] = 33;
    while (flag < 3)
    {
        if (myid == 0)
        {
            vector<vector<int>> dispart;
            if (test[flag].size() < numprocs)
            {
                for (int i = 0; i < test[flag].size(); i++)
                {
                    vector<int> temp;
                    temp.push_back(test[flag][i]);
                    dispart.push_back(temp);
                }
            }
            else
            {
                vector<int> arr = array_m(numprocs, test[flag].size());
                int idx = 0;
                for (int i = 0; i < arr.size(); i++)
                {
                    vector<int> temp;
                    for (int j = 0; j < arr[i]; j++)
                    {
                        temp.push_back(test[flag][idx]);
                        idx++;
                    }
                    dispart.push_back(temp);
                }
            }
            if (test[flag].size() >= numprocs)
            {
                vector<int> temp2;
                for (auto j : dispart[dispart.size() - 1])
                {
                    temp2.push_back(sum(j));
                }
                ans.push_back(temp2);
            }
            for (int i = 0; i < numprocs - 1; i++) //??????
            {
                nlohmann::json json{{"zzy", dispart[i]}};
                string str = json.dump();
                char max_buffer[str.size()];
                strcpy(max_buffer, str.c_str());
                MPI_Send(max_buffer, strlen(max_buffer) + 1, MPI_CHAR, i + 1, 90, MPI_COMM_WORLD);
            }
            for (int source = 1; source < numprocs; source++)
            {
                // cout << "zzyrecv2" << endl;
                int len2 = 0;
                MPI_Recv(&len2, 1, MPI_INT, source, 95, MPI_COMM_WORLD, &status);
                char buf2[len2];
                MPI_Recv(buf2, len2, MPI_CHAR, source, 99, MPI_COMM_WORLD, &status); //nums就是所有频率的长度的数组,freq_list就是所有频率的字符串的长度
                // cout << "zzyrecv2fini" << endl;
                nlohmann::json new_json = (nlohmann::json::parse(buf2));
                vector<int> new_str = new_json["zzy"];
                ans.push_back(new_str);
            }
            maptest[flag] = flag * 10;
            // tmp[0] = 'a' + flag;
            // MPI_Send(tmp, 20, MPI_CHAR, 1, 0, MPI_COMM_WORLD);
        }

        else
        {
            nlohmann::json json{{"zzyzzy", cur}};
            string max_str = json.dump();
            char max_buf[max_str.size()];
            int max_length = max_str.size();
            // cout << "zzy0_recive" << endl;
            MPI_Recv(max_buf, max_length, MPI_CHAR, 0, 90, MPI_COMM_WORLD, &status);
            nlohmann::json new_json = (nlohmann::json::parse(max_buf));
            vector<int> new_str = new_json["zzy"];
            vector<int> temp;
            for (auto j : new_str)
            {
                temp.push_back(sum(j));
            }

            nlohmann::json json1{{"zzy", temp}};
            string str1 = json1.dump();
            int len1 = str1.size() + 1;
            char buf1[len1];
            strcpy(buf1, str1.c_str());
            MPI_Send(&len1, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
            MPI_Send(buf1, len1, MPI_CHAR, 0, 99, MPI_COMM_WORLD);
            // MPI_Recv(tmp, 20, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &status);
            // printf("I received: %s\n", tmp);
        }
        flag++;
    }
    //
    /*if (myid == 0)
    {
        for (int i = 1; i < numprocs; i++) //??????
        {
            nlohmann::json json_ans{{"zzy", ans}};
            string str_ans = json_ans.dump();
            char max_buffer_ans[str_ans.size()];
            int len = str_ans.size() + 1;
            strcpy(max_buffer_ans, str_ans.c_str());
            MPI_Send(&len, 1, MPI_INT, i, 50, MPI_COMM_WORLD);
            MPI_Send(max_buffer_ans, len, MPI_CHAR, i, 51, MPI_COMM_WORLD);
        }
    }
    else
    {

        // for (int source = 1; source < numprocs; source++)
        // {
        int len2 = 0;
        cout << "zzy" << endl;
        MPI_Recv(&len2, 1, MPI_INT, 0, 50, MPI_COMM_WORLD, &status);
        cout << "zzyrecv2fini" << endl;
        char buf2[len2];
        MPI_Recv(buf2, len2, MPI_CHAR, 0, 51, MPI_COMM_WORLD, &status); 

        nlohmann::json new_json = (nlohmann::json::parse(buf2));
        vector<vector<int>> new_str = new_json["zzy"];
        cout << new_str.size() << endl;
        ans = new_str;
        // }
    }*/
    if (myid == 1)
    {
        for (auto i : ans)
        {
            for (auto j : i)
            {
                cout << j << " ";
            }
            cout << endl;
        }
    }
    return 0;
}

3、循环发送接收
上面程序是一个3层循环的收发例子;
3进程运行上面程序后,没有输出结果;
原因是:结果只存储在0号进程,但是1号和2号进程没有存储结果,只是计算的作用;
4、怎么解决这个问题
①可以利用广播函数,把0号进程中的结果发送到其他进程当中
②如上面注释掉的内容,通过发送接收把0号进程的结果发送到其他进程

5、自我理解
mpi并行,就是几个进程同时工作;比如开4个进程
这四个进程同时工作,内存不共享,即互不影响,
一般情况下会把0号进程作为主进程来控制其他进程;
**工作模式一、**mpi分布在整个流程的中间
上部分→mpi→下部分
在这里插入图片描述

注意:此时只有0号进程有数据,其他进程都没有数据,需要同步(上面例子注释部分)
**工作模式二、**mpi包括整个流程
mpi→上下部分
在这里插入图片描述

Logo

汇聚原天河团队并行计算工程师、中科院计算所专家以及头部AI名企HPC专家,助力解决“卡脖子”问题

更多推荐