Previous Next Contents

Chapter 5    PVM avancé

Ce chapitre nous offre un petit interlude, afin de développer quelques unes des fonctions avancées de PVM. Celles qui nous paraissent vraiment essentielles sont dans ce chapitre, et concernent la possibilité de recevoir des messages sans bloquer, et la possibilité d'envoyer des paramètres à la création des processus, ce qui permet en particulier d'écrire facilement des programmes parallèles récursifs.

5.1  Messages non-bloquants

On a vu la réception bloquante, Elle peut aussi être non-bloquante ou bloquante avec ``timeout''.

5.1.1  pvm_nrecv

Cette instruction effectue une réception non-bloquante: si un message avec le bon tag est arrivé, le place dans le tampon bufid. Sinon renvoie bufid=0:

int bufid;
bufid=pvm_nrecv(int tid,int msgtag);
Il y a un cas d'Erreur si bufid < 0.

5.1.2  pvm_trecv

Cette instruction effectue une réception bloquante d'un message du processus tid avec le tag msgtag. Si rien n'arrive avant le délai tmout (champs entiers tv_sec et tv_usec) renvoie bufid=0:

int bufid;
bufid=pvm_trecv(int tid,int msgtag,struct timeval *tmout);
Il y a un cas d'Erreur si bufid < 0.

5.1.3  pvm_probe

Cette instruction teste l'arrivée d'un message du processus tid avec le tag msgtag:

int bufid;
bufid=pvm_probe(int tid,int msgtag);
Il y a un cas d'Erreur si bufid < 0.

5.1.4  pvm_bufinfo

Cette instruction donne l'état du tampon de communication bufid: après exécution, bytes contient le nombre d'octets du message éventuellement reçu, msgtag, son tag et tid l'identificateur de tâche de l'expéditeur:

int info;
info=pvm_bufinfo(int bufid,int *bytes,int *msgtag,int *tid);
Il y a un cas d'Erreur si info < 0.

5.1.5  Exemple

On considère le programme maître suivant; il crée un esclave (async2), affiche qu'il est en train de calculer quelque chose, teste régulièrement si son esclave lui a envoyé un message et si tel est le cas sort de sa boucle et termine. C'est ce type de test régulier que l'on appelle ``active polling''.

#include <stdio.h>
#include <pvm3.h>
#define MSG 1

main()
{
int mtid,ctid,bufid,ok,i;
if ((mtid=pvm_mytid())<0) 
{ 
  pvm_exit();
  exit(1); }
 pvm_spawn("async2",(char **) 0,PvmTaskDefault,(char *) 0,1,&ctid);
 fprintf(stdout,"I am spawning one child\n");
 fflush(stdout);
 ok = 0;
 i = -1;
 while (!ok)
   { i++;
   bufid = pvm_nrecv(ctid,MSG);
   if (bufid < 0) { pvm_perror("Pb with nrecv");
   pvm_exit(); exit(1); }
   if (bufid != 0) ok=1;
   else if ((i % 60000) == 0)
     { fprintf(stdout,"I am computing...\n");
     fflush(stdout);
     } }
 pvm_upkint(&i,1,1);
 fprintf(stdout,"I have received %d\n",i);
 fflush(stdout);
 pvm_exit();
 exit(1);
}
L'esclave ``async2'' est très simple:

#include <stdio.h>
#include <pvm3.h>
#define MSG 1

main()
{
int i=42;
sleep(4);
fprintf(stdout,"I'm sending a message to my parent\n");
fflush(stdout);
pvm_initsend(PvmDataDefault);
pvm_pkint(&i,1,1);
pvm_send(pvm_parent(),MSG);
pvm_exit();
exit(1);
}
L'exécution du programme donne:

pvm> spawn -1 -> async1
[11]
1 successful
t40021
pvm> [11:t40021] I am spawning one child
[11:t40021] I am computing...
[11:t40021] I am computing...
[11:t40021] I am computing...
[11:t40021] I am computing...
[11:t40021] I am computing...
[11:t40022] I am sending a message to my parent
[11:t40022] EOF
[11:t40021] I have received 42
[11:t40021] EOF
[11] finished

5.2  Passage d'arguments et processus récursifs

5.2.1  Rappel: Transformée de Fourier Rapide

