In TD 3b we saw how to write versions of the queue data structure that allowed concurrent reads and writes, but we used the simplifying assumption of a single consumer. We used it to implement task graphs to display multiple images line by line.
In this TD
BoundedQueue
to multiple-producer-multiple-consumer (MPMC) queues. We will also change
active waiting to blocking using condition variables.Processor
nodes by allowing multiple threads per processor.TD34
project from TD 3b, but you can also create a new project if you want.td34_4.zip
and unzip it in your TD34
project. This will update many of the
source files from TD 3b.If you did everything right, you will see a new class Test4
in the default package, some new classes (which we will cover
below), and two new images in the imgs
directory.
[You can skip ahead to question 1 and come back here later.]
In TD 3b we only had one kind of message: image lines. In this TD, we will add three more kinds of messages:
data.PixelMessage
x
, y
).data.PixelReferenceMessage
util.PixelBuffer
), which also gives access to the neighbors of a pixel.
The pixel buffer must not be modified!data.TileMessage
PixelBuffer
the tile is a part of. Pixels outside the tile
should not be modified!You do not need to understand the structure of these messages to finish this TD, but it is useful to look at the implementation of
util.PixelBuffer
.
We also add two new kinds of Source
nodes:
nodes.PixelSource
nodes.TileSource
TileMessage
s.We add a few new kinds of MessageProcessors
:
data.PixelDistorter
PixelDistorter.gaussianBlur
: which blurs a pixel based on its neighboring pixelsPixelDistorter.edgeDetect
: which replaces pixels by their intensity gradients, which has the effect of highlighting
“edges”.PixelDistorter.sharpen
: which applies a “sharpen” convolutionPixelDistorter.emboss
: which displaces pixels along intensity gradients, which has the effect of making the image appear
to be carved into the canvas.PixelDistorter.luminosity
: which replaces each pixel by its luminosity, which is how bright the pixel would appear to the
rod cells in the average human eye.Test4.setupMultiDistort()
.
data.LineDistorter
LineDistorter.rotate1
: rotates a line, shifting all but the rightmost pixel to the right and placing the rightmost pixel
at the left edge.LineDistorter.rotateN(int n)
: rotate a line n
times.LineDistorter.randomize
: randomly shuffles the pixels in the line.Finally, we add one new kind of filtering node, ColorSelector
s, to select particular colors in a message. As with the
distorters above, we restrict the instances to the following: ColorSelector.red
, ColorSelector.green
, and
ColorSelector.blue
.
Active waiting wastes processor cycles; in the modern world of mobile computing, wasted cycles amount to wasted battery time and excess heat, both undesirable.
Write the class data.BlockingBoundedQueue
which is like data.BoundedQueue
but is thread-safe and both
add()
and remove()
are (potentially) blocking. More precisely:
boolean add(Message msg)
true
. If the queue is currently full, it will block the calling thread until space becomes
available.Message remove()
You will implement blocking using condition variables. As you saw in Amphi 4, condition variables are instances of java.util.concurrent.lock.Condition
and are attached to a java.util.concurrent.locks.Lock
.
They are created using lock.newCondition()
. In order to use
the variable, you will use a loop of the following kind:
while(negation of condition) conditionVar.awaitUninterruptibly();
The test in the while
loop should be the negation of the semantics of the condition variable. For instance, if the condition variable
represents emptiness, then you would have:
Condition empty = lock.newCondition(); ... lock.lock(); ... while (!this.isEmpty()) empty.awaitUninterruptibly();
We use awaitUninterruptibly()
instead of await()
to avoid dealing with thread interruptions (the
InterruptedException
exception). If your threads are interrupted during a queue operation, the semantics is unspecified and
you may implement any failure method you wish.
Recall from Amphi 4 that the await()
and awaitUninterruptibly()
methods release the lock associated with the
condition variable and puts the calling thread to sleep; when a signal on the variable re-awakens the thread, it reacquires the lock.
To signal a condition, you have to use conditionVar.signal()
or signalAll()
.
Test your implementation by uncommenting one of the following in Test4.main()
: Test4.setupSplit()
,
Test4.setupDistort()
or Test4.setupMultiDistort1()
. Remember to change Test4.getQueue()
as well.
Submit BlockingBoundedQueue.java.
As you saw in TD 3b, the single thread in a processor node can get overwhelmed, leading to either unbounded space consumption in the
input queue or a slowing down of the entire task graph. In this exercise, you will create a variant of nodes.Processor
that
will allow for a fixed number (≥ 1) of threads.
Create a class nodes.ProcessorN
that inherits from node.Processor
, with:
ProcessorN(int numThreads, MessageQueue q, MessageProcessor p, String name)
numThreads
is used to indicate the number of threads this processor node should run. To call a
constructor of a superclass, you need to use super()
, which has to be the very first statement in the constructor body. So,
start your class as follows:
package nodes; public class ProcessorN extends Processor { ProcessorN(int numThreads, MessageQueue q, MessageProcessor p, String name){ super(q, p, name); // your code } }
public void init()
numThreads
threads, each of which executes this.run()
(which is inherited from
Processor
).Test this by running Test4.setupMultiDistortN()
in Test4.main()
.
Submit ProcessorN.java.
Sometimes we may not want to create and run all the threads at the same time. Instead, we may want simply to bound the
concurrency of the processor, allowing it to run at most n
threads. This would mean that we need to be able to spawn
new threads when needed, as long as we do not exceed the concurrency limit.
Java provides a convenient framework for managing such thread pools using Executors, which are implementations of
java.util.concurrent.Executor
.
The most important method in an executor is the execute()
method that takes an instance of a Runnable
and
schedules its computation. For example, here is how we can schedule a computation that prints "bonjour"
:
exec.execute(new Runnable(){ @Override public void run(){ System.out.println("bonjour"); } });
This creates an anonymous class that is an instance of Runnable
; like all instances of Runnable
,
there must be an implementation of the run()
method, which is the computation that the executor will run. Note the similarity
to the Thread
class: the above code should (eventually) have the same effect as running:
new Thread(new Runnable(){ @Override public void run(){ System.out.println("bonjour"); } }).start();
Where and when the computation runs depends on what kind of executor it is. In this exercise, you should focus on the following four executors:
Executors.newSingleThreadExecutor()
Executors.newCachedThreadPool()
Executors.newFixedThreadPool(int
nThreads)
nThreads
threads to execute submitted computations. If more computations are submitted than available
threads, they are placed in an internal unbounded waiting queue.Executors.newWorkStealingPool(int
parallelism)
(Java 1.8+)parallelism
. If the system is overloaded, meaning that parallelism
number of threads are already created, then new computations will be distributed to separate queues per thread in some
fair manner. If a thread finishes a computation early, it can steal a computation from the waiting queue of a different thread in
the pool.Create the class nodes.ProcessorPool
that has the same interface as ProcessorN
and uses an executor of your
choice. Here is a skeleton that you can start from:
package nodes; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import data.MessageProcessor; import data.MessageQueue; import data.Message; public class ProcessorPool extends Processor { private final ExecutorService exec; public ProcessorPool(int concur, MessageQueue q, MessageProcessor p, String name) { super(q, p, name); this.exec = Executors.newCachedThreadPool(); // for example } @Override public void run() { while (true) { Message msg = this.queue.remove(); // your code } } }
In the run()
method, submit a suitable Runnable
instance that will process msg
. You can make use
of processMessage()
inherited from Processor
.
Test your code by running Test4.setupMultiDistortPool()
in Test4.main()
.
Submit ProcessorPool.java.
Executors are well suited for an unorganized pool of threads, but sometimes our threads have a hierarchy. You saw this in the Fibonacci
example in TD 2. Java provides an alternative for these cases called java.util.concurrent.ForkJoinPool
,
which we are now going to use to process an image by divide an conquer.
You already have a processor called data.TileDistorter
that is intended to perform a distortion on
PixelBuffer
s. Your job is to write the processTile()
method. You will perform the following steps:
java.util.concurrent.RecursiveAction
,
which itself extends java.util.concurrent.ForkJoinTask
.PixelBuffer
and a destination pixel array in addition to any
book-keeping information to keep track of which tile you are working on.compute()
method to compute the pixels of the destination buffer based on the source tile. You will use the
process()
method of the PixelDistorter
that was supplied to the TileDistorter
, available in the
field distorter
.invokeAll()
to recursively invoke them. Note that invokeAll()
accepts a variable number of parameters which are the tasks to execute. You
can also put the tasks in a Collection
, such as a LinkedList
, and call invokeAll()
on the list.process()
method of TileDistorter
, create (1) a new ForkJoinPool
, (2) an
instance of the class you wrote above with the correct parameters to process the entire tile, and then (3) use the invoke()
method of the ForkJoinPool
to run the computation.Test your code by running Test4.setupDistortFrame()
and Test4.setupDistortAnimation()
in
Test4.main()
. Also experiment with different thresholds (definitely try a threshold of 1 pixel), and with numbers of threads
assigned to the ForkJoinPool
, which you can set in its constructor argument. If you are feeling adventurous, try running this
on some large images, such as those from:
Submit TileDistorter.java.
Write a lock-free version of LockedBoundedQueue
called AtomicBoundedQueue
that uses atomic primitives instad
of locks, condition variables, or blocking. This queue should not block in add()
.
add()
and remove()
steps respectively. These
counters should strictly increase.AtomicIntegerArray
or just create a special class for queue cells with an AtomicInteger
.add()
in a cell with sequence number \(k\) can succeed if and only if \(n = k\), in which case \(k\) should be
incremented. If \(n < k\) then there is no more space in the queue. If \(n > k\), then some other add()
snuck in and
added something before you (and incremented \(k\)), so you need to try again.remove()
should succeed for a cell with sequence number \(k\) if and only if \(n + 1 = k\),
in which case the value of \(k\) should be incremented by the size of the queue (so that a future add()
can succeed on this
entry). If \(n + 1 > k\), then there is nothing to remove from the queue, so the calling thread must wait. If \(n + 1 < k\), then
some other remove()
already took what was there, so you need to try again.Submit AtomicBoundedQueue.java.