Dans ce TD, nous allons écrire et expérimenter successivement plusieurs versions de files d'attente partagées. Cela sera utilisé pour développer une architecture de calcul de type Task Graph où un certain nombre de nœuds spécialisés réalisent chacun une partie du calcul voulu. Les données sont alors transmises de nœuds en nœuds par l'envoi de messages. Les nœuds sont organisés de manière statique pour former un graphe acyclique orienté dont les arcs déterminent la relation source - destinataire pour l'envoi des messages.
Pour permettre un fonctionnement asynchrone, chaque nœud comporte en entrée une file d'attente. Envoyer des données à un nœud consiste donc à ajouter des messages contenant ces données dans sa file d'attente. La tâche d'un nœud consiste à prendre le premier message dans sa file d'attente, réaliser un certain traitement sur les données contenues dans ce message et envoyer son résultat aux nœuds suivants. Cette tâche est infinie, c'est-à-dire que lorsque sa file d'attente est vide, un nœud attend l'arrivée d'un nouveau message pour continuer. Plusieurs nœuds affectés au même « service » pourront partager une même file d'attente.
On va réaliser cela en affectant un thread Java différent à chaque nœud. Chaque file d'attente fera donc l'objet d'accès concurrents entre les threads qui doivent la remplir et les threads qui doivent la vider.
L'application sera des manipulations d'images et chaque message servira à transporter une ligne d'une image codée en niveaux de gris. Il ne s'agit pas de faire ici du traitement d'image performant mais cela constitue une charge conséquente pour les processeurs en générant un nombre important de messages et d'opérations sur les files d'attente. Cela devrait aussi permettre de constater visuellement si nos files d'attente semblent fonctionner correctement, en particulier si des messages sont perdus. Attention, chaque exécution d'un programme faux peut donner lieu à des phénomènes différents, y compris l'absence de défaut apparent !
Pour commencer, suivant la procédure habituelle,
La racine du projet contient la classe Test et quelques
fichiers (images) pour les tests.
La classe Message du paquetage data définit la
structure des messages qui vont passer d'un nœud à un autre.
La méthode test0 de Test donne un exemple très simple. On construit un nœud de type Source et un nœud de type Display, on les connecte et on lance l'exécution de la source par start(). On a ainsi un thread qui exécute la méthode run() du nœud Source et qui, par l'intermédiaire de sendLines() et forward, place des messages dans la file d'attente du nœud Display. Il n'y a pas besoin de lancer explicitement l'exécution du nœud Display car son fonctionnement est assuré automatiquement par l'interface graphique de Java qui utilise son propre thread.
Dans la méthode test1 de Test, on veut faire de même
pour visualiser deux images.
Le problème est que la méthode run() de source1 ne
termine pas (elle est volontairement paramétrée pour itérer sans fin l'envoi
des lignes de l'image).
Pour obtenir le fonctionnement attendu, il faut que la méthode
run de chaque nœud soit exécutée par un thread différent.
Modifiez de manière appropriée la méthode start de Node.
Notez que Node est de type Runnable et qu'il existe un
constructeur de Thread approprié.
On en profitera pour donner au thread le nom du nœud qui lui
correspond ; cela sera utile pour la mise au point.
Après la modification attendue, on obtient la visualisation de deux
images et la console affiche :
source 1 started source 2 started
Déposez Node.java.
On va écrire maintenant plusieurs versions de files d'attente qui implantent l'interface MessageQueue (du paquetage data). Dans la classe Test, on utilise systématiquement la méthode getQueue comme fabrique pour construire les files. Il suffira donc de changer l'appel du constructeur dans getQueue pour changer facilement le type de file qui est expérimenté dans les tests.
On utilise maintenant des nouveaux types de nœuds, définis par la classe Processor, qui implantent le comportement général décrit en introduction. Le constructeur de Processor prend en paramètres :
On suppose pour l'instant que chaque nœud a sa propre file d'attente. Ainsi chaque file d'attente a un seul thread consommateur et, dans la méthode run() de Processor, la taille de la file ne peut pas diminuer entre l'appel à isEmpty et celui à remove. Cette méthode run() est donc correcte pour l'instant. Ce ne sera plus le cas à partir de l'exercice 5.
On passe à la méthode test2. Deux nœuds source1 et
source2 envoient leurs messages dans la file du nœud
operator.
Pour cette file, on est donc dans le schéma « 2 producteurs - 1 consommateur ».
La suite de test2 permet d'assurer la visualisation.
Le nœud operator reproduit à l'identique
les messages qu'il reçoit et il les envoie sans distinction dans les files
respectives des nœuds filter1 et filter2 qui filtrent et
transmettent sélectivement à display1 et display2.
On considère tout d'abord des files de la classe ListQueue qui est définie simplement en encapsulant une LinkedList. À l'exécution de test2, on devrait observer des défauts aléatoires, notamment :
myLock.lock(); try { return ...; } finally { myLock.unlock(); }C'est-à-dire que le bloc finally est toujours exécuté, ici entre le calcul de la valeur qui est renvoyée et le retour effectif. Cette facilité sémantique évite une écriture plus compliquée qui utiliserait une variable locale.
Déposez SafeListQueue.java.
On passe à la méthode test3 qui utilise une troisième source, ce qui tend à augmenter le nombre d'écritures dans la file du nœud operator.
Le fonctionnement avec SafeListQueue semble toujours
correct mais, après quelques temps (cela peut prendre une vingtaine de
secondes), il se produit une ou plusieurs
OutOfMemoryError (au plus, une par thread) car le vidage de la
file du nœud
operator est plus lent que son remplissage.
On va utiliser maintenant une file de capacité limitée qui utilise la
technique dite du « buffer circulaire ».
Pour éviter de perdre des messages, les threads qui accèdent à la
méthode add lorsque la file est pleine, doivent attendre
jusqu'à ce qu'une place soit libérée.
La classe BoundedQueue est une implantation naïve où on a
essayé de faire une attente active dans add.
Par exemple, pour tester avec une file de longueur 5, on doit mettre return new BoundedQueue(5); dans getQueue. Il se produit naturellement des erreurs à l'exécution.
Dupliquez la classe BoundedQueue en une nouvelle classe
SafeBoundedQueue et modifiez cette dernière pour obtenir
un fonctionnement correct.
Peut-on mettre la boucle d'attente en section d'exclusion mutuelle ?
Doit-on la mettre en dehors ?
Mettez vos justifications en commentaire dans
SafeBoundedQueue.java.
Déposez SafeBoundedQueue.java.
On est dans le schéma « n producteurs - 1 consommateur ».
En repartant de SafeBoundedQueue,
écrivez une nouvelle classe BlockingQueue qui n'utilise
pas d'attente active dans add.
Si la file est pleine, les threads qui entrent dans add
sont bloqués.
Un appel à remove permet ensuite de débloquer un des
threads ainsi bloqués.
On utilise pour cela une variable de condition.
Il s'agira ici d'un objet de type Condition, voir sa
documentation, et on le construit par la méthode
newCondition() d'un Lock.
On doit ensuite toujours utiliser cette variable de condition dans des
sections d'exclusion mutuelle protégées par son verrou.
L'attente sur la condition doit se faire par un boucle du genre :
while ( négation de la condition ) var_condition.await();Dans notre cas, awaitUninterruptibly() est préférable à await() car cela permet de différer une éventuelle interruption. Ainsi, on n'a plus besoin de gérer explicitement le cas de InterruptedException.
On testera avec la méthode test3.
Déposez BlockingQueue.java.
On veut maintenant supprimer l'attente active dans la méthode
run de la classe Processor.
Modifiez la classe BlockingQueue pour que la méthode
remove soit également bloquante lorsque la file est vide.
Pour simplifier, on considère provisoirement le cas « 1 producteur -
1 consommateur » où un seul thread peut ajouter dans la file
d'attente.
La méthode test5 permet de tester ce cas précis avec une
configuration de type pipeline.
On passe ensuite au cas général « n producteurs - m consommateurs ».
La méthode test5 est une adaptation de test3 où une
même file est partagée par deux processeurs, ce qui permet déjà de
tester le cas avec 2 consommateurs.
Déposez BlockingQueue.java et Processor.java.
On voit avec test5 que la configuration du graphe devient fastidieuse si on doit construire plusieurs nœuds destinés à travailler en parallèle, établir toutes leurs connexions et les lancer. On va définir un type de nœud qui sera vu comme unique pour ces opérations, mais qui sera capable de faire fonctionner en interne l'équivalent de plusieurs nœuds de type Processor.
Écrivez une classe ProcessorPool dont le constructeur prend les
mêmes paramètres que le constructeur de Processor avec un
paramètre additionnel qui donne le nombre de threads internes.
La file d'attente sera partagée en lecture par tous ces threads,
les appels à addConnectionTo établiront toutes les
connexions requises et un seul appel à start permettra de lancer
l'exécution de l'ensemble.
On testera de nouveau avec la méthode test3 où on aura remplacé
la construction d'un Processor par celle d'un
ProcessorPool (ligne en commentaire).
Pour tester correctement, essayez 20, 50 ou 100 processeurs et diverses
tailles de file. Il est intéressant aussi de mettre la ligne
source3.start(); en commentaire.
Déposez ProcessorPool.java (et BlockingQueue.java si vous l'avez retouchée).