Soit un signal analogique xa. Supposons xa de carré sommable. La tranformée de Fourier de xa est xa,
x a(s)= ó
õ
¥


-¥
xa(t) e-i s t dt
de transformée inverse,
xa(t)=
1

2p
ó
õ
¥


-¥
xa(s) ei s t ds

Dans le cas des signaux digitaux x avec T=[0,N-1] Ç IN, on a une décomposition semblable en sommes,
x(k)=
1

N
N-1
S
n=0
x(n) e
-
2ip

N
k n
 
de transformée inverse,
x(n)=
N
S
k=0
x (k) e
2ip

N
k n
 

Les fonctions x et xa sont les fonctions en fréquence, correspondant respectivement à x et xa. La transformée de Fourier discrète est très utilisée dans un grand nombre de systèmes de traitement de signaux (après échantillonage et quantification). Le calcul de cette transformée peut se faire en O(N log(N)) opérations au lieu des O(N2) opérations que la définition donnée semble rendre nécessaire. Cet algorithme de calcul est connu sous le nom de tranformée de Fourier rapide (FFT: ``Fast Fourier Transform'').

L'algorithme de transformée de Fourier rapide est basé sur le fait suivant.

Soit (x(n))0 £ n £ N-1 un signal digital avec N=2m. Alors pour k < N/2,
x(k) =
1

N
N-1
S
n=0
x(n) e
-
2 ip

N
k n
 
  =
1

2
æ
ç
ç
ç
è
1

N/2
 
S
n pair
x(n) e
-
2 ip

N
k n
 
+
1

N/2
 
S
n impair
x(n) e
-
2 ip

N
k n
 
ö
÷
÷
÷
ø
  =
1

2
æ
ç
ç
ç
ç
ç
è
1

N/2
N

2
S
n=0
x(2n) e
-
2 ip

N
k 2n
 
+
1

N/2
N

2
S
n=0
x(2n+1) e
-
2 ip

N
k (2n+1)
 
ö
÷
÷
÷
÷
÷
ø
  =
1

2
æ
ç
ç
ç
è
x 1(k) + e
-
2 ip

N
k
 
x2(k) ö
÷
÷
÷
ø
et pour k > N/2,
x(k) =
1

2
æ
ç
ç
ç
è
x 1(k) - e
-
2 ip

N
k
 
x2(k) ö
÷
÷
÷
ø

x1(n)=x(2n) et x2(n)=x(2n+1) sont les échantillonages du signal x aux temps pairs et impairs respectivement.

Pour calculer rapidement la transformée inverse, il suffit de remarquer que la transformée de Fourier inverse d'un signal x est N fois le conjugué de la transformée de Fourier du signal conjugué x*.

Le calcul de la FFT s'effectue donc suivant le paradigme classique ``diviser pour régner''. On a m appels récursifs à la procédure de calcul de la FFT. Il s'ensuit aisément que la complexité de l'algorithme est en moyenne de N m opérations élémentaires.

5.2.2  Version parallèle récursive

Cet algorithme se prête également à des parallélisations efficaces. La procédure de calcul de x, fft(x) est définie de façon récursive. fft(x) lance un processus fils qui se charge de calculer x1 et un autre qui se charge de calculer x2. Une représentation des appels récursifs est donnée dans la figure 5.1. Un programme PVM implémentant cet algorithme est donné plus bas.



Figure 5.1: Arbre de création des processus dans l'exécution de l'algorithme de FFT.

``fft-startup'' est à la racine de l'arbre des appels récursifs:

#include <stdlib.h>
#include <stdio.h>
#include "pvm3.h"

#define RESULT_TAG 1
#define MAX_LENGTH 128

main(argc, argv)
int argc;
char *argv[];
{
/* argv[i] is the unidimensional complex signal to be processed */
/* format is fft Re(x_1) Im(x_1) Re(x_2) ... Re(x_n) Im(x_n) */
int i,bufid,mtid,tid,numt;
float R[MAX_LENGTH];
if ((mtid=pvm_mytid())<0)
 {
  printf("unexpected error occurred!\n");
  exit(1);
 }
if (argc < 5)
 {
  printf("wrong number of parameters\n");
  exit(1);
 }
numt = pvm_spawn("fft",&argv[1],PvmTaskDefault,"",1,&tid);
if (numt != 1)
 {
  printf("could not spawn fft\n");
  exit(1);
 }
bufid = pvm_recv(tid, RESULT_TAG);
pvm_upkfloat(&R[0],argc-1,1);
for (i=0;i<(argc-1)/2;i++)
 {
  printf(" %g %g",R[i],R[i+(argc-1)/2]);
 }
printf("\n"); 
pvm_exit(); 
exit(0); 
}
Le programme récursif ``fft'' se charge du calcul effectif:
#include <math.h>
#include <stdio.h>
#include "pvm3.h"

