Synchronisation et files d'attente
par Philippe Chassignet

 Login :  Mot de passe :

Introduction

Dans ce TD, nous allons écrire et expérimenter successivement plusieurs versions de files d'attente partagées. Cela sera utilisé pour développer une architecture de calcul de type Task Graph où un certain nombre de nœuds spécialisés réalisent chacun une partie du calcul voulu. Les données sont alors transmises de nœuds en nœuds par l'envoi de messages. Les nœuds sont organisés de manière statique pour former un graphe acyclique orienté dont les arcs déterminent la relation source - destinataire pour l'envoi des messages.

Pour permettre un fonctionnement asynchrone, chaque nœud comporte en entrée une file d'attente. Envoyer des données à un nœud consiste donc à ajouter des messages contenant ces données dans sa file d'attente. La tâche d'un nœud consiste à prendre le premier message dans sa file d'attente, réaliser un certain traitement sur les données contenues dans ce message et envoyer son résultat aux nœuds suivants. Cette tâche est infinie, c'est-à-dire que lorsque sa file d'attente est vide, un nœud attend l'arrivée d'un nouveau message pour continuer. Plusieurs nœuds affectés au même « service » pourront partager une même file d'attente.

On va réaliser cela en affectant un thread Java différent à chaque nœud. Chaque file d'attente fera donc l'objet d'accès concurrents entre les threads qui doivent la remplir et les threads qui doivent la vider.

L'application sera des manipulations d'images et chaque message servira à transporter une ligne d'une image codée en niveaux de gris. Il ne s'agit pas de faire ici du traitement d'image performant mais cela constitue une charge conséquente pour les processeurs en générant un nombre important de messages et d'opérations sur les files d'attente. Cela devrait aussi permettre de constater visuellement si nos files d'attente semblent fonctionner correctement, en particulier si des messages sont perdus. Attention, chaque exécution d'un programme faux peut donner lieu à des phénomènes différents, y compris l'absence de défaut apparent !

Préparation

Pour commencer, suivant la procédure habituelle,

La racine du projet contient la classe Test et quelques fichiers (images) pour les tests.
La classe Message du paquetage data définit la structure des messages qui vont passer d'un nœud à un autre.

On n'aura pas à modifier cette classe. Les autres classes du paquetage data concernent l'implantation des files d'attente et elles seront introduites au moment de leur utilisation.
La classe abstraite Node du paquetage nodes définit le comportement de base d'un nœud. Les autres classes du paquetage nodes sont des implantations particulières de Node. Les nœuds de type Source et AnimatedSource lisent des fichiers, supposés être des images, et envoient ces images, ligne par ligne, dans des messages à leurs nœuds destinataires. Les nœuds de type Display reconstituent des images à partir des messages qu'ils reçoivent. Il est inutile de regarder le détail de ces trois classes.
Les autres classes du paquetage nodes seront introduites au moment de leur utilisation.

La méthode test0 de Test donne un exemple très simple. On construit un nœud de type Source et un nœud de type Display, on les connecte et on lance l'exécution de la source par start(). On a ainsi un thread qui exécute la méthode run() du nœud Source et qui, par l'intermédiaire de sendLines() et forward, place des messages dans la file d'attente du nœud Display. Il n'y a pas besoin de lancer explicitement l'exécution du nœud Display car son fonctionnement est assuré automatiquement par l'interface graphique de Java qui utilise son propre thread.

Un Thread par nœud

Dans la méthode test1 de Test, on veut faire de même pour visualiser deux images. Le problème est que la méthode run() de source1 ne termine pas (elle est volontairement paramétrée pour itérer sans fin l'envoi des lignes de l'image). Pour obtenir le fonctionnement attendu, il faut que la méthode run de chaque nœud soit exécutée par un thread différent.
Modifiez de manière appropriée la méthode start de Node. Notez que Node est de type Runnable et qu'il existe un constructeur de Thread approprié.
On en profitera pour donner au thread le nom du nœud qui lui correspond ; cela sera utile pour la mise au point. Après la modification attendue, on obtient la visualisation de deux images et la console affiche :

