MPI矩阵乘法的两种实现方法
MPI矩阵乘法去年学习了并行计算,接触了MPI、Pthreads和OpenMP等常用的并行方法实现了并行的矩阵乘法,本章在此总结一下MPI的矩阵乘法使用。使用简单的MPI_Send和MPI_Recv实现使用较高级的MPI_Scatter和MPI_Gather实现MPI_Send和MPI_Recv实现#include<stdio.h>#include<stdlib.h>#include<mp
·
MPI矩阵乘法
去年学习了并行计算,接触了MPI、Pthreads和OpenMP等常用的并行方法实现了并行的矩阵乘法,本章在此总结一下MPI的矩阵乘法使用。
- 使用简单的MPI_Send和MPI_Recv实现
- 使用较高级的MPI_Scatter和MPI_Gather实现
MPI_Send和MPI_Recv实现
#include<stdio.h>
#include<stdlib.h>
#include<mpi.h>
#include<time.h>
int main(int argc,char *argv[])
{
double start, stop;
int i, j, k, l;
int *a, *b, *c, *buffer, *ans;
int size = 1000;
int rank, numprocs, line;
MPI_Init(NULL,NULL);//MPI Initialize
MPI_Comm_rank(MPI_COMM_WORLD,&rank);//获得当前进程号
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);//获得进程个数
line = size/numprocs;//将数据分为(进程数)个块,主进程也要处理数据
a = (int*)malloc(sizeof(int)*size*size);
b = (int*)malloc(sizeof(int)*size*size);
c = (int*)malloc(sizeof(int)*size*size);
//缓存大小大于等于要处理的数据大小,大于时只需关注实际数据那部分
buffer = (int*)malloc(sizeof(int)*size*line);//数据分组大小
ans = (int*)malloc(sizeof(int)*size*line);//保存数据块计算的结果
//主进程对矩阵赋初值,并将矩阵N广播到各进程,将矩阵M分组广播到各进程
if (rank==0)
{
//从文件中读入矩阵
FILE *fp;
fp=fopen("a.txt","r");//打开文件
start = MPI_Wtime();
for(i=0;i<1000;i++) //读数据
for(j=0;j<1000;j++)
fscanf(fp,"%d",&a[i*size+j]);
fclose(fp);//关闭文件
fp=fopen("b.txt","r");
for(i=0;i<1000;i++)
for(j=0;j<1000;j++)
fscanf(fp,"%d",&b[i*size+j]);
fclose(fp);
//将矩阵N发送给其他从进程
for (i=1;i<numprocs;i++)
{
MPI_Send(b,size*size,MPI_INT,i,0,MPI_COMM_WORLD);
}
//依次将a的各行发送给各从进程
for (l=1; l<numprocs; l++)
{
MPI_Send(a+(l-1)*line*size,size*line,MPI_INT,l,1,MPI_COMM_WORLD);
}
//接收从进程计算的结果
for (k=1;k<numprocs;k++)
{
MPI_Recv(ans,line*size,MPI_INT,k,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
//将结果传递给数组c
for (i=0;i<line;i++)
{
for (j=0;j<size;j++)
{
c[((k-1)*line+i)*size+j] = ans[i*size+j];
}
}
}
//计算a剩下的数据
for (i=(numprocs-1)*line;i<size;i++)
{
for (j=0;j<size;j++)
{
int temp=0;
for (k=0;k<size;k++)
temp += a[i*size+k]*b[k*size+j];
c[i*size+j] = temp;
}
}
fp=fopen("c.txt","w");
for(i=0; i<size; i++){
for(j=0; j<size; j++)
fprintf(fp,"%d ",c[i*size+j]);
fputc('\n',fp);
}
fclose(fp);
//结果测试
//统计时间
stop = MPI_Wtime();
printf("rank:%d time:%lfs\n",rank,stop-start);
free(a);
free(b);
free(c);
free(buffer);
free(ans);
}
//其他进程接收数据,计算结果后,发送给主进程
else
{
//接收广播的数据(矩阵b)
MPI_Recv(b,size*size,MPI_INT,0,0,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
MPI_Recv(buffer,size*line,MPI_INT,0,1,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
//计算乘积结果,并将结果发送给主进程
for (i=0;i<line;i++)
{
for (j=0;j<size;j++)
{
int temp=0;
for(k=0;k<size;k++)
temp += buffer[i*size+k]*b[k*size+j];
ans[i*size+j]=temp;
}
}
//将计算结果传送给主进程
MPI_Send(ans,line*size,MPI_INT,0,3,MPI_COMM_WORLD);
}
MPI_Finalize();//结束
return 0;
}
MPI_Scatter和MPI_Gather实现
#include<stdio.h>
#include<mpi.h>
#include <malloc.h>
#define M 1000
#define N 1000
int main()
{
int my_rank;/*My process rank*/
int comm_sz;/*Number of processes*/
int local_M;
int i,j,k;
double start,finish;/*timer*/
int tem;
//初始化MPI
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
MPI_Comm_size(MPI_COMM_WORLD,&comm_sz);
//每个矩阵分配到的行数
local_M=M/comm_sz;
//分配到每个进程的矩阵
int *local_Matrix_one=(int*)malloc(local_M*N*sizeof(int));
//定义两个矩阵
int *Matrix_one=NULL;
int *Matrix_two=(int*)malloc(M*N*sizeof(int));
//每个进程里的结果矩阵
int *local_result=(int*)malloc(local_M*N*sizeof(int));
//结果矩阵
int *result_Matrix=NULL;
if(my_rank==0)
{
//printf("process %d of %d\n",my_rank,comm_sz);
FILE * fp;
//读取第一个矩阵
Matrix_one=(int*)malloc(M*N*sizeof(int));
//Matrix_one[M][N]={0};
fp=fopen("a.txt","r");
for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
fscanf(fp,"%d ",&Matrix_one[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
printf("%d ",Matrix_one[i*N+j]);
printf("\n");
}*/
//读取第二个矩阵
start=MPI_Wtime();
fp=fopen("b.txt","r");
for(j=0;j<N;j++)
{
for(i=0;i<M;i++)
fscanf(fp,"%d ",&Matrix_two[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(j=0;j<N;j++)
{
for(i=0;i<M;i++)
printf("%d ",Matrix_two[i*M+j]);
printf("\n");
}*/
//数据分发
MPI_Scatter(Matrix_one,local_M*N,MPI_INT,local_Matrix_one,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
//数据广播
MPI_Bcast(Matrix_two,M*N,MPI_INT,0,MPI_COMM_WORLD);
//计算local结果
for(i=0;i<local_M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=local_Matrix_one[i*M+k]*Matrix_two[j*M+k];
local_result[i*M+j]=tem;
}
free(local_Matrix_one);
result_Matrix=(int*)malloc(M*N*sizeof(int));
//结果聚集
MPI_Gather(local_result,local_M*N,MPI_INT,result_Matrix,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
//剩余行处理(处理不能整除的情况)
int rest=M%comm_sz;
if(rest!=0)
for(i=M-rest-1;i<M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=Matrix_one[i*M+k]*Matrix_two[j*M+k];
result_Matrix[i*M+j]=tem;
}
finish=MPI_Wtime();
free(Matrix_one);
free(Matrix_two);
free(local_result);
printf("Proc %d > Elapsed time = %e seconds\n",my_rank,finish-start);
//将结果写入文件
fp=fopen("c.txt","w");
for(i=0;i<M;i++)
{
for(j=0;j<N;j++)
fprintf(fp,"%d ",result_Matrix[i*N+j]);
fscanf(fp,"\n");
}
fclose(fp);
/*for(i=0;i<local_M;i++)
{
for(j=0;j<N;j++)
printf("%d ",local_result[i*N+j]);
printf("\n");
}*/
}
else{
//printf("process %d of %d\n",my_rank,comm_sz);
//数据分发
MPI_Scatter(Matrix_one,local_M*N,MPI_INT,local_Matrix_one,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
//数据广播
MPI_Bcast(Matrix_two,M*N,MPI_INT,0,MPI_COMM_WORLD);
//计算local结果
for(i=0;i<local_M;i++)
for(j=0;j<M;j++){
tem=0;
for(k=0;k<N;k++)
tem +=local_Matrix_one[i*M+k]*Matrix_two[j*M+k];
local_result[i*M+j]=tem;
}
free(local_Matrix_one);
free(Matrix_two);
//结果聚集
MPI_Gather(local_result,local_M*N,MPI_INT,result_Matrix,local_M*N,MPI_INT,0,MPI_COMM_WORLD);
free(local_result);
//printf("%d %d\n",local_M,my_rank);
}
MPI_Finalize();
return 0;
}
结果分析
① 执行时间分析:
并行时,随着进程数目的增多,并行计算的时间越来越短;当达到一定的进程数时,执行时间小到最小值;然后再随着进程数的增多,执行时间反而越来越长。
② 加速比分析:
随着进程数的增大,加速比也是逐渐增大到最大值;再随着进程数的增大,加速比逐渐减小。
③ 执行效率分析:
随着进程数的增大,程序执行效率不断降低
④ 原因分析:
MPI并行程序的测试平台为Intel Core i5 CPU,为双核CPU,即在一个处理器上集成两个运算核心,提高了运算效率,因此会比串行的执行时间要短。由于一个进程只能在一个核上执行,因此只能有两个进程并行执行,又因为多进程运行在两个CPU上,会有进程切换等操作,所以才会出现进程数增加而执行时间增加的情况。
更多推荐
已为社区贡献1条内容
所有评论(0)