In this TD, which continues into TD4, we will successively write and experiment with several versions of shared message queues («files d'attente»), similar to the LinkedBlockingQueue from TD1. These will be used to develop a computing architecture called task dependency graphs, where a number of specialized nodes each perform a portion of the desired computation. The data is transmitted between nodes by message passing. The nodes are organized in a static manner to form a directed acyclic graph whose edges determine the sources and destinations of the messages.
In order to be asynchronous, each node contains an input message queue. Sending data to a node therefore consists of adding messages containing the data to its queue. The task of a node is to take the first message from its queue, perform the relevant processing of that data, and then add the result to the input queue of another node. This task is infinite: when its input queue is empty, the node waits for the arrival of a new message before continuing. Several nodes corresponding to the same “service” may share the same input queue.
We will implement this using different Java threads for each node. Each input queue will be accessed concurrently by the threads which add elements to it and by other threads that consume elements from it.
Our application will be the manipulation of images. Each message will serve to transmit a line of an image encoded as int
ARGB pixels. This TD will not concern itself with efficient implementation
of image manipulation algorithms, but it will serve to load the processors and to generate a number of operational messages for the message
queues. It will also allow us to check visually whether our queues behave correctly, in particular whether certain messages are lost.
Warning: each execution of an incorrect program may give rise to different phenomena, including the absence of apparent errors. It is important to observe a number of correct executions before deciding if a test has succeeded.
Start with the usual procedure. Download the archive td34.zip, named TD34 because it covers both TD3 and TD4. It contains the core implementations of image manipulation utilities (which you can ignore) and a framework for images, nodes, etc. Extract it into the TD34 directory, then refresh the project in Eclipse (using F5). If you did everything right, you will see the class Test in the default package, three other packages named nodes, data, and util, and a directory called imgs that contains a small number of test images.
The class data.Message, which you don't have to modify, defines the structure of messages that will be sent from one node to another. In this TD we will focus on a subclass, data.LineMessage, that has the following fields.

content - A String that contains the payload. The encoding of image lines as strings artificially slows down the operations, which allows us to more easily induce many types of synchronization errors.

num - Indicates the line number within the image. This field is important to reconstitute an image at the end, since the lines of the image may be processed in an arbitrary order, and messages may be duplicated or lost.

channel (inherited from Message) - Identifies the computational stream a message belongs to. For example, we can concurrently send lines from several different images to the same node, and use the channel field to correctly separate the lines of one image from those of the other. We will use this field in the tests that follow.

The other classes of the data package concern the implementation of waiting queues and will be introduced at the point where they are to be used in the exercises.
The abstract class nodes.Node forms the base of all nodes. It contains the following methods.

void addConnectionTo(Node dest) - Used to add dest to the collection of outgoing nodes of this.

void init() - This method, which will be modified in exercise 1, serves to initiate the task of the node.

void putInQueue(Message msg) - Adds msg to the input message queue of this. Note that this method is never called by this or by any node that outputs to this (can you see why?).

final void forward(Message msg) - This method, which cannot be overridden in subclasses, is used to send msg to all the outgoing nodes of this.
The other classes in the nodes
package implement particular kinds of Node
. Source
and
AnimatedSource
nodes read files (which are supposed to be images) and output the lines of the images to their outgoing nodes.
Display nodes reconstitute an image from the lines received in their incoming message queues. The details of these three classes are not important for this TD. The remaining classes in this package will be introduced as they become relevant for the TD.
The method Test.processOneImage()
is a very simple example of the use of Node
s. It constructs a single
Source
node (S) and a single Display
node (D), and directly connects the former to the latter.
The init()
method of the Source
is launched in a separate thread, and it uses the sendLines()
and
forward()
methods to send the lines of the image to the Display
node. The init()
method of
Display
does not need to be started explicitly as it runs in the graphical interface thread of Java (which is different from
the main execution thread). The following image summarizes processOneImage()
, where nodes are labelled circles and messages
being sent are represented with blue lines.
In the method processTwoImages()
of Test
, we want to do the same thing as processOneImage()
but
for two images at once.
The second image is not displayed. Why?
The problem is that the method init()
(which by default just calls run()
) of Node
never
terminates. Indeed, it was specified as an infinite task that continually sends lines of an image even if there are no more lines to
send. Obviously, we need to execute different source nodes in separate threads.
Modify the init() method of nodes.Node to launch the computation of the node in a separate Thread. Keep in mind that Node is an instance of java.lang.Runnable; as you have seen in Amphi 1 and in chapter 2 of the Poly, one of the constructors of java.lang.Thread can be used to create new threads from instances of Runnable. If in doubt, always consult the API documentation.
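As a reminder of the pattern, here is a minimal self-contained sketch of the Thread(Runnable) constructor in action. The class InitDemo is invented for this example (it is not part of the TD framework); the real init() will do the same thing inside nodes.Node.

```java
// Sketch: launching a Runnable in its own thread, the way init() should.
public class InitDemo implements Runnable {
    private final String name;

    public InitDemo(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(name + " started");
    }

    // Launch this Runnable in a separate thread, as Node.init() should do.
    public Thread init() {
        Thread t = new Thread(this); // the Thread(Runnable) constructor
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new InitDemo("source1").init();
        Thread t2 = new InitDemo("source2").init();
        t1.join();
        t2.join();
    }
}
```

Note that init() returns immediately after start(); the work of run() happens concurrently in the new thread.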
Run Test after suitably modifying its main() method; you should see the following output in the console:
source1 started
source2 started
Submit Node.java.
We will now write a number of variations on message queues that implement the interface data.MessageQueue
. In the
Test
class, we will use the Test.getQueue()
method to create the queues, so it suffices to just change the call
to the constructor in this method to change the implementation of the queue used in a test.
We will also use a new kind of Node called a Processor, which has the following fields:

queue - A message queue (instance of an implementation of data.MessageQueue). Different Processors may use different implementations of message queues. Each MessageQueue contains the following methods:

boolean add(Message msg) - Adds the message msg to the queue. Returns true if and only if the add succeeded. This method should not block the caller!

boolean isEmpty() - Returns false if and only if there are any elements in the queue.

Message remove() - Removes the oldest message from the queue. If there are no messages in the queue, the calling thread should wait, either actively or passively, until new messages arrive. You can use Node.sleepUninterruptibly() to sleep for a while (10 ms should be sufficient) if the queue is empty.

processor - A message processor (instance of an implementation of data.MessageProcessor) that provides the method process(), which determines the computation to perform on an incoming message.

nodeName - Inherited from nodes.Node; used for disambiguation and debugging.

Two implementations of message processors are provided: data.Identity and data.ChannelSelector.

data.Identity - Simply recreates the input message for its output, that is, it implements the identity operator.

data.ChannelSelector - Filters incoming messages based on their channel field. The channel to be selected is specified in the constructor of ChannelSelector.
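To fix the contract in your mind, here is a self-contained sketch of the queue interface described above, together with a deliberately naive implementation in the spirit of data.ListQueue. The names SimpleQueue and NaiveQueue are invented for this example, and String stands in for data.Message.

```java
import java.util.LinkedList;

// Simplified sketch of the data.MessageQueue contract described above.
interface SimpleQueue {
    boolean add(String msg); // non-blocking; true iff the add succeeded
    boolean isEmpty();       // false iff the queue holds any element
    String remove();         // waits (here: by sleeping) until a message arrives
}

// A deliberately naive implementation, analogous to data.ListQueue:
// correct when used by a single thread, unsafe under concurrency.
class NaiveQueue implements SimpleQueue {
    private final LinkedList<String> list = new LinkedList<>();

    public boolean add(String msg) {
        return list.add(msg);
    }

    public boolean isEmpty() {
        return list.isEmpty();
    }

    public String remove() {
        // Active wait: sleep a little while the queue is empty.
        while (isEmpty()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // ignore and retry, like Node.sleepUninterruptibly()
            }
        }
        return list.removeFirst();
    }
}
```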
We assume for now that every Processor has its own message queue, so every queue has a single thread consuming its messages in its run() method. You may therefore assume that the size of the queue does not decrease between a call to isEmpty() and a call to remove(). We will relax these assumptions in TD 4, and you will then have to fix any code that relies on them.
In this exercise we will work with the following static task graph, constructed in Test.test2P1C()
.
Two source nodes, source1 (S1) and source2 (S2), will both send messages to an identity node, operator (I). This node will then forward its messages to two channel selector nodes, filter1 (F1) and filter2 (F2), each of which will in turn filter out a particular channel and send it to the corresponding display node, display1 (D1) or display2 (D2).
The most important element of the graph is the 2-producers-1-consumer shape of the first stage of the graph, where the same
input message queue of operator
is concurrently accessed by two source nodes. It is therefore important for the data structure
used to construct message queues to allow for concurrent accesses.
You are provided a trivial and unsafe implementation, data.ListQueue, built on linked lists. If you modify Test.main() to call Test.test2P1C(), you may observe that an IllegalArgumentException is raised from a method of the Processor class when a remove() returns a null message.

Why do we see these errors? Because the linked list implementation is not designed for concurrent accesses. In our task graph, both sources can call the add() method simultaneously on the same list, causing one of the underlying concurrent writes to be lost. Likewise, the operator thread will call remove() simultaneously with the source threads calling add(), which causes a race condition.
Duplicate the data.ListQueue class to data.LockedListQueue and fix the implementation so that it supports concurrent reads and writes. You should use java.util.concurrent.locks.ReentrantLock as shown in the Amphi (cf. chapter 5.2 of the poly).

Remember to unlock any locks after exiting a critical section. If you exit a section using return, then you can guarantee the unlocking by using the following pattern:

myLock.lock();
try {
    // critical section, which may include:
    return 42; // or whatever
} finally {
    myLock.unlock();
}

This works because the finally block is always executed after its corresponding try block is entered.
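Applied to a list-backed queue, the pattern looks like the sketch below. This is not the required LockedListQueue itself (it uses String messages and an invented class name to stay self-contained), but the locking discipline is the same.

```java
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the lock()/try/finally pattern applied to a list-backed queue.
class LockedQueueSketch {
    private final LinkedList<String> list = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();

    public boolean add(String msg) {
        lock.lock();
        try {
            // returning from inside try still runs the finally block
            return list.add(msg);
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking variant for illustration: null when the queue is empty.
    public String removeOrNull() {
        lock.lock();
        try {
            return list.isEmpty() ? null : list.removeFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

Because both methods take the same lock, an add() can never interleave with a remove() on the shared linked list.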
Finally, modify Test.getQueue()
to return a LockedListQueue
instead of a ListQueue
and rerun
Test
. You should see the two images displayed correctly.
Submit LockedListQueue.java.
Now consider the following task graph, built in Test.test3P1C(), which adds an additional source, increasing the number of concurrent writes to the queue.

Running with LockedListQueue will appear correct, but this is deceiving. Assuming all threads run at the same speed and only one thread per node, the number of additions to the queue of the operator will eventually far exceed the number of removals, so the queue will grow without bound. Eventually the Java virtual machine will throw an OutOfMemoryError; by this time, the program may have become unresponsive, so click on the red “stop” button in Eclipse to kill the program.

An obvious solution is to bound the size of the input queues, using a technique called circular buffers («buffer circulaire»). A simple implementation is provided in data.BoundedQueue. For example, to test it with a queue of size 5, we would modify Test.getQueue() to return new data.BoundedQueue(5).
It should be obvious that the implementation of BoundedQueue
is not safe for concurrent accesses. Indeed, when you run
Test.test3P1C()
, you will see plenty of errors.
Duplicate BoundedQueue to data.LockedBoundedQueue and fix the add() (and, if necessary, the remove()) methods. Explain in a comment what the problem is with BoundedQueue, how your solution fixes it, and give an informal proof of its correctness.
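As a point of reference, here is a sketch of a lock-protected circular buffer. The field names head, tail, and count are assumptions (the handout does not show BoundedQueue's internals); the point is that each operation checks and updates the indices under a single lock, so they can never be observed half-updated.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a lock-protected circular buffer (hypothetical field names).
class LockedRingSketch {
    private final String[] buf;
    private int head = 0, tail = 0, count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    LockedRingSketch(int capacity) {
        buf = new String[capacity];
    }

    public boolean add(String msg) {
        lock.lock();
        try {
            if (count == buf.length) {
                return false; // full: refuse rather than block the caller
            }
            buf[tail] = msg;
            tail = (tail + 1) % buf.length;
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking variant for illustration: null when the buffer is empty.
    public String removeOrNull() {
        lock.lock();
        try {
            if (count == 0) {
                return null;
            }
            String msg = buf[head];
            buf[head] = null; // drop the reference to the consumed slot
            head = (head + 1) % buf.length;
            count--;
            return msg;
        } finally {
            lock.unlock();
        }
    }
}
```

Without the lock, two concurrent add()s could both read the same tail and count, overwriting each other's slot and corrupting the indices.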
Remember to test your code by running Test
several times (at least 10). A single attempt may not trigger any
synchronization errors.
Submit LockedBoundedQueue.java.
Depending on how you implemented the locked versions of the above queues, you may observe that the queue is locked far too often. The
getters and putters of the queue work in different portions of the array and there is no need for add()
s to block
remove()
s if they are non-interfering.
In this exercise you will use what you have seen of the Java memory model in TD 3a and the atomic primitives compare-and-swap (CAS) and get-and-set (you saw these in Amphi 3) to implement a variant of ListQueue that does not use any locks at all. Instead, we will use the fact that java.util.concurrent.atomic.AtomicReference
all. Instead, we will use the fact that java.util.concurrent.atomic.AtomicReference
gives us access to references that support CAS. This class is explained in detail in chapter 4.2 of the poly. Here are the
operations of importance for an AtomicReference<E>
:
E get() - Returns the current value of the reference. Note that the current value may become stale immediately afterwards, but it is guaranteed to be a value that everyone agrees was present in the past, as long as you only use the next two operations to set the reference.

E getAndSet(E newValue) - Atomically gets the old value of the reference and sets it to newValue. Returns the old value.

boolean compareAndSet(E oldValue, E newValue) - Atomically compares the current value to oldValue (using ==, which checks that the underlying Java objects are physically identical in memory), and if they are equal updates it to newValue in the same atomic step. Returns true if and only if the reference was modified.
Create a variant of ListQueue named AtomicListQueue that uses AtomicReferences to the first and last cells of the list instead of ordinary references. Note that you should not use any locks, the synchronized keyword, or volatile in this exercise.

The standard way to update an AtomicReference is the CAS retry loop:

do {
    // (re)load old value
    oldVal = atomicRef.get();
    // compute new value, possibly using oldVal
    newVal = ...;
} while (!atomicRef.compareAndSet(oldVal, newVal));

In this way, threads will race with each other to modify atomicRef, and only the thread that commits the change will be able to exit the loop.
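Here is a self-contained demonstration of this retry loop, applied to a lock-free linked stack rather than the queue you are asked to build (the class and its names are invented for the example). Because compareAndSet lets exactly one racer commit each update, no push is ever lost, no matter how many threads race.

```java
import java.util.concurrent.atomic.AtomicReference;

// Demo of the CAS retry loop: a lock-free linked stack.
class CasStackDemo {
    static final class Cell {
        final int value;
        final Cell next;

        Cell(int value, Cell next) {
            this.value = value;
            this.next = next;
        }
    }

    private final AtomicReference<Cell> top = new AtomicReference<>(null);

    public void push(int value) {
        Cell oldTop, newTop;
        do {
            oldTop = top.get();               // (re)load old value
            newTop = new Cell(value, oldTop); // compute new value
        } while (!top.compareAndSet(oldTop, newTop));
        // Only the thread whose oldTop was still current exits the loop.
    }

    // Count the elements (single-threaded use only, for checking results).
    public int size() {
        int n = 0;
        for (Cell c = top.get(); c != null; c = c.next) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        CasStackDemo stack = new CasStackDemo();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    stack.push(j);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(stack.size()); // 4000: no lost pushes
    }
}
```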
Note that add()s and remove()s should never race to update the same atomic reference. The only safe races are add()s with other add()s, and remove()s with other remove()s. (Can you see why?)

add() - Create the new last element, and then use getAndSet() to update the last reference atomically. If multiple putters try to add() at the same time, they will all succeed in an arbitrary order.
remove() - Use compareAndSet() to move the first reference to its own next field. For the thread that successfully moves the reference, the message to remove is the one in the next.data field of the old contents of first. Note that first always points to a node before the one holding the next element in the queue; it is set to a dummy sentinel element initially. Remember to set the data field of the Cell to null after you remove the element! This avoids a space leak in some cases and is important for the correctness of isEmpty().
isEmpty() - Since you are never asked to add nulls to the queue, and after a remove() the data field is set to null, you can just write a simple loop that tries to move this.first (using compareAndSet()) as long as next is not null and data is null. Then, the queue is empty iff this.first points to a Cell with next == null. Note that this will not be an O(1) procedure! (Can you see why there may be multiple Cells with data fields set to null?)
If you are especially adventurous, make sure that add() and remove() delete Cells with data fields set to null.
This is a challenging exercise. You may find it useful to draw parallel timelines on a piece of paper to make sure that you respect the order of events. Every queue operation is a barrier: all threads must globally agree on the order of successful add()s (and also on the order of successful remove()s).
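To fix ideas, here is a single-threaded skeleton of the sentinel layout described above, with only add() filled in; remove() and isEmpty() remain the exercise. Beyond the names taken from the hints (Cell, data, next, first, last), everything here is an assumption, and String again stands in for data.Message.

```java
import java.util.concurrent.atomic.AtomicReference;

// Skeleton of the sentinel-based lock-free list (add() only).
class AtomicListSketch {
    static final class Cell {
        volatile String data; // becomes null once the element is consumed
        final AtomicReference<Cell> next = new AtomicReference<>(null);

        Cell(String data) {
            this.data = data;
        }
    }

    // first always points one Cell *before* the next element to remove;
    // both references start on a dummy sentinel whose data is null.
    private final AtomicReference<Cell> first;
    private final AtomicReference<Cell> last;

    AtomicListSketch() {
        Cell sentinel = new Cell(null);
        first = new AtomicReference<>(sentinel);
        last = new AtomicReference<>(sentinel);
    }

    public boolean add(String msg) {
        Cell cell = new Cell(msg);
        // getAndSet makes racing adders agree on a total order: each one
        // atomically claims the previous tail, then links itself behind it.
        Cell prev = last.getAndSet(cell);
        // Note: between the getAndSet and this link there is a brief window
        // where the new cell is not yet reachable from first; your remove()
        // must tolerate observing that state.
        prev.next.set(cell);
        return true;
    }

    // For checking only: the message that remove() would return next.
    String peek() {
        Cell after = first.get().next.get();
        return after == null ? null : after.data;
    }
}
```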
Submit AtomicListQueue.java.
In the next TD, we will relax the assumption that each node has exactly a single thread. Here is what you can look forward to:

Several threads sharing the work of Processor nodes.

We will discover Java Executors and use them to create thread pools of dozens or hundreds of threads. This will allow us to generalize from acyclic task graphs to cyclic task graphs.

An atomic (lock-free) version of BoundedQueue. This is very challenging even for experts!