TD 3b – Synchronization 1 – Message Queues
by Philippe Chassignet, Kaustuv Chaudhuri


Introduction

In this TD, which continues into TD4, we will successively write and experiment with several versions of shared message queues («files d'attente»), similar to the LinkedBlockingQueue from TD1. These will be used to develop a computing architecture called task dependency graphs, where a certain number of specialized nodes each perform a portion of the desired computation. The data will then be transmitted between nodes by message passing. The nodes are organized in a static manner to form a directed acyclic graph whose edges determine the sources and destinations for the messages.

In order to be asynchronous, each node contains an input message queue. Sending data to a node therefore consists of adding messages containing the data to its queue. The task of a node is to take the first message from its queue, perform the relevant processing of that data, and then add the result to the input queue of a different node. This task is infinite: when its input queue is empty, the node waits for the arrival of a new message before continuing. Several nodes corresponding to the same “service” may share the same input queue.

We will implement this using different Java threads for each node. Each input queue will be accessed concurrently by the threads which add elements to it and by other threads that consume elements from it.
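The take, process, forward loop described above can be sketched with the standard java.util.concurrent.LinkedBlockingQueue. This is only an illustration: SketchNode, SketchPipeline, and the STOP sentinel are hypothetical names, not classes of this TD, and the sentinel is an assumption added so that the demo terminates (the nodes of the TD run forever).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a node; NOT the TD's nodes.Node class.
class SketchNode implements Runnable {
    // Assumption: a sentinel message ends the loop so that the demo can stop.
    static final String STOP = "<stop>";

    final BlockingQueue<String> input = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> output; // input queue of the next node

    SketchNode(BlockingQueue<String> output) {
        this.output = output;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String msg = input.take();       // blocks while the queue is empty
                if (msg.equals(STOP)) {
                    output.put(STOP);            // propagate the sentinel and finish
                    return;
                }
                output.put(msg.toUpperCase());   // "process" the data, then pass it on
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the interrupt flag and exit
        }
    }
}

class SketchPipeline {
    // Sends the given lines through one node running in its own thread.
    static List<String> runOnce(List<String> lines) {
        BlockingQueue<String> sink = new LinkedBlockingQueue<>();
        SketchNode node = new SketchNode(sink);
        Thread worker = new Thread(node);
        worker.start();
        List<String> result = new ArrayList<>();
        try {
            for (String line : lines) {
                node.input.put(line);
            }
            node.input.put(SketchNode.STOP);
            String s;
            while (!(s = sink.take()).equals(SketchNode.STOP)) {
                result.add(s);
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

The sender never waits for the node to finish processing: it only deposits messages in the queue, which is what makes the communication asynchronous.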

Our application will be the manipulation of images. Each message will serve to transmit a line of an image encoded as int ARGB pixels. This TD will not concern itself with efficient implementation of image manipulation algorithms, but it will serve to load the processors and to generate a number of operational messages for the message queues. It will also allow us to examine visually if our queues are performing correctly, in particular if certain messages are lost.

Warning: each execution of an incorrect program may give rise to different phenomena, including the absence of apparent errors. It is important to observe a number of correct executions before deciding if a test has succeeded.

Preparation

Start with the usual procedure.

If you did everything right, you will see the class Test3 in the default package, three other packages named nodes, data, and util, and a directory called imgs that contains a small number of test images.

Nodes

The class data.Message, which you don't have to modify, defines the structure of messages that will be sent from one node to another. In this TD we will focus on a subclass, data.LineMessage, that has the following fields.

The other classes of the data package concern the implementation of waiting queues and will be introduced at the point where they are to be used in the exercises.

The abstract class nodes.Node forms the base of all nodes. It contains the following methods.

void addConnectionTo(Node dest)
Used to add dest to the collection of outgoing nodes of this.
void init()
This method, which will be modified in exercise 1, serves to initiate the task of the node.
void putInQueue(Message msg)
Adds msg to the input message queue of this. Note that this method is never called by this or by any node that outputs to this (can you see why?).
final void forward(Message msg)
This method, which cannot be overridden in subclasses, is used to send msg to all the outgoing nodes from this.

The other classes in the nodes package implement particular kinds of Node. Source and AnimatedSource nodes read files (which are supposed to be images) and output the lines of the images to their outgoing nodes. Display nodes reconstitute an image from the lines they receive in their incoming message queues. The details of these three classes are not important for this TD. The remaining classes in this package will be introduced as they become relevant for the TD.

The method Test.processOneImage() is a very simple example of the use of Nodes. It constructs a single Source node (S) and a single Display node (D), and directly connects the former to the latter. The init() method of the Source is launched in a separate thread, and it uses the sendLines() and forward() methods to send the lines of the image to the Display node. The init() method of Display does not need to be started explicitly as it runs in the graphical interface thread of Java (which is different from the main execution thread). The following image summarizes processOneImage(), where nodes are labelled circles and messages being sent are represented with blue lines.

[Figure: task graph S → D]

One Thread per Node

In the method processTwoImages() of Test, we want to do the same thing as processOneImage() but for two images at once.

[Figure: task graph with S1 → D1 and S2 → D2]

The second image is not displayed. Why?

The problem is that the method init() (which by default just calls run()) of Node never terminates. Indeed, it was specified as an infinite task that continually sends lines of an image even if there are no more lines to send. Obviously, we need to execute different source nodes in separate threads.

Modify the init() method of nodes.Node to launch the computation of the node in a separate Thread. Keep in mind that Node implements java.lang.Runnable; as you have seen in Amphi 1 and in chapter 2 of the Poly, one of the constructors of java.lang.Thread can be used to create new threads from instances of Runnable. If in doubt, always consult the API documentation.
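As a reminder of the general mechanism, here is a minimal sketch of starting a Runnable in a new thread. RecordingTask and ThreadStartSketch are hypothetical names that have nothing to do with the classes of this TD; the task simply records which thread executed it.

```java
// Hypothetical task, unrelated to the TD's Node class.
class RecordingTask implements Runnable {
    // Thread that executed run(); volatile so that other threads see the write.
    volatile Thread ranOn;

    @Override
    public void run() {
        ranOn = Thread.currentThread();
    }
}

class ThreadStartSketch {
    // Starts the task on a fresh thread and returns that thread.
    static Thread launch(Runnable task) {
        Thread t = new Thread(task); // the Thread(Runnable) constructor
        t.start();                   // returns immediately; run() executes on the new thread
        return t;
    }
}
```

Calling task.run() directly would instead execute the task on the caller's thread, so the caller would never regain control if the task were infinite.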

Run Test after suitably modifying its main() method. You should see the following output in the console:

source1 started
source2 started

Submit Node.java.

Message Queues

We will now write a number of variations on message queues that implement the interface data.MessageQueue. In the Test class, we will use the Test.getQueue() method to create the queues, so it suffices to just change the call to the constructor in this method to change the implementation of the queue used in a test.

We will also use a new kind of Node called a Processor which has the following fields:

Two implementations of message processors are provided: data.Identity and data.ChannelSelector.

data.Identity
Simply recreates the input message for its output, that is, it implements the identity operator.
data.ChannelSelector
Filters certain incoming messages based on its channel field. The channel to be selected is specified in the constructor for ChannelSelector.

We assume for now that every Processor has its own message queue. Therefore, every queue has a single thread that consumes its messages in its run() method. You may therefore assume that the size of the queue does not decrease between a call to isEmpty() and remove(). We will relax these assumptions in TD 4, and you will have to fix any code that relies on it.

Synchronizing Message Queue Accesses

In this exercise we will work with the following static task graph, constructed in Test.test2P1C().

[Figure: task graph with S1 and S2 → I; I → F1 → D1; I → F2 → D2]

Two source nodes, source1 (S1) and source2 (S2), will both send messages to an identity node, operator (I). This node will then forward its messages to two channel selector nodes, filter1 (F1) and filter2 (F2), each of which will in turn filter out a particular channel and send it to the corresponding display node, display1 (D1) or display2 (D2).

The most important element of the graph is the 2-producers-1-consumer shape of the first stage of the graph, where the same input message queue of operator is concurrently accessed by two source nodes. It is therefore important for the data structure used to construct message queues to allow for concurrent accesses.

You are provided a trivial and unsafe implementation, data.ListQueue, created using linked lists. If you modify Test.main() to call Test.test2P1C(), you may observe that:

Why do we see these errors? Because the linked list implementation is not designed for concurrent accesses. In our task graph, both sources can call the add() method simultaneously on the same list, causing one of the underlying concurrent writes to be lost. Likewise, the operator thread will call remove() simultaneously with the source threads calling add(), which causes a race condition.

Duplicate the data.ListQueue class to data.LockedListQueue and fix the implementation so that it supports concurrent reads and writes. You should use java.util.concurrent.locks.ReentrantLock as shown in the Amphi (cf. chapter 5.2 of the poly).

Remember to unlock any locks after exiting a critical section. If you exit a section using return, then you can guarantee the unlocking by using the following pattern:

myLock.lock();
try {
  // critical section which may include:
  return 42; // or whatever
} finally {
  myLock.unlock();
}

This works because the finally block is always executed after entering its corresponding try.
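To illustrate the pattern on a complete class, here is a minimal sketch of a list guarded by a single ReentrantLock. SketchLockedQueue is a hypothetical name; it does not implement data.MessageQueue, whose exact signatures are not reproduced here, and returning null from remove() on an empty queue is an assumption of this sketch.

```java
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; NOT the TD's data.LockedListQueue.
class SketchLockedQueue<E> {
    private final LinkedList<E> items = new LinkedList<>();
    private final ReentrantLock lock = new ReentrantLock();

    void add(E item) {
        lock.lock();
        try {
            items.addLast(item);
        } finally {
            lock.unlock(); // released even if addLast throws
        }
    }

    // Assumption: returns null when the queue is empty.
    E remove() {
        lock.lock();
        try {
            return items.pollFirst(); // the return happens before the finally block runs
        } finally {
            lock.unlock();
        }
    }

    boolean isEmpty() {
        lock.lock();
        try {
            return items.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Every method that touches the list takes the same lock, including the read-only ones, so no thread can observe the list in the middle of a modification.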

Finally, modify Test.getQueue() to return a LockedListQueue instead of a ListQueue and rerun Test. You should see the two images displayed correctly.

Submit LockedListQueue.java.

Bounded Queues with Active Waiting

Now consider the following task graph, built in Test.test3P1C(), that adds an additional source, increasing the number of concurrent writes to the queue.

[Figure: task graph with S1, S2, and S3 → I; I → F1 → D1; I → F2 → D2]

Running with LockedListQueue will appear to be correct, but this is deceptive. Assuming that all threads run at the same speed and that there is only one thread per node, the number of additions to the queue of the operator will eventually far exceed the number of removals, so the queue will grow without bound. Eventually the Java virtual machine will throw an OutOfMemoryError; by this time, the program may have become unresponsive, so click on the red “stop” button in Eclipse to kill the program.

An obvious solution would be to bound the size of the input queues, using a technique called circular buffers («buffer circulaire»). A simple implementation is provided in data.BoundedQueue. For example, to test it with a queue of size 5, we would modify Test.getQueue() to return new data.BoundedQueue(5).
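For readers unfamiliar with the technique, here is a minimal single-threaded sketch of a circular buffer. RingBufferSketch is a hypothetical class, not the provided data.BoundedQueue, and its behaviour on a full or empty buffer (returning false or null) is an assumption made for this illustration. Like the provided class, it is not safe for concurrent accesses.

```java
// Hypothetical illustration of a circular buffer; NOT the TD's data.BoundedQueue.
// Deliberately unsynchronized: it must only be used from a single thread.
class RingBufferSketch {
    private final Object[] slots;
    private int head = 0;  // index of the next element to remove
    private int count = 0; // number of elements currently stored

    RingBufferSketch(int capacity) {
        slots = new Object[capacity];
    }

    // Assumption: returns false instead of waiting when the buffer is full.
    boolean add(Object item) {
        if (count == slots.length) {
            return false;
        }
        int tail = (head + count) % slots.length; // wraps around the end of the array
        slots[tail] = item;
        count++;
        return true;
    }

    // Assumption: returns null when the buffer is empty.
    Object remove() {
        if (count == 0) {
            return null;
        }
        Object item = slots[head];
        slots[head] = null;                // drop the reference so it can be collected
        head = (head + 1) % slots.length;  // wraps around the end of the array
        count--;
        return item;
    }

    boolean isEmpty() {
        return count == 0;
    }
}
```

The array is never reallocated: indices advance modulo the capacity, so memory use stays constant however many messages pass through.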

It should be obvious that the implementation of BoundedQueue is not safe for concurrent accesses. Indeed, when you run Test.test3P1C(), you will see plenty of errors.

Duplicate BoundedQueue to data.LockedBoundedQueue and fix the add() (and, if necessary, the remove()) methods. In a comment, explain what the problem is with BoundedQueue and how your solution fixes it, and give an informal proof of its correctness.

Remember to test your code by running Test several times (at least 10). A single attempt may not trigger any synchronization errors.

Submit LockedBoundedQueue.java.

Optional Extra: Atomic (Lock-Free) Queues

Depending on how you implemented the locked versions of the above queues, you may observe that the queue is locked far too often. The getters and putters of the queue work in different portions of the array and there is no need for add()s to block remove()s if they are non-interfering.

In this exercise you will use what you have seen of the Java memory model in TD 3a and the atomic primitives compare-and-swap (CAS) and get-and-set (you saw these in Amphi 3) to implement a variant of ListQueue that does not use any locks at all. Instead, we will use the fact that java.util.concurrent.atomic.AtomicReference gives us access to references that support CAS. This class is explained in detail in chapter 4.2 of the poly. Here are the operations of importance for an AtomicReference<E>:

E get()
Returns the current value of the reference. Note that the current value may become stale immediately afterwards, but it is guaranteed to be a value that everyone agrees was present in the past as long as you only use the next two operations to set the reference.
E getAndSet(E newValue)
Atomically gets the old value of the reference and sets it to newValue. Returns the old value.
boolean compareAndSet(E oldValue, E newValue)
Atomically compares the current value to oldValue (using ==, which checks that the underlying Java objects are physically identical in memory), and if they are identical, updates it to newValue in the same atomic step. Returns true if and only if the reference was modified.
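The three operations can be tried in isolation with the following small sketch. AtomicReferenceSketch is a hypothetical class name; the two strings are created with new String(...) on purpose so that they are equal according to equals() but are distinct objects.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class showing get(), getAndSet() and compareAndSet().
class AtomicReferenceSketch {
    static String demo() {
        String a = new String("a");
        String b = new String("a"); // a.equals(b) is true, but a != b

        AtomicReference<String> ref = new AtomicReference<>(a);

        // Fails: the comparison uses ==, and b is a different object from a.
        boolean withEqualCopy = ref.compareAndSet(b, "x");

        // Succeeds: a is the very object currently stored.
        boolean withSameObject = ref.compareAndSet(a, "y");

        // Unconditionally stores "z" and returns the previous value.
        String old = ref.getAndSet("z");

        return withEqualCopy + " " + withSameObject + " " + old + " " + ref.get();
    }
}
```

Here demo() returns "false true y z": the first compareAndSet() leaves the reference untouched because it was given an equal but physically different object.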

Create a variant of ListQueue named AtomicListQueue that uses AtomicReferences to the first and last cells of the list instead of ordinary references. Note that you should not use any locks or the synchronized keyword of Java in this exercise.

General hints
Hint about add()

Create the new last element, and then use getAndSet() to update the last reference atomically. If multiple putters try to add() at the same time, they will all succeed in an arbitrary order.
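One possible reading of this hint is sketched below. SketchCell and AtomicAddSketch are hypothetical names, and the cell layout (a data field and a next field) is an assumption based on the hints; the Cell class of ListQueue may differ. Only add() is shown, together with a helper that counts the linked cells for checking purposes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cell type; the Cell class used by ListQueue may differ.
class SketchCell<E> {
    volatile E data;
    volatile SketchCell<E> next; // volatile so that readers see the link once written

    SketchCell(E data) {
        this.data = data;
    }
}

// Hypothetical illustration of the add() hint only; not a complete queue.
class AtomicAddSketch<E> {
    final SketchCell<E> sentinel = new SketchCell<>(null); // dummy first cell
    private final AtomicReference<SketchCell<E>> last = new AtomicReference<>(sentinel);

    void add(E item) {
        SketchCell<E> cell = new SketchCell<>(item);
        // Atomically become the new last cell and learn who was last before us.
        SketchCell<E> previous = last.getAndSet(cell);
        // Then link the old last cell to the new one.
        previous.next = cell;
    }

    // Counts the cells reachable from the sentinel.
    // Only meaningful once all adding threads have finished.
    int countLinked() {
        int n = 0;
        for (SketchCell<E> c = sentinel.next; c != null; c = c.next) {
            n++;
        }
        return n;
    }
}
```

Since getAndSet() hands each thread a different previous cell, no two threads ever write the same next field. Note that between the getAndSet() and the write to previous.next, the new cell is not yet reachable from the front of the list, so a remover may briefly see next == null even though an add() is in progress.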

Hint about remove()

Use compareAndSet() to move the first reference to its own next field. For the thread that successfully moves the reference, the message to remove is the one in the next.data field of the old contents of first. Note that first always points to the cell just before the one holding the next element in the queue; initially it points to a dummy sentinel cell. Remember to set the data field of the Cell to null after you remove the element! This will avoid a space leak in some cases and is important for the correctness of isEmpty().

Hint about isEmpty()

Since you are never asked to add nulls to the queue, and after a remove() the data field is set to null, you can just write a simple loop that tries to move this.first (using compareAndSet()) as long as next is not null and data is null. Then, the queue is empty iff this.first points to a Cell with next == null. Note that this will not be an O(1) procedure!

(Can you see why there may be multiple Cells with data fields set to null?)

If you are especially adventurous, make sure that add() and remove() delete Cells with data fields set to null.

This is a challenging exercise. You may find it useful to draw parallel timelines on a piece of paper to make sure that you respect the order of events. Every queue operation is a barrier – all threads must globally agree on the order of successful adds (and also on the order of successful removes).

Submit AtomicListQueue.java.

Continuation: TD 4

In the next TD, we will relax the assumption that each node has exactly a single thread. Here is what you can look forward to: