集合通訊(Collective communication)

  • 通信空間中的所有進程都參與通信操作,通信空間中的所有進程都參與通信操作。
  • 每一個進程都需要調用該操作函數,每一個進程都需要調用該操作函式。
    • 分為一對多、多對一、同步。
類別 函式 功能
資料移動 MPI_Bcast 一對多,資料廣播
資料移動 MPI_Gather 多對一,資料匯合
資料移動 MPI_Gatherv MPI_Gather的一般形式
資料移動 MPI_Allgather MPI_Gather的一般形式
資料移動 MPI_Allgatherv MPI_Allgather的一般形式
資料移動 MPI_Scatter 一對多,資料分散
資料移動 MPI_Scatterv MPI_Scatter的一般形式
資料移動 MPI_Alltoall 多對多,置換資料(全交換)
資料移動 MPI_Alltoallv MPI_Alltoall的一般形式
資料聚集 MPI_Reduce 多對一,資料歸約
資料聚集 MPI_AllReduce MPI_Reduce的一般形式,結果會存在於所有行程
資料聚集 MPI_Reduce_scatter 將結果scatter到每個行程
資料聚集 MPI_Scan 前綴操作
同步 MPI_Barrier 同步操作

Brocadcast

  • MPI根據rootID將buffer中的資料傳送給通訊群組comm中的所有行程。
MPI_Bcast(void* buffer,
         int count,
         MPI_Datatype datatype,
         int rootID,
         MPI_Comm comm )
行程0將資料送給其它行程
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpi.h"