source 1 started
source 2 started

Déposez Node.java.

Files d'attente

On va écrire maintenant plusieurs versions de files d'attente qui implantent l'interface MessageQueue (du paquetage data). Dans la classe Test, on utilise systématiquement la méthode getQueue comme fabrique pour construire les files. Il suffira donc de changer l'appel du constructeur dans getQueue pour changer facilement le type de file qui est expérimenté dans les tests.

On utilise maintenant des nouveaux types de nœuds, définis par la classe Processor, qui implantent le comportement général décrit en introduction. Le constructeur de Processor prend en paramètres :

Les classes Operator et Selector (du paquetage data) implantent la méthode process pour, respectivement, simuler un traitement ligne par ligne des images et filtrer des messages en fonction de leur champ channel.

On suppose pour l'instant que chaque nœud a sa propre file d'attente. Ainsi chaque file d'attente a un seul thread consommateur et, dans la méthode run() de Processor, la taille de la file ne peut pas diminuer entre l'appel à isEmpty et celui à remove. Cette méthode run() est donc correcte pour l'instant. Ce ne sera plus le cas à partir de l'exercice 5.

Synchronisation d'une file d'attente

On passe à la méthode test2. Deux nœuds source1 et source2 envoient leurs messages dans la file du nœud operator. Pour cette file, on est donc dans le schéma « 2 producteurs - 1 consommateur ».
La suite de test2 permet d'assurer la visualisation. Le nœud operator reproduit à l'identique les messages qu'il reçoit et il les envoie sans distinction dans les files respectives des nœuds filter1 et filter2 qui filtrent et transmettent sélectivement à display1 et display2.

On considère tout d'abord des files de la classe ListQueue qui est définie simplement en encapsulant une LinkedList. À l'exécution de test2, on devrait observer des défauts aléatoires, notamment :

Les méthodes de LinkedList ne sont pas conçues pour supporter des opérations concurrentes et, dans notre programme, les méthodes add et remove peuvent être appelées « simultanément » dans des threads différents, y compris plusieurs appels à add et on peut ainsi casser les invariants internes de LinkedList.
Dupliquez la classe ListQueue en un nouvelle classe SafeListQueue et modifiez cette dernière de manière à assurer l'exclusion mutuelle dans les méthodes add et remove. On utilisera pour cela un objet de type ReentrantLock, voir sa documentation. Notez que la construction suivante est correcte :
    myLock.lock();
    try {
      return ...;
    } finally {
      myLock.unlock();
    }
C'est-à-dire que le bloc finally est toujours exécuté, ici entre le calcul de la valeur qui est renvoyée et le retour effectif. Cette facilité sémantique évite une écriture plus compliquée qui utiliserait une variable locale.
Que faut-il faire pour size et isEmpty ? Mettez vos justifications en commentaire dans SafeListQueue.java. Maintenant, en appelant le constructeur de SafeListQueue dans getQueue, le fonctionnement de test2 doit être correct.

Déposez SafeListQueue.java.

Une file bloquante par attente active

On passe à la méthode test3 qui utilise une troisième source, ce qui tend à augmenter le nombre d'écritures dans la file du nœud operator.

Le fonctionnement avec SafeListQueue semble toujours correct mais, après quelques temps (cela peut prendre une vingtaine de secondes), il se produit une ou plusieurs OutOfMemoryError (au plus, une par thread) car le vidage de la file du nœud operator est plus lent que son remplissage.
On va utiliser maintenant une file de capacité limitée qui utilise la technique dite du « buffer circulaire ». Pour éviter de perdre des messages, les threads qui accèdent à la méthode add lorsque la file est pleine, doivent attendre jusqu'à ce qu'une place soit libérée. La classe BoundedQueue est une implantation naïve où on a essayé de faire une attente active dans add.

