MPI矩阵乘法

去年学习了并行计算,接触了MPI、Pthreads和OpenMP等常用的并行方法实现了并行的矩阵乘法,本章在此总结一下MPI的矩阵乘法使用。

  • 使用简单的MPI_Send和MPI_Recv实现
  • 使用较高级的MPI_Scatter和MPI_Gather实现

MPI_Send和MPI_Recv实现

#include<stdio.h>
#include<stdlib.h>
#include<mpi.h>
#include<time.h>

int main(int argc,char *argv[])
{
    double start, stop;
    int i, j, k, l;
    int *a, *b, *c, *buffer, *ans;
    int size = 1000;
    int rank, numprocs, line;

    MPI_Init(NULL,NULL);//MPI Initialize
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);//获得当前进程号
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);//获得进程个数

    line = size/numprocs;//将数据分为(进程数)个块,主进程也要处理数据
    a = (int*)malloc(sizeof(int)*size*size);
    b = (int*)malloc(sizeof(int)*size*size);
    c = (int*)malloc(sizeof(int)*size*size);
    //缓存大小大于等于要处理的数据大小,大于时只需关注实际数据那部分
    buffer = (int*)malloc(sizeof(int)*size*line);//数据分组大小
    ans = (int*)malloc(sizeof(int)*size*line);//保存数据块计算的结果

    //主进程对矩阵赋初值,并将矩阵N广播到各进程,将矩阵M分组广播到各进程
    if (rank==0)
    {
        //从文件中读入矩阵
        FILE *fp;

        fp=fopen("a.txt","r");//打开文件
        start = MPI_Wtime();
        for(i=0;i<1000;i++) //读数据
            for(j=0;j<1000;j++)
                fscanf(fp,"%d",&a[i*size+j]);  
        fclose(fp);//关闭文件

        fp=fopen("b.txt","r");

        for(i=0;i<1000;i++) 
            for(j=0;j<1000;j++)
                fscanf(fp,"%d",&b[i*size+j]);  
        fclose(fp);
        //将矩阵N发送给其他从进程
        for (i=1;i<numprocs;i++)
        {
                MPI_Send(b,size*size,MPI_INT,i,0,MPI_COMM_WORLD);
        }
        //依次将a的各行发送给各从进程
        for (l=1; l<numprocs; l++)
        {
            MPI_Send(a+(l-1)*line*size,size*line,MPI_INT,l,1,MPI_COMM_WORLD);
        }
        //接收从进程计算的结果
        for (k=1;k<numprocs;k++)
        {
            MPI_Recv(ans,line*size,MPI_INT,k,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            //将结果传递给数组c
            for (i=0;i<line;i++)
            {
                for (j=0;j<size;j++)
                {
                    c[((k-1)*line+i)*size+j] = ans[i*size+j];
                }

            }
        }
        //计算a剩下的数据
        for (i=(numprocs-1)*line;i<size;i++)
        {
            for (j=0;j<size;j++)
            {
                int temp=0;
                for (k=0;k<size;k++)
                    temp += a[i*size+k]*b[k*size+j];
                c[i*size+j] = temp;
            }
        }

        fp=fopen("c.txt","w");
        for(i=0; i<size; i++){
            for(j=0; j<size; j++)
                fprintf(fp,"%d ",c[i*size+j]);
            fputc('\n',fp);
        }
        fclose(fp);
        //结果测试
        //统计时间
        stop = MPI_Wtime();

        printf("rank:%d time:%lfs\n",rank,stop-start); 

        free(a);
        free(b);
        free(c);
        free(buffer);
        free(ans);
    }

    //其他进程接收数据,计算结果后,发送给主进程
    else
    {
        //接收广播的数据(矩阵b)
        MPI_Recv(b,size*size,MPI_INT,0,0,MPI_COMM_WORLD,MPI_STATUS_IGNORE);

        MPI_Recv(buffer,size*line,MPI_INT,0,1,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        //计算乘积结果,并将结果发送给主进程
        for (i=0;i<line;i++)
        {
            for (j=0;j<size;j++)
            {
                int temp=0;
                for(k=0;k<size;k++)
                    temp += buffer[i*size+k]*b[k*size+j];
                ans[i*size+j]=temp;
            }
        }
        //将计算结果传送给主进程
        MPI_Send(ans,line*size,MPI_INT,0,3,MPI_COMM_WORLD);
    }

    MPI_Finalize();//结束

    return 0;
}

MPI_Scatter和MPI_Gather实现

#include<stdio.h>
#include<mpi.h>
#include <malloc.h>
#define M 1000
#define N 1000
int main()
{
int my_rank;/*My process rank*/
int comm_sz;/*Number of processes*/
int local_M;
int i,j,k;
double start,finish;/*timer*/
int tem;
//初始化MPI
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
//每个矩阵分配到的行数
local_M=M/comm_sz;
//分配到每个进程的矩阵
int *local_Matrix_one=(int*)malloc(local_M*N*sizeof(int));
//定义两个矩阵
int *Matrix_one=NULL;
int *Matrix_two=(int*)malloc(M*N*sizeof(int));
//每个进程里的结果矩阵
int *local_result=(int*)malloc(local_M*N*sizeof(int));
//结果矩阵
int *result_Matrix=NULL;
if(my_rank==0)
{
//printf("process %d of %d\n",my_rank,comm_sz);
FILE * fp;
//读取第一个矩阵
Matrix_one=(int*)malloc(M*N*sizeof(int));
//Matrix_one[M][N]={0};
fp=fopen("a.txt","r");
for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
fscanf(fp,"%d ",&Matrix_one[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
   printf("%d ",Matrix_one[i*N+j]);
   printf("\n");
}*/
//读取第二个矩阵
start=MPI_Wtime();
fp=fopen("b.txt","r");
for(j=0;j<N;j++)
{
for(i=0;i<M;i++)
fscanf(fp,"%d ",&Matrix_two[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(j=0;j<N;j++)
{
for(i=0;i<M;i++)
   printf("%d ",Matrix_two[i*M+j]);
   printf("\n");
}*/
//数据分发
MPI_Scatter(Matrix_one,local_M*N,MPI_INT,local_Matrix_one,local_M*N,MPI_INT,0,MPI_COMM_WORLD);

//数据广播
MPI_Bcast(Matrix_two,M*N,MPI_INT,0,MPI_COMM_WORLD);
//计算local结果
for(i=0;i<local_M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=local_Matrix_one[i*M+k]*Matrix_two[j*M+k];
local_result[i*M+j]=tem;
}
free(local_Matrix_one);
result_Matrix=(int*)malloc(M*N*sizeof(int));
//结果聚集
MPI_Gather(local_result,local_M*N,MPI_INT,result_Matrix,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
//剩余行处理(处理不能整除的情况)
int rest=M%comm_sz;
if(rest!=0)
for(i=M-rest-1;i<M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=Matrix_one[i*M+k]*Matrix_two[j*M+k];
result_Matrix[i*M+j]=tem;
}
finish=MPI_Wtime();
free(Matrix_one);
free(Matrix_two);
free(local_result);

printf("Proc %d > Elapsed time = %e seconds\n",my_rank,finish-start);
//将结果写入文件
fp=fopen("c.txt","w");
for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
fprintf(fp,"%d ",result_Matrix[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(i=0;i<local_M;i++)
{
for(j=0;j<N;j++)
   printf("%d ",local_result[i*N+j]);
   printf("\n");
}*/
}
else{
//printf("process %d of %d\n",my_rank,comm_sz);
//数据分发
MPI_Scatter(Matrix_one,local_M*N,MPI_INT,local_Matrix_one,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
//数据广播
MPI_Bcast(Matrix_two,M*N,MPI_INT,0,MPI_COMM_WORLD);
//计算local结果
for(i=0;i<local_M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=local_Matrix_one[i*M+k]*Matrix_two[j*M+k];
local_result[i*M+j]=tem;
}
free(local_Matrix_one);
free(Matrix_two);
//结果聚集
MPI_Gather(local_result,local_M*N,MPI_INT,result_Matrix,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
free(local_result);
//printf("%d %d\n",local_M,my_rank);
}
MPI_Finalize();
return 0;
}

结果分析


① 执行时间分析:
并行时,随着进程数目的增多,并行计算的时间越来越短;当达到一定的进程数时,执行时间小到最小值;然后再随着进程数的增多,执行时间反而越来越长。
② 加速比分析:
随着进程数的增大,加速比也是逐渐增大到最大值;再随着进程数的增大,加速比逐渐减小。
③ 执行效率分析:
随着进程数的增大,程序执行效率不断降低
④ 原因分析:
MPI并行程序的测试平台为Intel Core i5 CPU,为双核CPU,即在一个处理器上集成两个运算核心,提高了运算效率,因此会比串行的执行时间要短。由于一个进程只能在一个核上执行,因此只能有两个进程并行执行,又因为多进程运行在两个CPU上,会有进程切换等操作,所以才会出现进程数增加而执行时间增加的情况。

Logo

汇聚原天河团队并行计算工程师、中科院计算所专家以及头部AI名企HPC专家,助力解决“卡脖子”问题

更多推荐