#define RESULT_TAG 1
#define SIZE 64
#define MAX_LENGTH 128
#define PI 3.14159265358

main(argc, argv)
int argc;
char *argv[];
{
/* argv[i] is the unidimensional complex signal to be processed */
/* format is fft Re(x_1) Im(x_1) Re(x_2) ... Re(x_n) Im(x_n) */
/* sends to his parent an array with RESULT_TAG equal to 1 */
  int i,numt,bufid,mtid,ptid,lstid,rstid;
  float wr,wi;
  float Rr[MAX_LENGTH],Ri[MAX_LENGTH];
  float Xr[MAX_LENGTH];
  float Xi[MAX_LENGTH];
  float T1[MAX_LENGTH],T2[MAX_LENGTH];
  char *fargv1[MAX_LENGTH+1],*fargv2[MAX_LENGTH+1];
  if ((mtid=pvm_mytid())<0)
    {
      printf("unexpected error occurred!\n");
      exit(1);
    }
  ptid=pvm_parent();
  for (i=0;i<(argc-1)/2;i++)
    {
      sscanf(argv[2*i+1],"%g",&Xr[i]);
      sscanf(argv[2*(i+1)],"%g",&Xi[i]);
      if (i%2 == 0)
        {
          fargv1[i]=argv[2*i+1];
          fargv1[i+1]=argv[2*(i+1)];
        }
      else
        {
          fargv2[i-1]=argv[2*i+1];
          fargv2[i]=argv[2*(i+1)];
        }
    }
  fargv1[(argc-1)/2]=NULL;
  fargv2[(argc-1)/2]=NULL;
  if (argc==5) 
    {
      Rr[1]=(Xr[0]-Xr[1])/2;
      Ri[1]=(Xi[0]-Xi[1])/2;
      Rr[0]=(Xr[0]+Xr[1])/2;
      Ri[0]=(Xi[0]+Xi[1])/2;
    }
  else
    {    
      numt = pvm_spawn("fft",&fargv1[0],PvmTaskDefault,"",1,&lstid);
      if (numt != 1)
        {
          printf("could not spawn left son\n");
          exit(1);
        }
      numt = pvm_spawn("fft",&fargv2[0],PvmTaskDefault,"",1,&rstid);
      if (numt != 1)
        {
          printf("could not spawn right son\n");
          exit(1);
        }
      bufid = pvm_recv(lstid, RESULT_TAG);
      pvm_upkfloat(&T1[0],(argc-1)/2,1);
      bufid = pvm_recv(rstid, RESULT_TAG);
      pvm_upkfloat(&T2[0],(argc-1)/2,1);
      for (i=0;i<(argc-1)/4;i++)
        {
          wr=cos(-4*PI*i/(argc-1));
          wi=sin(-4*PI*i/(argc-1));
          Rr[i]=(T1[i]+wr*T2[i]-wi*T2[i+(argc-1)/4])/2;
          Ri[i]=(T1[i+(argc-1)/4]+wr*T2[i+(argc-1)/4]+wi*T2[i])/2;
          Rr[i+(argc-1)/4]=(T1[i]-wr*T2[i]+wi*T2[i+(argc-1)/4])/2;
          Ri[i+(argc-1)/4]=(T1[i+(argc-1)/4]-wr*T2[i+(argc-1)/4]-wi*T2[i])/2;
        }
    }        
  pvm_initsend(PvmDataDefault);
  pvm_pkfloat(&Rr[0],(argc-1)/2,1);
  pvm_pkfloat(&Ri[0],(argc-1)/2,1);
  pvm_send(ptid,RESULT_TAG);
  pvm_exit();
  exit(0);
}


Previous Next Contents