TD 4 – Synchronization 2 – Thread Management
by Kaustuv Chaudhuri


Introduction

In TD 3b we saw how to write versions of the queue data structure that allowed concurrent reads and writes, but we used the simplifying assumption of a single consumer. We used it to implement task graphs to display multiple images line by line.

In this TD

  1. We will generalize our BoundedQueue to multiple-producer-multiple-consumer (MPMC) queues. We will also change active waiting to blocking using condition variables.
  2. We will generalize our Processor nodes by allowing multiple threads per processor.
  3. We will look at the Executors framework provided by Java for managing unstructured pools of threads.
  4. We will look at Fork/Join pools, which are collections of threads with hierarchical structure.
  5. Finally, as an (optional) challenge, we will implement bounded queues using atomic primitives.

Preparation

If you did everything right, you will see a new class Test4 in the default package, some new classes (which we will cover below), and two new images in the imgs directory.

Images and Messages

[You can skip ahead to question 1 and come back here later.]

In TD 3b we only had one kind of message: image lines. In this TD, we will add three more kinds of messages:

data.PixelMessage
A single pixel, along with its source co-ordinates (x, y).
data.PixelReferenceMessage
A position in a pixel buffer (an instance of util.PixelBuffer), which also gives access to the neighbors of a pixel. The pixel buffer must not be modified!
data.TileMessage
A rectangular array (“tile”) of pixels. Contains the actual PixelBuffer the tile is a part of. Pixels outside the tile should not be modified!

You do not need to understand the structure of these messages to finish this TD, but it is useful to look at the implementation of util.PixelBuffer.

We also add two new kinds of Source nodes:

nodes.PixelSource
Sends the pixels of the message. The pixels are sent top to bottom, left to right.
nodes.TileSource
Sends entire frames of an image as TileMessages.

We add a few new kinds of MessageProcessors:

data.PixelDistorter
A processor that distorts the source pixels of an image. You cannot construct instances of this class directly; only a fixed set of predefined instances is available. These distorters can be tested using Test4.setupMultiDistort().
data.LineDistorter
A processor to distort lines. Again, only a fixed set of predefined instances is allowed.

Finally, we add one new kind of filtering node, ColorSelectors, to select particular colors in a message. As with the distorters above, we restrict the instances to the following: ColorSelector.red, ColorSelector.green, and ColorSelector.blue.

Bounded Blocking Queues

Condition Variables

Active waiting wastes processor cycles; in the modern world of mobile computing, wasted cycles amount to wasted battery time and excess heat, both undesirable.

Write the class data.BlockingBoundedQueue, which is like data.BoundedQueue except that it is thread-safe and both add() and remove() are (potentially) blocking. More precisely:

boolean add(Message msg)
This method will always return true. If the queue is currently full, it will block the calling thread until space becomes available.
Message remove()
This method will remove the first message in the queue and return it. If the queue is currently empty, it will block the calling thread until a new message arrives.

You will implement blocking using condition variables. As you saw in Amphi 4, condition variables are instances of java.util.concurrent.locks.Condition and are attached to a java.util.concurrent.locks.Lock. They are created using lock.newCondition(). In order to use the variable, you will use a loop of the following kind:

while(negation of condition)
  conditionVar.awaitUninterruptibly();

The test in the while loop should be the negation of the semantics of the condition variable. For instance, if the condition variable represents emptiness, then you would have:

  Condition empty = lock.newCondition();
  ...
  lock.lock();
  ...
  while (!this.isEmpty())
    empty.awaitUninterruptibly();

We use awaitUninterruptibly() instead of await() to avoid dealing with thread interruptions (the InterruptedException exception). If your threads are interrupted during a queue operation, the semantics is unspecified and you may implement any failure method you wish.

Recall from Amphi 4 that the await() and awaitUninterruptibly() methods release the lock associated with the condition variable and put the calling thread to sleep; when a signal on the variable re-awakens the thread, it reacquires the lock before returning.

To signal a condition, call conditionVar.signal() or conditionVar.signalAll() while holding the associated lock.
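
As an illustration of the await/signal pattern described above, here is a minimal sketch of a bounded buffer guarded by one lock and two condition variables. It is not the expected BlockingBoundedQueue: the class name BoundedBuffer, the generic element type T (instead of Message), and the condition names notFull and notEmpty are assumptions made for this example only.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-alone buffer; names are illustrative, not from the TD code base.
class BoundedBuffer<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    // Signalled after a removal: there is room again.
    private final Condition notFull = lock.newCondition();
    // Signalled after an insertion: there is something to take.
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full; always returns true.
    boolean add(T item) {
        lock.lock();
        try {
            while (items.size() == capacity)   // negation of "not full"
                notFull.awaitUninterruptibly();
            items.addLast(item);
            notEmpty.signal();                 // wake one waiting consumer
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the buffer is empty; returns the oldest element.
    T remove() {
        lock.lock();
        try {
            while (items.isEmpty())            // negation of "not empty"
                notEmpty.awaitUninterruptibly();
            T item = items.removeFirst();
            notFull.signal();                  // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

The while loop (rather than an if) matters: a woken thread must re-check the condition, because another thread may have changed the state between the signal and the moment the lock is reacquired.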

Test your implementation by uncommenting one of the following in Test4.main(): Test4.setupSplit(), Test4.setupDistort() or Test4.setupMultiDistort1(). Remember to change Test4.getQueue() as well.

Submit BlockingBoundedQueue.java.

Multiple Consumers

A Fixed Number of Threads Per Processor

As you saw in TD 3b, the single thread in a processor node can get overwhelmed, leading to either unbounded space consumption in the input queue or a slowing down of the entire task graph. In this exercise, you will create a variant of nodes.Processor that will allow for a fixed number (≥ 1) of threads.

Create a class nodes.ProcessorN that inherits from nodes.Processor, with:

Constructor: ProcessorN(int numThreads, MessageQueue q, MessageProcessor p, String name)
The additional argument numThreads is used to indicate the number of threads this processor node should run. To call a constructor of a superclass, you need to use super(), which has to be the very first statement in the constructor body. So, start your class as follows:
package nodes;

import data.MessageProcessor;
import data.MessageQueue;

public class ProcessorN extends Processor {
   public ProcessorN(int numThreads, MessageQueue q, MessageProcessor p, String name) {
     super(q, p, name);
     // your code
   }
}
public void init()
This method should spawn numThreads threads, each of which executes this.run() (which is inherited from Processor).
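
The general pattern of starting several threads that all execute the same Runnable can be sketched as follows. This is a generic illustration, not ProcessorN itself: the class Spawner and its spawn() method are hypothetical names, and the task here is just a counter increment.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the TD code base.
class Spawner {
    // Starts numThreads threads that all execute the same task, and returns them.
    static Thread[] spawn(int numThreads, Runnable task) {
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = new Thread(task, "worker-" + i);
            workers[i].start();
        }
        return workers;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        Thread[] workers = spawn(4, runs::incrementAndGet);
        for (Thread t : workers) t.join();   // wait for every worker to finish
        System.out.println(runs.get() + " threads ran the task");
    }
}
```

Because every thread shares the same task object, any state the task touches must itself be thread-safe, which is why the counter is an AtomicInteger.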

Test this by running Test4.setupMultiDistortN() in Test4.main().

Submit ProcessorN.java.

Thread Pools using Executors

Sometimes we may not want to create and run all the threads at the same time. Instead, we may want simply to bound the concurrency of the processor, allowing it to run at most n threads. This would mean that we need to be able to spawn new threads when needed, as long as we do not exceed the concurrency limit.

Java provides a convenient framework for managing such thread pools using Executors, which are implementations of java.util.concurrent.Executor. The most important method in an executor is the execute() method that takes an instance of a Runnable and schedules its computation. For example, here is how we can schedule a computation that prints "bonjour":

exec.execute(new Runnable(){
  @Override
  public void run(){
    System.out.println("bonjour");
  }
});

This creates an anonymous class that is an instance of Runnable; like all instances of Runnable, there must be an implementation of the run() method, which is the computation that the executor will run. Note the similarity to the Thread class: the above code should (eventually) have the same effect as running:

new Thread(new Runnable(){
  @Override
  public void run(){
    System.out.println("bonjour");
  }
}).start();

Where and when the computation runs depends on what kind of executor it is. In this exercise, you should focus on the following four executors:

Executors.newSingleThreadExecutor()
Uses exactly one thread to execute all computations.
Executors.newCachedThreadPool()
Will run a computation in a new thread unless an earlier computation finished, in which case it will reuse that thread. This amortizes the cost of constructing new threads. If your task never needs more than n concurrent threads, then this executor will create no more than n threads.
Executors.newFixedThreadPool(int nThreads)
Runs a maximum of nThreads threads to execute submitted computations. If more computations are submitted than available threads, they are placed in an internal unbounded waiting queue.
Executors.newWorkStealingPool(int parallelism) (Java 1.8+)
Runs enough threads to achieve the desired parallelism. If the system is overloaded, meaning that parallelism threads have already been created, new computations are distributed among per-thread queues in some fair manner. If a thread finishes its computations early, it can steal a computation from the waiting queue of a different thread in the pool.
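
To see the concurrency bound of a fixed thread pool in action, the following sketch submits more computations than there are threads and records the largest number observed running at once. The class PoolDemo, the method maxConcurrency(), and the 5 ms sleep standing in for real work are all assumptions of this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the TD code base.
class PoolDemo {
    // Runs `tasks` short computations on a fixed pool and returns the largest
    // number of computations observed running at the same time.
    static int maxConcurrency(int nThreads, int tasks) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(nThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            exec.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5);                     // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        exec.shutdown();                                 // accept no new tasks
        exec.awaitTermination(1, TimeUnit.MINUTES);      // wait for queued tasks
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + maxConcurrency(3, 20));
    }
}
```

The reported peak never exceeds nThreads; the remaining computations wait in the pool's internal queue. Note that execute() is declared on Executor, while shutdown() and awaitTermination() come from the ExecutorService subinterface.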

Create the class nodes.ProcessorPool that has the same interface as ProcessorN and uses an executor of your choice. Here is a skeleton that you can start from:

package nodes;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import data.MessageProcessor;
import data.MessageQueue;
import data.Message;

public class ProcessorPool extends Processor {
    private final ExecutorService exec;

    public ProcessorPool(int concur, MessageQueue q, MessageProcessor p, String name) {
        super(q, p, name);
        this.exec = Executors.newCachedThreadPool(); // for example
    }

    @Override
    public void run() {
        while (true) {
            Message msg = this.queue.remove();
            // your code
        }
    }
}

In the run() method, submit a suitable Runnable instance that will process msg. You can make use of processMessage() inherited from Processor.
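
The shape of such a dispatch loop, where one thread takes messages off a queue and hands each one to an executor, might look like the sketch below. It deliberately avoids the TD classes: java.util.concurrent.LinkedBlockingQueue stands in for MessageQueue, strings stand in for messages, toUpperCase() stands in for processMessage(), and the STOP marker is a hypothetical way to end the loop so that the example terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pooled processor node.
class Dispatcher {
    static final String STOP = "<stop>";   // hypothetical end-of-stream marker

    // Takes messages until STOP arrives; each message is handled by the pool.
    static List<String> dispatch(BlockingQueue<String> queue) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        List<String> results = new CopyOnWriteArrayList<>();
        while (true) {
            final String msg = queue.take();             // blocks while the queue is empty
            if (msg.equals(STOP)) break;
            // msg is final, so the task can safely capture it.
            exec.execute(() -> results.add(msg.toUpperCase()));
        }
        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return results;                                  // order is not guaranteed
    }
}
```

Since the tasks run concurrently, results may appear in a different order from the one in which the messages were taken off the queue.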

Test your code by running Test4.setupMultiDistortPool() in Test4.main().

Submit ProcessorPool.java.

Fork/Join Thread Pools

Executors are well suited for an unorganized pool of threads, but sometimes our threads have a hierarchy. You saw this in the Fibonacci example in TD 2. Java provides an alternative for these cases called java.util.concurrent.ForkJoinPool, which we are now going to use to process an image by divide and conquer.

You already have a processor called data.TileDistorter that is intended to perform a distortion on PixelBuffers. Your job is to write the processTile() method. You will perform the following steps:

  1. Write a class that extends java.util.concurrent.RecursiveAction, which itself extends java.util.concurrent.ForkJoinTask.
  2. In the constructor for this class, accept a source PixelBuffer and a destination pixel array in addition to any book-keeping information to keep track of which tile you are working on.
  3. Override its compute() method to compute the pixels of the destination buffer based on the source tile. You will use the process() method of the PixelDistorter that was supplied to the TileDistorter, available in the field distorter.
  4. Divide up the computation as follows: if the number of pixels in the source tile is less than a given threshold (say 64 pixels), then just do the computation. Otherwise, divide the task into at least two smaller tasks, which will be new instances of the same class, and use invokeAll() to recursively invoke them. Note that invokeAll() accepts a variable number of parameters which are the tasks to execute. You can also put the tasks in a Collection, such as a LinkedList, and call invokeAll() on the list.
  5. Finally, in the process() method of TileDistorter, create (1) a new ForkJoinPool, (2) an instance of the class you wrote above with the correct parameters to process the entire tile, and then (3) use the invoke() method of the ForkJoinPool to run the computation.
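
The steps above can be illustrated on a simpler problem. The sketch below applies the same divide-and-conquer structure to a one-dimensional int array instead of a tile: the class InvertTask, the operation 255 - src[i], and the helper invert() are assumptions of this example, not part of TileDistorter.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical task: dst[i] = 255 - src[i] for every i in [lo, hi).
class InvertTask extends RecursiveAction {
    static final int THRESHOLD = 64;       // ranges of at most this size run directly
    private final int[] src, dst;
    private final int lo, hi;

    InvertTask(int[] src, int[] dst, int lo, int hi) {
        this.src = src;
        this.dst = dst;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        // "<=" rather than "<": with a threshold of 1, a one-element range
        // would otherwise be split forever.
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) dst[i] = 255 - src[i];
        } else {
            int mid = (lo + hi) >>> 1;
            // Run both halves, possibly in parallel, and wait for both to finish.
            invokeAll(new InvertTask(src, dst, lo, mid),
                      new InvertTask(src, dst, mid, hi));
        }
    }

    // Creates a pool, runs the root task over the whole array, returns the result.
    static int[] invert(int[] src) {
        int[] dst = new int[src.length];
        new ForkJoinPool().invoke(new InvertTask(src, dst, 0, src.length));
        return dst;
    }
}
```

The subtasks write to disjoint index ranges of dst, so no locking is needed. For a two-dimensional tile the same idea applies, for example by splitting along the longer side.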

Test your code by running Test4.setupDistortFrame() and Test4.setupDistortAnimation() in Test4.main(). Also experiment with different thresholds (definitely try a threshold of 1 pixel), and with the number of threads assigned to the ForkJoinPool, which you can set with its constructor argument. If you are feeling adventurous, try running this on some large images.

Submit TileDistorter.java.

Extra

Optional: Atomic Bounded Queues

Write a lock-free version of LockedBoundedQueue called AtomicBoundedQueue that uses atomic primitives instead of locks, condition variables, or blocking. This queue should not block in add().

HINTS
  1. Use two global atomic counters to count the number of successful add() and remove() steps respectively. These counters should strictly increase.
  2. For each cell in the queue, you need to store a sequence number that also strictly increases. You can use a separate AtomicIntegerArray or just create a special class for queue cells with an AtomicInteger.
  3. The \(n\)th add() on a cell with sequence number \(k\) can succeed if and only if \(n = k\), in which case \(k\) should be incremented. If \(n > k\), then the cell still holds a message that has not been removed yet, so there is no more space in the queue. If \(n < k\), then some other add() snuck in and added something before you (and incremented \(k\)), so you need to try again.
  4. Similarly, the \(n\)th remove() should succeed for a cell with sequence number \(k\) if and only if \(n + 1 = k\), in which case \(k\) should be set to \(n\) plus the size of the queue (that is, incremented by the size of the queue minus one, so that the future add() that wraps around to this cell can succeed). If \(n + 1 > k\), then there is nothing to remove from the queue, so the calling thread must wait. If \(n + 1 < k\), then some other remove() already took what was there, so you need to try again.
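
One possible reading of these hints is sketched below. It is only an illustration under several assumptions, none of which come from the TD: the class is called SeqBoundedQueue, it is generic rather than specialised to Message, operations are numbered from 0, cell \(i\) starts with sequence number \(i\), a full queue makes add() return false, and an empty queue makes remove() return null instead of waiting. In this sketch a cell with \(k < n\) means "full" for add(), and after a successful remove() the cell's sequence number is set to \(n\) plus the size.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical lock-free bounded queue using per-cell sequence numbers.
class SeqBoundedQueue<T> {
    private final int size;
    private final Object[] cells;
    private final AtomicLongArray seq;                     // one sequence number per cell
    private final AtomicLong adds = new AtomicLong();      // number of claimed add() steps
    private final AtomicLong removes = new AtomicLong();   // number of claimed remove() steps

    SeqBoundedQueue(int size) {
        this.size = size;
        this.cells = new Object[size];
        this.seq = new AtomicLongArray(size);
        for (int i = 0; i < size; i++) seq.set(i, i);      // cell i first serves add number i
    }

    // Returns false instead of blocking when the queue is full.
    boolean add(T item) {
        while (true) {
            long n = adds.get();
            int i = (int) (n % size);
            long k = seq.get(i);
            if (k == n) {
                if (adds.compareAndSet(n, n + 1)) {        // claim add number n
                    cells[i] = item;
                    seq.set(i, n + 1);                     // publish: remove number n may proceed
                    return true;
                }
            } else if (k < n) {
                return false;                              // cell not yet emptied: queue is full
            }
            // k > n, or the CAS failed: another add() got there first, so retry
        }
    }

    // Returns null instead of waiting when the queue is empty.
    @SuppressWarnings("unchecked")
    T remove() {
        while (true) {
            long n = removes.get();
            int i = (int) (n % size);
            long k = seq.get(i);
            if (k == n + 1) {
                if (removes.compareAndSet(n, n + 1)) {     // claim remove number n
                    T item = (T) cells[i];
                    cells[i] = null;
                    seq.set(i, n + size);                  // add number n + size may proceed
                    return item;
                }
            } else if (k < n + 1) {
                return null;                               // nothing has been added here yet
            }
            // k > n + 1, or the CAS failed: another remove() got there first, so retry
        }
    }
}
```

The plain write to cells[i] is made visible by the volatile write to the cell's sequence number that follows it; a thread only reads or overwrites the cell after it has read a matching sequence number.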

Submit AtomicBoundedQueue.java.