Threads
by Karthikeyan Bhargavan, Kaustuv Chaudhuri

In this TD, we will learn how to use Java threads to perform simple concurrent computations. We will use the java.lang.Thread class to program our threads. We will learn to launch, interrupt, and compose threads. We will, however, not worry too much about synchronization or race conditions, which will be addressed in the next TD.

You are given a file, titles.txt, containing all the page titles in the French Wikipedia. Your goal is to write a multi-threaded program that searches through these titles and presents the results.

Packages and Input Files

To begin with, create a new project TD1, download td1.zip, and unzip it within the TD1 project. Remember to refresh with “F5” within Eclipse.

The search package contains two classes, Searcher and Manager, which you will modify.

The default package contains a single class Test for testing.

You will be using the input file titles.txt containing Wikipedia page titles from December 13, 2014. If you are working on the workstations in the lab, it is not necessary for you to download this file: it is already installed. If you are working on your own computer, download titles.txt and save it in the project TD1 and modify Test.java accordingly. You can download updated versions of this file directly from WikiMedia, but then the example output in this TD will not necessarily match up.

To read the input file, we will use java.io.BufferedReader, which is safe to use from multiple threads. You will only need its readLine() method, which reads a line and returns it as a string (or returns null at the end of the input).

To store the results of our programs, we will use java.util.concurrent.LinkedBlockingQueue, which implements the java.util.Queue interface and is safe to use from multiple threads. We will also use java.util.HashMap, which is not thread-safe and should therefore be accessed by only one thread at a time.

In this TD, all our threads will be written as classes that extend the java.lang.Thread class. Each thread class must override the run() method that is executed when the thread is started. An alternative way to program threads is to implement the java.lang.Runnable interface, as shown in the lectures.
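As a small illustration of the two styles mentioned above, here is a sketch that runs one thread written as a Thread subclass and one written as a Runnable. The class names ThreadStyles, Greeter and GreetTask are made up for this example and are not part of td1.zip.

```java
class ThreadStyles {
    // Style 1: subclass Thread and override run().
    static class Greeter extends Thread {
        private final StringBuffer log;
        Greeter(StringBuffer log) { this.log = log; }
        @Override
        public void run() { log.append("[thread]"); }
    }

    // Style 2: implement Runnable and hand the object to a plain Thread.
    static class GreetTask implements Runnable {
        private final StringBuffer log;
        GreetTask(StringBuffer log) { this.log = log; }
        @Override
        public void run() { log.append("[runnable]"); }
    }

    // Runs one thread of each style, one after the other, and returns the log.
    static String runBoth() {
        StringBuffer log = new StringBuffer(); // StringBuffer methods are synchronized
        Thread a = new Greeter(log);
        Thread b = new Thread(new GreetTask(log));
        try {
            a.start();
            a.join();   // wait for the first thread before starting the second
            b.start();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
        return log.toString();
    }
}
```

In both styles the code in run() only executes in a new thread when start() is called; calling run() directly would execute it in the calling thread.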

A Note on Java Generics

The container classes we will use in this TD rely on Java generics, which allow a kind of parametric polymorphism. (If you have seen languages like OCaml or Haskell before, this is very similar to type parameters.) Here is a quick overview of how to use generics in Java 7.
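For readers who have not used generics before, the following minimal sketch shows a parameterized container type and a generic method. The names GenericsDemo, firstOrDefault and demo are hypothetical and chosen only for this illustration.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class GenericsDemo {
    // T is a type parameter: the method works for a queue of any element type.
    static <T> T firstOrDefault(Queue<T> queue, T fallback) {
        T head = queue.peek();          // peek() returns null when the queue is empty
        return head != null ? head : fallback;
    }

    static String demo() {
        // The diamond <> lets the compiler infer <String> (Java 7 and later).
        Queue<String> titles = new LinkedBlockingQueue<>();
        titles.add("Asterix");
        // No cast is needed: the compiler knows the elements are Strings.
        return firstOrDefault(titles, "none");
    }
}
```

The same method can be called with a Queue<Integer> or any other element type without being rewritten.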

Programming with Threads

Searcher thread: start, sleep

The first task is to write a class that searches through the input for titles that contain a given substring. The class search.Searcher extends Thread as follows:

public class Searcher extends Thread {
    public static final String EOF = "--EOF--";

    private final BufferedReader data;
    private final String query;
    private final LinkedBlockingQueue<String> result;
    private int processedItems = 0;
    private int matchesFound = 0;

    public Searcher(BufferedReader data,
                    String query,
                    LinkedBlockingQueue<String> result) {...}
    public void printStatus() {...}
    public void run() {
      // your code
    }
}

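This class contains the following fields: data (the reader from which titles are read), query (the substring to search for), result (the queue that collects the matching titles), and the two counters processedItems and matchesFound.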

The constructor initializes the first three fields, and the printStatus() method summarizes the current state of the search. These have been written already and you do not need to modify them.

In this exercise, you must write the body of the run() method. The goal of the run() method is to read strings from data and, if they match the query, to add the strings to result. When the data has no more input, data.readLine() will return null and the run method should then add Searcher.EOF (which contains the immutable string "--EOF--") to result to indicate the end of the results and then return. If data.readLine() throws an IOException, your code must catch it, add Searcher.EOF to result and return. The run method should also update the processedItems and matchesFound fields as it processes data items.

Recall that you can check whether a string s contains a substring query using s.indexOf(query) >= 0.
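The read-and-match loop described above can be sketched outside of any thread as follows. This is only one possible shape, written as a static helper in a hypothetical class LineFilterSketch; the EOF constant here mirrors Searcher.EOF, and the counters and interrupt handling required by the exercise are left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

class LineFilterSketch {
    static final String EOF = "--EOF--"; // same marker value as Searcher.EOF

    // Copies every line that contains query into result, then appends EOF.
    // Returns the number of matching lines.
    static int filter(BufferedReader data, String query,
                      LinkedBlockingQueue<String> result) {
        int matches = 0;
        try {
            String line;
            while ((line = data.readLine()) != null) { // null means end of input
                if (line.indexOf(query) >= 0) {
                    result.add(line);
                    matches++;
                }
            }
        } catch (IOException e) {
            // Reading failed: fall through and still mark the end of the results.
        }
        result.add(EOF);
        return matches;
    }
}
```

Because the queue is unbounded by default, add() does not block here; the end marker is appended on both the normal and the error path.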

The Searcher class is started from the Manager class, where you are already given sample code for the first exercise:

public static LinkedBlockingQueue<String> simpleSearch(BufferedReader data, String query, int num)
                                          throws InterruptedException
{
    LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
    Searcher s = new Searcher(data, query, result);
    s.setName("searcher");
    s.start();
    Thread.sleep(100);
    s.printStatus();
    return result;
}

The given method simpleSearch() works as follows:

The method creates a new LinkedBlockingQueue for the search results. It then creates an instance of Searcher, gives it a name (using setName()) and starts it (using start()). The Searcher thread s is now running concurrently with the Manager thread. The Manager thread—which is executing the simpleSearch() method—then sleeps for 100 milliseconds (ms) and then returns with whatever results have been obtained so far. When the search is running, we can print the current status of the Searcher object by calling s.printStatus().

When a thread is sleeping it may be interrupted by a concurrent thread (as we will see in exercise 4 below). Hence, Thread.sleep throws an InterruptedException. More generally, whenever one thread is waiting for another, the wait may be cut short by an interrupt, which is reported as an InterruptedException; this exception must either be handled or passed on to the caller.
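The two options for dealing with InterruptedException (pass it on, or handle it) might look as follows. The class SleepHelpers and its method names are assumptions made for this sketch; restoring the interrupted status in the catch block is a common convention, not something this TD requires.

```java
class SleepHelpers {
    // Option 1: pass the exception on to the caller, as simpleSearch() does.
    static void pause(long millis) throws InterruptedException {
        Thread.sleep(millis);
    }

    // Option 2: handle the exception here.
    // Returns true if the full pause elapsed, false if it was interrupted.
    static boolean pauseQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // sleep() clears the interrupted status when it throws;
            // set it again so that callers can still observe the interrupt.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If the interrupted status of the current thread is already set when sleep() is called, the exception is thrown immediately rather than after the delay.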

Test your code by running Test. You will then see a result that looks like the following (generated by printStatus):

Thread "searcher" processed 606987 items and found 39 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...

The exact number of results depends on how far the Searcher thread has progressed by the time s.printStatus() is called, that is, after the Manager thread has slept for 100ms. This may differ from machine to machine. Note that the Searcher thread will continue running until it has computed all the results.

Submit Searcher.java.

Waiting for thread termination: isAlive, join

The simpleSearch() method above does not wait for the search to terminate. In this exercise, you will write a new search method, pollingSearch(), that uses s.isAlive() to check whether the Searcher thread is still running. In a loop, sleep for 100ms, print the current status (with s.printStatus()), and then check s.isAlive(); if it is false, exit the loop and return result. This kind of repeated checking of a condition is called polling.
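The polling pattern itself can be sketched independently of the Searcher class. In this hypothetical PollingDemo, the worker merely sleeps to stand in for real work, and the 10ms polling interval is an arbitrary choice.

```java
class PollingDemo {
    // Starts a worker that runs for roughly workMillis, then polls it
    // every 10 ms until it has terminated. Returns the number of polls.
    static int pollUntilDone(final long workMillis) {
        Thread worker = new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(workMillis); // stands in for real work
                } catch (InterruptedException e) {
                    // interrupted: just stop early
                }
            }
        };
        worker.start();
        int polls = 0;
        try {
            do {
                Thread.sleep(10);         // wait a little between checks
                polls++;                  // a real program would print a status here
            } while (worker.isAlive());   // true until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return polls;
    }
}
```

The number of polls depends on scheduling and timing, so it varies from run to run; the only guarantee is that the loop body executes at least once.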

Test your code by changing Manager.search() to call pollingSearch() instead of simpleSearch(). You will see an output like:

Thread "searcher" processed 623516 items and found 39 matching results.
Thread "searcher" processed 1544780 items and found 42 matching results.
Thread "searcher" processed 2278635 items and found 50 matching results.
Thread "searcher" processed 3269822 items and found 52 matching results.
Thread "searcher" processed 4126251 items and found 52 matching results.
Thread "searcher" processed 5519722 items and found 74 matching results.
Thread "searcher" processed 6694248 items and found 78 matching results.
Thread "searcher" processed 7003027 items and found 78 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...
--EOF--

You should also test your code for other queries. For example, if you change the query string in Test.main to "Obelix" you should obtain all 20 titles that contain "Obelix".

A second way to wait for a thread s to terminate is to use the s.join() method. This method waits (blocks) until the thread s has terminated and then returns. This avoids the need for a loop waiting for the thread to terminate and is more suitable than s.isAlive when the current thread has nothing else to do while it waits.
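A minimal sketch of waiting with join(), using a hypothetical worker class Summer that adds up the integers from 1 to n. Once join() has returned normally, the worker has terminated and its fields can be read safely.

```java
class JoinDemo {
    // Worker thread: computes 1 + 2 + ... + n.
    static class Summer extends Thread {
        private final int n;
        private long total;
        Summer(int n) { this.n = n; }
        @Override
        public void run() {
            for (int i = 1; i <= n; i++) total += i;
        }
        long total() { return total; }
    }

    static long sumTo(int n) {
        Summer s = new Summer(n);
        s.start();
        try {
            s.join();  // blocks until s has terminated
        } catch (InterruptedException e) {
            // We were interrupted while waiting; the total may be incomplete.
            Thread.currentThread().interrupt();
        }
        return s.total();
    }
}
```

Compared with polling, there is no loop and no arbitrary sleep interval: the waiting thread resumes as soon as the worker finishes.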

Write a method waitingSearch() in Manager that uses s.join() instead of polling s.isAlive() to wait until the search has completed. Test your code by changing Manager.search() to call waitingSearch() instead of pollingSearch(). You will see output like:

Thread "searcher" processed 7003027 items and found 78 matching results.
(29401)_Asterix
...
--EOF--

Submit Manager.java.

Pipelining between two threads

The search methods so far wait until all the results have been obtained before printing them. In this exercise, we will use a separate thread to print the results as they are being produced by the Searcher thread.

Write a class Printer in the search package as follows:

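package search;

import java.util.concurrent.LinkedBlockingQueue;

public class Printer extends Thread {
    LinkedBlockingQueue<String> in;  // queue of strings to print
    int items;                       // number of strings printed so far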

    public Printer(LinkedBlockingQueue<String> in) {...}
    public void printStatus() {...}
    public void run() {...}
}

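The class contains two fields: in (the queue from which the strings to print are read) and items (the number of strings printed so far).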

Implement the constructor Printer that initializes the two fields, a method run() that reads strings from the queue (using in.take()) and prints them on the standard output, and a method printStatus() that prints out the current status of the thread (similar to printStatus() in the Searcher class). The run() method should terminate when the string read from the queue is Searcher.EOF (remember to use the equals() method to compare strings); until then it keeps waiting for new input. Note that the take() method of LinkedBlockingQueue may throw an InterruptedException if the current thread is interrupted while take is waiting for the next input; so, the run method should catch this exception and return.
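The take-until-marker loop described above might be sketched like this. To keep the example easy to check, the hypothetical helper DrainUntilEof.drain collects the strings in a list instead of printing them; its EOF constant mirrors Searcher.EOF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class DrainUntilEof {
    static final String EOF = "--EOF--"; // same marker value as Searcher.EOF

    // Takes strings from the queue until the EOF marker is seen.
    static List<String> drain(LinkedBlockingQueue<String> in) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                String s = in.take();      // blocks while the queue is empty
                if (EOF.equals(s)) break;  // compare contents with equals(), not ==
                seen.add(s);
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for input: stop and return what we have.
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Anything placed on the queue after the marker is left untouched, which is why the producer must add the marker last.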

Write a new method pipelinedSearch in Manager that starts a Searcher thread s and a Printer thread p, constructing both with the same LinkedBlockingQueue. This method should wait till s and p both terminate (using s.join() and p.join()) and finally call the printStatus() method of both threads.

Test your code by running pipelinedSearch() instead of waitingSearch(). If your implementation is correct, you should not see any output after the output produced by s.printStatus() and p.printStatus().

...
Asterixvader
M2Asterix
Asterix&ObelixXXL.jpg
Asterix_Kieldrecht_logo.jpg
Utilisateur_Asterix
Thread "searcher" processed 7003027 items and found 78 matching results.
Thread "printer" processed 78 items.

Submit Printer.java.

Submit Manager.java.

Interrupting a thread: interrupt, interrupted

When we want to wait for threads to finish their computation, we use the join() method as shown above. Sometimes we may want to stop a thread before it has finished running. To do this, we use the interrupt() method to set the interrupted status on the thread. Note that this does not actually stop the execution of the thread; the thread needs to call Thread.interrupted() to check if its interrupted status has been set.
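As an illustration of this cooperative style of cancellation, here is a sketch with a hypothetical worker class Spinner that loops until its interrupted status is set. Note that Thread.interrupted() also clears the status when it returns true.

```java
class InterruptDemo {
    // Worker that keeps counting until it notices an interrupt.
    static class Spinner extends Thread {
        private long iterations;
        @Override
        public void run() {
            // interrupted() returns true once interrupt() has been called on
            // this thread, and resets the status to false as a side effect.
            while (!Thread.interrupted()) {
                iterations++;
            }
        }
        long iterations() { return iterations; }
    }

    // Starts a Spinner, asks it to stop, and waits for it to finish.
    // Returns true if the Spinner has terminated.
    static boolean stopSpinner() {
        Spinner sp = new Spinner();
        sp.start();
        sp.interrupt();   // only sets the status; the thread must check it itself
        try {
            sp.join();    // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !sp.isAlive();
    }
}
```

If the loop never checked its interrupted status, interrupt() would have no visible effect and join() would wait forever.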

Our goal in this exercise is to terminate all threads when the number of results exceeds the desired value num. For example, if the user asks for only 10 results, both the Searcher and Printer are terminated after 10 matching titles have been found.

Modify the Printer class to add an integer field num, and add a new constructor that takes the initial value of num as an additional parameter. Modify run so that it terminates (returns) once items >= num.

Write a method interruptingSearch in Manager that starts one Searcher thread s and one Printer thread p. It waits for the Printer thread to terminate using p.join(), and then it terminates the thread s using s.interrupt(). After calling interrupt() it must wait (using s.join()) for the thread to terminate.

You must also modify the run() method in Searcher to check the interrupted status and terminate if it is set. In the main loop of run(), call Thread.interrupted(): if this returns true, the run method immediately returns, hence terminating the Searcher thread. In addition, if the thread is interrupted during a call to readLine(), then readLine() may throw an InterruptedIOException (a subclass of IOException).

Hence, we have programmed two different ways of terminating threads. The printer thread stops when an internal condition (set by its constructor) is satisfied or when it reads Searcher.EOF in its input queue, whereas the searcher thread responds to an external interrupt. Test your code by changing Manager.search() to call interruptingSearch(), and by changing the number of desired results, say to 10. You will see a result like:

...
Asterix_(satellite)
Asterix_Kieldrecht
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Asterix_aux_Jeux_Olympiques_(film)
Thread "searcher" processed 241798 items and found 39 matching results.
Thread "printer" processed 10 items.

Submit Printer.java.

Submit Manager.java.

Submit Searcher.java.

Managing multiple searcher threads

Write a method concurrentSearch() that creates two Searcher threads with the same arguments. They both read data from the same BufferedReader and produce results in the same LinkedBlockingQueue. Your method should then create a Printer thread to print the search results, start all three threads, wait for the printer to terminate, and then interrupt both searcher threads. Finally, print all their statuses.
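To see what sharing one reader between two threads means in practice, consider the following sketch. The class SharedReaderDemo is hypothetical; its two threads simply copy lines to a common queue, and each line of the input is consumed by exactly one of them.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.LinkedBlockingQueue;

class SharedReaderDemo {
    // Two threads read lines from the same reader into the same queue.
    // Returns the total number of lines they collected.
    static int readWithTwoThreads(String text) {
        final BufferedReader data = new BufferedReader(new StringReader(text));
        final LinkedBlockingQueue<String> out = new LinkedBlockingQueue<>();
        Runnable work = new Runnable() {
            @Override
            public void run() {
                try {
                    String line;
                    // readLine() is safe to call concurrently:
                    // every line goes to exactly one caller.
                    while ((line = data.readLine()) != null) {
                        out.add(line);
                    }
                } catch (IOException e) {
                    // reading failed: this thread simply stops
                }
            }
        };
        Thread a = new Thread(work, "reader1");
        Thread b = new Thread(work, "reader2");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.size();
    }
}
```

How the lines are split between the two threads is decided by the scheduler, so the per-thread counts and the order of the results can change from run to run, as in the sample output of this exercise.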

Test your code by changing Manager.search() to call concurrentSearch() (again limiting the number of desired results to 10). You will see a result like:

...
Asterix_(satellite)
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Asterix_aux_Jeux_Olympiques_(film)
Asterix_chez_Rahazade
Thread "searcher1" processed 131081 items and found 19 matching results.
Thread "searcher2" processed 110214 items and found 20 matching results.
Thread "printer" processed 10 items.

Submit Manager.java.

Map-Reduce Application: Counting the frequency of matches

We will now use the thread operations we have learned above to implement a multi-threaded application that follows the Map-Reduce pattern, which we will revisit in much more detail later in the course and in a future TD. The application starts n “Map” threads that transform input data into intermediate results, and then uses one “Reduce” thread to transform the intermediate results into the final result.

The goal of the application is to count the number of occurrences (frequency) of a list of strings in our Wikipedia title dataset. For example, given the list [Asterix, Obelix] we would like to compute the number of titles that include the string "Asterix" and the number of titles that include "Obelix".

Write a class MapFrequency similar to Searcher that extends Thread, and instead of a single query string accepts an array of query strings queries:

package search;

import java.io.BufferedReader;
import java.util.concurrent.LinkedBlockingQueue;

public class MapFrequency extends Thread {
    private BufferedReader data;
    private String[] queries;
    private LinkedBlockingQueue<String> result;
    private int processedItems;
    private int matchesFound;

    public MapFrequency(BufferedReader data, String[] queries, LinkedBlockingQueue<String> result) {
      // your code
    }

    public void printStatus() {
      // your code
    }

    @Override
    public void run() {
      // your code
    }
}

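The run() method reads a line from data and matches it against each string in queries; whenever it finds a match, it adds the matched query string to result. So, for example, if queries is the array {"Asterix","Obelix"}, then a title that contains only "Asterix" adds the string "Asterix" to result, and a title that contains both adds both query strings.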

Write a class ReduceFrequency similar to Printer that extends Thread, but, instead of printing the input strings, it counts the number of occurrences of each input string using a Map (implemented for example using HashMap, initialized in the constructor).

package search;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class ReduceFrequency extends Thread {
    private LinkedBlockingQueue<String> in;
    public Map<String,Integer> count;
    private int items;

    public ReduceFrequency(LinkedBlockingQueue<String> in) {
      // your code
    }

    public void printStatus() {
      // your code
    }

    @Override
    public void run() {
      // your code
    }
}

The run() method repeatedly reads a string from in and adds 1 to its corresponding entry in count; if the string does not yet occur in count, it adds a new entry initialized to 1.
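The counting step could be sketched as follows, outside of any thread. The class FrequencyCount is hypothetical, and the assumption that the input ends with a single "--EOF--" marker is made only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class FrequencyCount {
    static final String EOF = "--EOF--"; // assumed end marker for this sketch

    // Counts how often each string occurs on the queue, up to the EOF marker.
    static Map<String, Integer> count(LinkedBlockingQueue<String> in) {
        Map<String, Integer> count = new HashMap<>();
        try {
            String s;
            while (!EOF.equals(s = in.take())) {
                Integer old = count.get(s);               // null if s is not a key yet
                count.put(s, old == null ? 1 : old + 1);  // new entry or increment
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count;
    }
}
```

The HashMap is touched by a single thread only, which is what makes a non-thread-safe map acceptable here.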

Write a method count() in the Manager class:

public static Map<String,Integer> count(BufferedReader data, String[] queries){
  ...
}

This method should create two MapFrequency threads and a single ReduceFrequency thread, suitably pipelined. When both MapFrequency threads have terminated, and all their results have been processed by ReduceFrequency, it returns the count map from the ReduceFrequency thread.
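With two producers and one consumer, the consumer must not stop at the first sign that one producer is done. One possible design, shown here as an assumption rather than as the expected solution, is for the producers to add no end marker at all: the coordinating thread joins both producers and then adds a single marker itself. All class names below are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;

class FanInSketch {
    static final String EOF = "--EOF--";

    // Producer: puts `count` copies of `word` on the shared queue, no end marker.
    static class Producer extends Thread {
        private final LinkedBlockingQueue<String> out;
        private final String word;
        private final int count;
        Producer(LinkedBlockingQueue<String> out, String word, int count) {
            this.out = out; this.word = word; this.count = count;
        }
        @Override
        public void run() {
            for (int i = 0; i < count; i++) out.add(word);
        }
    }

    // Consumer: counts the items it receives until it sees EOF.
    static class Consumer extends Thread {
        private final LinkedBlockingQueue<String> in;
        private int items;
        Consumer(LinkedBlockingQueue<String> in) { this.in = in; }
        @Override
        public void run() {
            try {
                while (!EOF.equals(in.take())) items++;
            } catch (InterruptedException e) {
                // interrupted while waiting: stop early
            }
        }
        int items() { return items; }
    }

    static int runPipeline(int perProducer) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Producer p1 = new Producer(queue, "Asterix", perProducer);
        Producer p2 = new Producer(queue, "Obelix", perProducer);
        Consumer c = new Consumer(queue);
        p1.start(); p2.start(); c.start();
        try {
            p1.join();
            p2.join();
            queue.add(EOF); // both producers are done, so nothing can follow the marker
            c.join();       // the consumer has now processed everything
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return c.items();
    }
}
```

An alternative under the same constraints would be to let every producer add its own marker and have the consumer stop only after it has seen as many markers as there are producers.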

To test this, modify the main() method in Test by uncommenting the code for the Map-Reduce application, and commenting the rest. You should see output like this.

Thread "map1" processed 3482055 items and found 58 matching results.
Thread "map2" processed 3520972 items and found 40 matching results.
Thread "reduce" processed 98 items.
"Obelix" occurs 20 time(s)
"Asterix" occurs 78 time(s)

The result may differ depending on how your program processes titles that match more than one query, but the frequency count should remain the same.

To make MapFrequency more efficient, you may consider increasing the number of MapFrequency threads to use all the processors on your machine (see java.lang.Runtime.getRuntime().availableProcessors()). You may also consider compiling the set of queries into a Pattern (see java.util.regex.Pattern, especially the methods compile() and matcher()).
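Both suggestions can be sketched briefly. The helper class QueryPattern and its methods are hypothetical; the pattern is built as an alternation of the quoted query strings, so it can tell quickly whether a title contains any query at all, but not, on its own, which ones.

```java
import java.util.regex.Pattern;

class QueryPattern {
    // Builds one pattern that matches any of the query strings literally.
    static Pattern compile(String[] queries) {
        StringBuilder sb = new StringBuilder();
        for (String q : queries) {
            if (sb.length() > 0) sb.append('|');
            sb.append(Pattern.quote(q)); // quote() escapes regex metacharacters
        }
        return Pattern.compile(sb.toString());
    }

    // True if the title contains at least one of the queries.
    static boolean matchesAny(Pattern p, String title) {
        return p.matcher(title).find();
    }

    // A reasonable number of map threads: one per available processor.
    static int workerCount() {
        return Math.max(1, Runtime.getRuntime().availableProcessors());
    }
}
```

A natural use is as a pre-filter: titles for which matchesAny() returns false can be skipped, and only the remaining titles need to be checked against each query individually.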

Submit MapFrequency.java.

Submit ReduceFrequency.java.

Submit Manager.java.

Extra

Note: Avoiding Race Conditions

The early part of this TD uses some concurrent programming patterns that are not safe in general and may lead to unpredictable behavior. For example, in the first two questions we call the printStatus method on a running thread, and this may cause a race condition on the fields of the thread object (e.g. matchesFound), which are read and updated concurrently. In general, threads should only communicate via safe mechanisms like join and interrupt, and should exchange messages through thread-safe data structures such as LinkedBlockingQueue. In the next TD, we shall learn how to implement our own concurrent data structures using various synchronization mechanisms.