int main(int argc, char **argv){
    char buff[128];
    int secret_num;

    int numprocs;
    int myid;
    int i;

    MPI_Status stat;
    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);
    if ( myid == 0 ){
        secret_num = atoi(argv[1]);
    }

    MPI_Bcast (&secret_num, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if ( myid == 0 ) {
        for( i = 1; i < numprocs; i++) {
            MPI_Recv(buff, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD, &stat);
            printf("buff: %s\n", buff);
        }
    } else {
        sprintf(buff, "Processor %d knows the secret code: %d",
                myid, secret_num);
        MPI_Send(buff, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}

// mpiexec -n 5 ./bcast 10
buff: Processor 1 knows the secret code: 10
buff: Processor 2 knows the secret code: 10
buff: Processor 3 knows the secret code: 10
buff: Processor 4 knows the secret code: 10

Scatter

  • scatter將array中的資料切成多份分給不同行程。

    • sendbuf中放置資料
    • sendcount為資料切成的份量數
  • 注意:

    • sendtype與recvtype應相同
    • sendcount與recvcount應相同
    • 發送的node也會有一份切好的資料在recvbuf中。
    • 接收的node永遠在recvbuf中接收資料。
MPI_Scatter(void* sendbuf,            // Distribute sendbuf evenly to recvbuf
          int sendcount,            // # items sent to EACH processor
          MPI_Datatype sendtype,

          void* recvbuf,
          int recvcount,
          MPI_Datatype recvtype,

          int rootID,           // Sending processor !
          MPI_Comm comm)
行程0將資料切段後送給其它行程
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main(int argc, char* argv[]){

    int buff[100];
    int recvbuff[2];
    int numprocs;
    int myid;
    int i, k;
    int mysum;

    MPI_Status stat;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);

    if ( myid == 0 ) {
        printf("We have %d processors\n", numprocs);

        // -----------------------------------------------
        // Node 0 prepare 2 number for each processor
        //     [1][2]  [3][4]  [5][6] .... etc
        // -----------------------------------------------
        k = 1;
        for ( i = 0; i < 2*numprocs; i += 2 ){
            buff[i] = k++;
            buff[i+1] = k++;
        }
    }
     // ------------------------------------------
     // Node 0 scatter the array to the processors:
     // ------------------------------------------
    MPI_Scatter (buff, 2, MPI_INT, recvbuff, 2, MPI_INT, 0, MPI_COMM_WORLD);

    if ( myid == 0 ) {
        mysum = recvbuff[0] + recvbuff[1];
        printf("Processor %d, sum = %d\n", myid, mysum);

        for( i = 1; i < numprocs; i++) {
            MPI_Recv(&mysum, 1, MPI_INT, i, 0, MPI_COMM_WORLD, &stat);
            printf("Processor %d, sum = %d\n", myid, mysum);
        }
    } else {
            mysum = recvbuff[0] + recvbuff[1];
            MPI_Send(&mysum, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return EXIT_SUCCESS;
}

// mpiexec -n 4 ./scatter

We have 4 processors
Processor 0, sum = 3
Processor 0, sum = 7
Processor 0, sum = 11
Processor 0, sum = 15

Gatter

  • 通常與scatter一起使用。

    • sendbuf: the data (must be valid for ALL processors)
    • sendcount: number of items that will be sent to the rootID process
  • the number of items received in recvbuf of the rootID process will be equal to recvcount * numprocs

  • 注意:

    • sendtype與recvtype應相同
    • sendcount與recvcount應相同
MPI_Gather(void* sendbuf,
          int sendcount,
          MPI_Datatype sendtype,

          void* recvbuf,
          int recvcount,
          MPI_Datatype recvtype,

          int rootID,
          MPI_Comm comm)
行程0接收從其它行程回來的資料。
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main(int argc, char* argv[]){

    int buff[100] ={};
    int recvbuff[2] = {};
    int numprocs;
    int myid;
    int i, k;
    int mysum;

    MPI_Status stat;

    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD,&myid);

    if ( myid == 0 ) {
        printf("We have %d processors\n", numprocs);

        // -----------------------------------------------
        // Node 0 prepare 2 number for each processor
        //     [1][2]  [3][4]  [5][6] .... etc
        // -----------------------------------------------
        k = 1;
        for ( i = 0; i < 2*numprocs; i += 2 ){
            buff[i] = k++;
            buff[i+1] = k++;
        }
    }

    // ------------------------------------------
    // Node 0 scatter the array to the processors:
    // ------------------------------------------
    MPI_Scatter (buff, 2, MPI_INT, recvbuff, 2, MPI_INT, 0, MPI_COMM_WORLD);
    mysum = recvbuff[0] + recvbuff[1];   // Everyone calculate sum

    // ------------------------------------------
    // Node 0 collects the results in "buff":
    // ------------------------------------------
    MPI_Gather (&mysum, 1, MPI_INT, &buff, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if ( myid == 0 ) {
        for( i= 0; i < numprocs; i++) {
            printf("Processor %d, sum = %d\n", i, buff[i]);
        }
    }
    MPI_Finalize();
    return EXIT_SUCCESS;
}

// mpiexec -n 4 ./gather
We have 4 processors
Processor 0, sum = 3
Processor 1, sum = 7
Processor 2, sum = 11
Processor 3, sum = 15

Reduce

  • gather在把資料收集合回來後,仍然需要再計算才能得到最後的結果。
  • 使用reduce可以一邊gather同時計算。

    • sendbuf: the data (must be valid for ALL processors)
    • recvbuf: buffer used to receiving data and simultaneously perform the "reducing operation".
    • recvcount: number of items to receive (per processor)
  • 注意:

    • sendtype與recvtype應相同
    • sendcount與recvcount應相同
MPI_Reduce(void* sendbuf,

          void* recvbuf,
          int recvcount,
          MPI_Datatype recvtype,
          MPI_Op op,    // MPI支援的operator

          int rootID,
          MPI_Comm comm)
行程0接收從其它行程回來的資料,同時計算。
  • MPI支援的reduction operator如下。
Opeartion 說明
MPI_MAX 找最大值
MPI_MIN 找最小值
MPI_PROD 所有元素相乘
MPI_LAND 所有元素做logical and
MPI_BAND 所有元素做bitwise and
MPI_LOR 所有元素做logical or
MPI_BOR 所有元素做bitwise or
MPI_LXOR 所有元素做logical xor
MPI_BXOR 所有元素做bitwise xor
MPI_MAXLOC 找最大值與擁有此值的process rank
MPI_MINLOC 找最小值與擁有此值的process rank
// pi.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"

double func(double a){
    return (2.0/sqrt(1 - a*a));
}

int main(int argc, char* argv[]){
    int N = 0;
    double w, x;
    int i, myid;
    double mypi, final_pi;
    int num_procs = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if(myid == 0)
        N = atoi(argv[1]);

    MPI_Bcast(&N, 1, MPI_INT, 0, MPI_COMM_WORLD);

    w = 1.0/(double)N;

    mypi = 0.0;
    for(i=myid; i<N; i+=num_procs){
        x = w * (i + 0.5);
        mypi += w*func(x);
    }

    MPI_Reduce(&mypi, &final_pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    if(myid == 0)
        printf("Pi = %f\n", final_pi);

    MPI_Finalize();
    return EXIT_SUCCESS;
}

//mpiexec -n 10 ./pi 1000000
Pi = 3.140737s

自訂reduce function

  • MPI_Op_create的參數

    • function: the function that you want to use to perform the reduction operation
    • commute (交換性): set this parameter to 0 (zero) if the operation does NOT commute, otherwise (if "a op b" = "b op a"), set it to 1 (ONE)
    • op: return value: the operation handle that you need to pass to MPI_Reduce().
  • 自訂函數的參數

    • in: pointer to the first operand
    • inout: pointer to the second operand
    • len: number of elements in in and inout
    • datatype: the type of data stored in in and inout
MPI_Op_create(MPI_User_function *function,
              int commute,
              MPI_Op *op)


void function_name( void *in,
             void *inout,
            int *len,
            MPI_Datatype *datatype);
// pi2.c

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"

void myAdd( void *a, void *b, int *len, MPI_Datatype *datatype){
     int i;

     if ( *datatype == MPI_INT )
     {
        int *x = (int *)a;    // Turn the (void *) into an (int *)
        int *y = (int *)b;    // Turn the (void *) into an (int *)

        for (i = 0; i < *len; i++)
        {
           *y = *x + *y;
           x++;
           y++;
        }
     }
     else if ( *datatype == MPI_DOUBLE )
     {
        double *x = (double *)a;    // Turn the (void *) into an (double *)
        double *y = (double *)b;    // Turn the (void *) into an (double *)

        for (i = 0; i < *len; i++)
        {
           *y = *x + *y;
           x++;
           y++;
        }
     }
  }

double func(double a){
    return (2.0/sqrt(1 - a*a));
}

int main(int argc, char* argv[]){
    int N = 0;
    double w, x;
    int i, myid;
    double mypi, final_pi;
    int num_procs = 0;
    MPI_Op myOp;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);

    if(myid == 0)
        N = atoi(argv[1]);

    MPI_Bcast(&N, 1, MPI_INT, 0, MPI_COMM_WORLD);

    w = 1.0/(double)N;

    mypi = 0.0;
    for(i=myid; i<N; i+=num_procs){
        x = w * (i + 0.5);
        mypi += w*func(x);
    }

    MPI_Op_create( myAdd, 1, &myOp);
    MPI_Reduce(&mypi, &final_pi, 1, MPI_DOUBLE, myOp, 0, MPI_COMM_WORLD);

    if(myid == 0)
        printf("Pi = %f\n", final_pi);

    MPI_Finalize();
    return EXIT_SUCCESS;
}

Barrier

  • 在設置Barrier的地方,所有的行程都會停在這一點直到所有的行程均已執行到此處。
MPI_Barrier( MPI_Comm comm )

參考資料

results matching ""

    No results matching ""