Par exemple, pour tester avec une file de longueur 5, on doit mettre return new BoundedQueue(5); dans getQueue. Il se produit naturellement des erreurs à l'exécution.

Dupliquez la classe BoundedQueue en une nouvelle classe SafeBoundedQueue et modifiez cette dernière pour obtenir un fonctionnement correct.
Peut-on mettre la boucle d'attente en section d'exclusion mutuelle ? Doit-on la mettre en dehors ? Mettez vos justifications en commentaire dans SafeBoundedQueue.java.

Déposez SafeBoundedQueue.java.

Utilisation d'une variable de condition

On est dans le schéma « n producteurs - 1 consommateur ». En repartant de SafeBoundedQueue, écrivez une nouvelle classe BlockingQueue qui n'utilise pas d'attente active dans add.
Si la file est pleine, les threads qui entrent dans add sont bloqués. Un appel à remove permet ensuite de débloquer un des threads ainsi bloqués.

On utilise pour cela une variable de condition. Il s'agira ici d'un objet de type Condition, voir sa documentation, et on le construit par la méthode newCondition() d'un Lock. On doit ensuite toujours utiliser cette variable de condition dans des sections d'exclusion mutuelle protégées par son verrou.
L'attente sur la condition doit se faire par un boucle du genre :

      while ( négation de la condition )
        var_condition.await();
Dans notre cas, awaitUninterruptibly() est préférable à await() car cela permet de différer une éventuelle interruption. Ainsi, on n'a plus besoin de gérer explicitement le cas de InterruptedException.
L'envoi d'un signal sur la condition se fait par une de ses méthodes signal() ou signalAll() dont le bon usage a été discuté en amphi. Souvenez-vous que si un thread est parvenu à exécuter await, c'est qu'il possédait le verrou. Au moment où il bloque, il rend le verrou. Quand il recevra le signal, il devra alors entrer en compétition pour reprendre le verrou. Pendant ce temps, la condition peut de nouveau changer.

On testera avec la méthode test3.

Déposez BlockingQueue.java.

Une file bloquante en écriture et en lecture

On veut maintenant supprimer l'attente active dans la méthode run de la classe Processor. Modifiez la classe BlockingQueue pour que la méthode remove soit également bloquante lorsque la file est vide.
Pour simplifier, on considère provisoirement le cas « 1 producteur - 1 consommateur » où un seul thread peut ajouter dans la file d'attente. La méthode test5 permet de tester ce cas précis avec une configuration de type pipeline.

On passe ensuite au cas général « n producteurs - m consommateurs ».
La méthode test5 est une adaptation de test3 où une même file est partagée par deux processeurs, ce qui permet déjà de tester le cas avec 2 consommateurs.

Déposez BlockingQueue.java et Processor.java.

« Thread Pool »

On voit avec test5 que la configuration du graphe devient fastidieuse si on doit construire plusieurs nœuds destinés à travailler en parallèle, établir toutes leurs connexions et les lancer. On va définir un type de nœud qui sera vu comme unique pour ces opérations, mais qui sera capable de faire fonctionner en interne l'équivalent de plusieurs nœuds de type Processor.

Gérer un « pool » de processeurs

Écrivez une classe ProcessorPool dont le constructeur prend les mêmes paramètres que le constructeur de Processor avec un paramètre additionnel qui donne le nombre de threads internes. La file d'attente sera partagée en lecture par tous ces threads, les appels à addConnectionTo établiront toutes les connexions requises et un seul appel à start permettra de lancer l'exécution de l'ensemble.
On testera de nouveau avec la méthode test3 où on aura remplacé la construction d'un Processor par celle d'un ProcessorPool (ligne en commentaire).
Pour tester correctement, essayez 20, 50 ou 100 processeurs et diverses tailles de file. Il est intéressant aussi de mettre la ligne source3.start(); en commentaire.

Déposez ProcessorPool.java (et BlockingQueue.java si vous l'avez retouchée).