Threads
by Karthikeyan Bhargavan

In this TD, we learn how to use Java threads to perform simple concurrent computations. We will use the Thread class to program our threads. We will learn to launch, interrupt, and compose threads. We will, however, not worry too much about synchronization or race conditions, which will be addressed in the next TD.

You are given a file containing all the page titles in the French Wikipedia. Your goal is to write a multi-threaded program that searches through these titles and presents the results.

Packages and Input Files

To begin with, create a new project TD15, download td15.zip, and unzip it within the TD15 project. (If your project has a src folder, then unzip td15.zip within this folder.) This will create a package search and add a file to the default package (remember to refresh with F5 in Eclipse).

The search package contains two classes, Searcher and Manager, which you will modify.

The default package contains a single class Test for testing.

You will be using the input file titles.txt, which contains Wikipedia page titles from March 17, 2011. If you are working on the workstations in the lab, you do not need to download this file; it is already installed. If you are working on your own computer, download this file from here, save it in the project TD15, and modify Test.java accordingly. (You can download updated versions of this file and other Wikipedia files from here.)

To read the input file, we will use the BufferedReader class which is safe to use from multiple threads. You will only need to use the readLine() method from this class which reads a line and returns it as a string.

To store the results of our programs, we will use the classes LinkedBlockingQueue (which is similar to the regular Queue but safe to use from multiple threads) and HashMap.

In this TD, all our threads will be written as classes that extend the Thread class. Each thread class must override the run method that is executed when the thread is started. (An alternative way to program threads is to implement the Runnable interface, as shown in the lectures.)
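As a minimal sketch of the two styles mentioned above, the following example starts one thread written as a subclass of Thread and one built from a Runnable. The class names HelloThread and HelloDemo are hypothetical and are not part of td15.zip; the lambda syntax assumes Java 8 or later.

```java
// Hypothetical example classes; the names are not part of td15.zip.
class HelloThread extends Thread {
    private final StringBuffer log;

    HelloThread(StringBuffer log) {
        this.log = log;
    }

    @Override
    public void run() {
        // Executed in the new thread once start() has been called.
        log.append("thread;");
    }
}

class HelloDemo {
    static String runBoth() throws InterruptedException {
        StringBuffer log = new StringBuffer();                 // StringBuffer is synchronized
        Thread a = new HelloThread(log);                       // style 1: subclass Thread
        Thread b = new Thread(() -> log.append("runnable;"));  // style 2: pass a Runnable
        a.start();
        a.join();   // wait for a before starting b, so the order of the log is fixed
        b.start();
        b.join();
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBoth());
    }
}
```

Note that calling run() directly would execute the body in the calling thread; only start() creates a new thread of execution.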

Programming with Threads

Searcher thread: start, sleep

The first task is to write a class that searches through the datafile for titles that contain a given substring. The class Searcher extends Thread as follows:

public class Searcher extends Thread {
    private final BufferedReader data;
    private final String query;
    private final LinkedBlockingQueue<String> result;
    private int processedItems = 0;
    private int matchesFound = 0;

    public Searcher(BufferedReader data, String query, LinkedBlockingQueue<String> result){...}
    public void printStatus() {}
    public void run(){...}
}

The class contains five fields: a BufferedReader data, a query string, a LinkedBlockingQueue result, and two integers processedItems (the total number of items read so far) and matchesFound (the total number of matches found so far).

In this exercise, you must write the body of the run method. The goal of the run method is to read strings from data and, if they match the query, to add the strings to the result (using result.add(...)). When the BufferedReader data has no more data, data.readLine() will return null and the run method adds a string "--EOF--" to the result (to indicate the end of the results) and then returns. If data.readLine() throws an IOException, your code must catch it, add string "--EOF--" to the result and return. The run method should also update the processedItems and matchesFound fields as it processes data items.

Recall that you can check whether a string s contains a substring query using s.indexOf(query) >= 0.
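The shape of such a read-and-match loop can be sketched outside of any thread. The class SearchLoopSketch and its method scan below are hypothetical names used for illustration only; they are not the Searcher class itself and do not update its fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of td15.zip: the loop is written as a plain
// static method so that it can be tried without starting a thread.
class SearchLoopSketch {
    static final String EOF = "--EOF--";

    // Adds every line containing query to result and returns the number of matches.
    static int scan(BufferedReader data, String query, LinkedBlockingQueue<String> result) {
        int matches = 0;
        try {
            String line;
            while ((line = data.readLine()) != null) {   // null means end of input
                if (line.indexOf(query) >= 0) {
                    result.add(line);
                    matches++;
                }
            }
        } catch (IOException e) {
            // A read error ends the search in the same way as the end of input.
        }
        result.add(EOF);   // always mark the end of the results
        return matches;
    }

    public static void main(String[] args) {
        BufferedReader data =
            new BufferedReader(new StringReader("Asterix\nTintin\nParc_Asterix\n"));
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<String>();
        System.out.println(scan(data, "Asterix", result) + " matches: " + result);
    }
}
```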

The Searcher class is started from the Manager class, where you are already given sample code for the first exercise:

public static LinkedBlockingQueue<String> search1(BufferedReader data, String query, int num)
        throws InterruptedException {
    LinkedBlockingQueue<String> result = new LinkedBlockingQueue<String>();
    Searcher s = new Searcher(data, query, result);
    s.setName("1");
    s.start();
    Thread.sleep(100);
    s.printStatus();
    return result;
}

The method search1 is given a BufferedReader, a query string, and the maximum number num of results desired by the user (ignore num for now, it will be used later). The method body creates a new LinkedBlockingQueue for the search results. It then creates a new Searcher, gives it a name (s.setName("1")) and starts it (s.start()). The Searcher thread s is now running in parallel. The method search1 then sleeps for 100ms and then returns with whatever results have been obtained so far. When the search is running, we can print the current status of the Searcher thread by calling s.printStatus().

When a thread is sleeping it may be interrupted by a concurrent thread (as we will see in exercise 4 below). Hence, Thread.sleep throws an InterruptedException. More generally, whenever one thread is waiting for another, it may be interrupted by an InterruptedException and this exception must either be handled or passed on to the caller.
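To illustrate both options (handling the exception or passing it on), here is a small sketch with a hypothetical class SleepInterruptSketch: the sleeping thread handles the InterruptedException itself, whereas demo() simply declares it and passes it on to its caller.

```java
// Hypothetical demonstration class; not part of td15.zip.
class SleepInterruptSketch {
    // Starts a thread that tries to sleep for 10 s, interrupts it,
    // and reports what the sleeping thread observed.
    static String demo() throws InterruptedException {   // passes the exception on
        final String[] outcome = { "not finished" };
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);
                outcome[0] = "slept";
            } catch (InterruptedException e) {            // handles the exception
                outcome[0] = "interrupted";
            }
        });
        sleeper.start();
        sleeper.interrupt();   // wakes the sleeper (or makes its sleep fail at once)
        sleeper.join();        // after join, the write to outcome[0] is visible here
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```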

Test your code by running Test (you can also run the code from the command-line, see the commented out code in Test.java). You will then see a result that looks like the following (generated by printStatus):

Thread 1 processed 296756 items and found 43 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...
The exact number of results depends on how far the Searcher thread has progressed within the first 100 ms of execution; this may differ from machine to machine.

Submit Searcher.java.

Waiting for thread termination: isAlive, join

The method search1 above does not wait for the search to terminate. In this exercise, you will write a new search method search2 that uses s.isAlive() to check whether the Searcher thread is still running: if yes, print the current status (by calling printStatus), sleep for 100 ms, and try again; if no, return the results. Make sure to display the final status once the search has completed.
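The polling pattern itself can be sketched independently of the search. The class PollingSketch and the stand-in worker returned by sleeper are hypothetical; in the exercise the worker would be the Searcher and each poll would print its status.

```java
// Hypothetical demonstration class; not part of td15.zip.
class PollingSketch {
    // A stand-in worker that just sleeps for ms milliseconds.
    static Thread sleeper(final long ms) {
        return new Thread(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                // interrupted: finish early
            }
        });
    }

    // Checks worker every periodMs milliseconds until it has terminated;
    // returns the number of checks that found the worker still alive.
    static int waitByPolling(Thread worker, long periodMs) throws InterruptedException {
        int polls = 0;
        while (worker.isAlive()) {
            polls++;                 // an intermediate status report would go here
            Thread.sleep(periodMs);
        }
        return polls;                // the worker has terminated: report the final status
    }

    public static void main(String[] args) throws InterruptedException {
        Thread w = sleeper(300);
        w.start();
        System.out.println("polls: " + waitByPolling(w, 100));
    }
}
```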

Test your code by changing Manager.search to call search2 instead of search1. You will see on the output window something like:

 
Thread 1 processed 281075 items and found 43 matching results.
Thread 1 processed 822125 items and found 43 matching results.
Thread 1 processed 1352803 items and found 44 matching results.
Thread 1 processed 1921918 items and found 53 matching results.
Thread 1 processed 2462644 items and found 53 matching results.
Thread 1 processed 2652605 items and found 54 matching results.
Thread 1 processed 2652605 items and found 54 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...
--EOF--

You should also test your code with other queries. For example, if you change the query string in Test.main to "Obelix", you should obtain all 9 titles that contain "Obelix". Note that you can also test the program from the terminal window by uncommenting the indicated portions of the Test.main function.

A second way to wait for a thread s to terminate is to use the s.join() method. The method s.join() waits (blocks) until the thread s has terminated and then returns. This avoids the need for a loop waiting for the thread to terminate and is more suitable than s.isAlive when the current thread has nothing else to do while it waits.
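A minimal sketch of waiting with join, using a hypothetical worker class Summer rather than the Searcher: once join() has returned, the worker has terminated and its result can be read safely.

```java
// Hypothetical demonstration class; not part of td15.zip.
class JoinSketch {
    // A worker that adds up the integers 1..1000.
    static class Summer extends Thread {
        private long total = 0;

        @Override
        public void run() {
            for (int i = 1; i <= 1000; i++) {
                total += i;
            }
        }

        long getTotal() {
            return total;
        }
    }

    static long sumWithJoin() throws InterruptedException {
        Summer s = new Summer();
        s.start();
        s.join();              // blocks until s.run() has returned
        return s.getTotal();   // safe to read: s has terminated
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumWithJoin());
    }
}
```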

Write a method search2b that uses join instead of isAlive to test whether the search has been completed.

Test your code by changing Manager.search to call search2b instead of search2. You will see on the output window something like:

Thread 1 processed 2652605 items and found 54 matching results.

Submit Manager.java.

Pipelining between two threads

The search methods so far wait till all the results have been obtained before printing them. In this exercise, we will use a Printer thread that prints the results as they are being produced by the Searcher thread.

Write a class Printer as follows:

package search;
public class Printer extends Thread {
    BlockingQueue<String> in;
    int items;
    public Printer(BlockingQueue<String> in){...}
    public void printStatus(){...}
    public void run(){...}
}
The class contains two fields: a BlockingQueue of strings (in practice, a LinkedBlockingQueue) and the number of items processed so far. Implement the constructor Printer that initializes the two fields, a method run that reads strings from the queue (using in.take()) and prints them on the standard output, and a method printStatus that prints out the current status of the thread (similar to Searcher.printStatus). The run method terminates when the string read from the queue is "--EOF--"; until then it keeps waiting for new input. Note that the take method may throw an InterruptedException if the current thread is interrupted (say, by the Manager thread) while take is waiting for the next input; the run method should therefore catch this exception and return.

Write a new method search3 in Manager that starts a Searcher thread s and a Printer thread p, such that the LinkedBlockingQueue output of the Searcher is passed as input to Printer. This method should wait till s and p both terminate (using s.join and p.join) and finally print the status of the two threads.
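The pipeline can be sketched with a generic producer and consumer. The names PipelineSketch and Consumer below are hypothetical; the consumer collects the strings in a list where the Printer would print them, and the producer stands in for the Searcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demonstration class; not part of td15.zip.
class PipelineSketch {
    static final String EOF = "--EOF--";

    static class Consumer extends Thread {
        private final BlockingQueue<String> in;
        private final List<String> seen = new ArrayList<String>();

        Consumer(BlockingQueue<String> in) {
            this.in = in;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String s = in.take();   // blocks while the queue is empty
                    if (s.equals(EOF)) {
                        return;             // end marker: stop consuming
                    }
                    seen.add(s);            // a Printer would print s here
                }
            } catch (InterruptedException e) {
                // interrupted while waiting for input: stop
            }
        }

        List<String> getSeen() {
            return seen;
        }
    }

    static List<String> run(String... items) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Consumer consumer = new Consumer(queue);
        Thread producer = new Thread(() -> {
            for (String s : items) {
                queue.add(s);
            }
            queue.add(EOF);
        });
        consumer.start();
        producer.start();
        producer.join();   // wait for both ends of the pipeline
        consumer.join();
        return consumer.getSeen();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run("Asterix", "Parc_Asterix"));
    }
}
```

Because take() blocks, the consumer may be started before the producer has produced anything.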

Test your code by running search3 instead of search2b. Modify the main method in Test so that it no longer prints the result at the end. The program should now print results as they are found, and end by printing out the status of the two threads:

...
Liste_des_camps_romains_d'Asterix_le_gaulois
Liste_des_personnages_d'Asterix_le_Gaulois
Parc_Asterix
Petibonum_(Asterix)
Zaza_(personnage_d'Asterix)
Thread 1 processed 2652605 items and found 54 matching results.
Thread 2 processed 54 items.

Submit Printer.java.

Submit Manager.java.

Interrupting a thread: interrupt, interrupted

When we want to wait for threads to completely terminate, we use the join method as shown above. To terminate a thread while it is running, we will use interrupt in this question. Our goal is to terminate all threads when the number of results exceeds a desired value num. So, for example, if the user asks for only 10 results, the search is terminated after 10 matching titles have been found.

Modify the Printer class to add an integer field num, and add a new constructor that takes the initial value of num as an additional parameter. Modify run so that it terminates (returns) once items >= num.

Write a method search4 in Manager that starts one Searcher thread s and one Printer thread p. It waits for the Printer thread to terminate using p.join(), and then it terminates the thread s using s.interrupt(). After calling interrupt() it must wait (using s.join()) for the thread to terminate.

You must also modify the method run in the Searcher class to respond to the interrupt. In the main loop of run, check whether the thread has been interrupted by calling Thread.interrupted(): if this returns true, the run method returns immediately, hence terminating the Searcher thread. In addition, if the thread is interrupted during a call to readLine(), an InterruptedIOException (a subclass of IOException) may be thrown; a handler for IOException, such as the one written for the first exercise, also catches it.
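A minimal sketch of this cooperative style of termination, using a hypothetical worker class Counter in place of the Searcher: the worker checks Thread.interrupted() on every iteration, and the manager interrupts it and then waits with join.

```java
// Hypothetical demonstration class; not part of td15.zip.
class InterruptSketch {
    // A worker that counts until it is interrupted.
    static class Counter extends Thread {
        private long steps = 0;

        @Override
        public void run() {
            // Thread.interrupted() returns true once after an interrupt
            // and clears the interrupt flag of the current thread.
            while (!Thread.interrupted()) {
                steps++;
            }
        }

        long getSteps() {
            return steps;
        }
    }

    // Returns true if the worker has terminated after being interrupted.
    static boolean stopWithInterrupt() throws InterruptedException {
        Counter c = new Counter();
        c.start();
        Thread.sleep(50);   // let the worker run for a while
        c.interrupt();      // ask it to stop
        c.join();           // wait until it has actually stopped
        return !c.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped: " + stopWithInterrupt());
    }
}
```

An interrupt is only a request: a thread that never checks its interrupt flag and never blocks in an interruptible call keeps running.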

Hence, we have programmed two different ways of terminating threads. The printer thread stops when an internal condition (set by its constructor) is satisfied or when it reads the string "--EOF--" in its input queue, whereas the searcher thread responds to an external interrupt. Test your code by changing Manager.search to call search4, and by changing the number of desired results, say to 10. You will see a result like:

...
Asterix_(satellite)
Asterix_Kieldrecht
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Thread 1 processed 216787 items and found 43 matching results.
Thread 2 processed 10 items.

Submit Printer.java.

Submit Manager.java.

Submit Searcher.java.

Managing multiple searcher threads

Write a method search5 that creates two Searcher threads with the same arguments. They both read data from the same BufferedReader and produce results in the same LinkedBlockingQueue. Your method should then create a Printer thread to print the search results, start all three threads, wait for the printer to terminate, and then interrupt both searcher threads.
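When two threads read from the same BufferedReader, each line is delivered to exactly one of them, so the work is split rather than duplicated. The following sketch illustrates this with a hypothetical class SharedReaderSketch and generated input in place of titles.txt; how the lines are divided between the two threads varies from run to run.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical demonstration class; not part of td15.zip.
class SharedReaderSketch {
    // A worker that counts the lines it manages to read from a shared reader.
    static class LineCounter extends Thread {
        private final BufferedReader data;
        private int lines = 0;

        LineCounter(BufferedReader data) {
            this.data = data;
        }

        @Override
        public void run() {
            try {
                while (data.readLine() != null) {
                    lines++;
                }
            } catch (IOException e) {
                // read error: stop counting
            }
        }

        int getLines() {
            return lines;
        }
    }

    // Returns how many of the totalLines lines each of the two threads read.
    static int[] split(int totalLines) throws InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < totalLines; i++) {
            sb.append("title").append(i).append('\n');
        }
        BufferedReader data = new BufferedReader(new StringReader(sb.toString()));
        LineCounter a = new LineCounter(data);
        LineCounter b = new LineCounter(data);
        a.start();
        b.start();
        a.join();
        b.join();
        return new int[] { a.getLines(), b.getLines() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = split(10000);
        System.out.println(r[0] + " + " + r[1] + " = " + (r[0] + r[1]));
    }
}
```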

Test your code by changing Manager.search to call search5 (again limiting the number of desired results to 10). You will see a result like:

...
Asterix_Kieldrecht
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Asterix_&_Obelix_XXL
Thread 1 processed 104627 items and found 21 matching results.
Thread 2 processed 111677 items and found 22 matching results.
Thread 3 processed 10 items.

Submit Manager.java.

Map-Reduce Application: Counting the frequency of matches

We will now use the various thread synchronization primitives we have learned above to implement a multi-threaded application that uses the so-called Map-Reduce pattern. It starts n Map threads that transform input data into intermediate results, and then uses 1 Reduce thread to transform the intermediate data into the final result.

The goal of the application is to count the number of occurrences (frequency) of a list of strings in our Wikipedia title dataset. For example, given the list "Asterix,Obelix" we would like to compute the number of titles that include the string "Asterix" and the number of titles that include "Obelix".

Write a class MapFrequency similar to Searcher that extends Thread, and instead of a single query string accepts an array of query strings String[] queries:

package search;
public class MapFrequency extends Thread {
	BufferedReader data;
	String[] queries;
	LinkedBlockingQueue<String> result;
	int processedItems;
	int matchesFound;
        ...
}
The constructor should take parameters in this order: BufferedReader, String[] and LinkedBlockingQueue<String>.
Write a run method that reads a line from data and matches it against each string in queries; whenever it finds a match, it adds the matched query string to result. So, for example if queries is the array {"Asterix","Obelix"}, the result will be a sequence of "Asterix" and "Obelix", one for each occurrence of the corresponding string.

Write a class ReduceFrequency similar to Printer that extends Thread, and instead of printing the input strings, it counts the number of occurrences of each input string using a Map (implemented for example using HashMap).

package search;
public class ReduceFrequency extends Thread {
    LinkedBlockingQueue<String> in;
    Map<String,Integer> count;
    int items;
    ...
}
Write a run method that reads a string from in and adds 1 to its corresponding entry in count; if the string does not occur in count it adds a new entry initialized to 1.
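The counting step can be sketched on its own, without the queue or the thread. The class CountSketch and its method tally are hypothetical names used for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demonstration class; not part of td15.zip.
class CountSketch {
    // Counts how many times each string occurs in words.
    static Map<String, Integer> tally(Iterable<String> words) {
        Map<String, Integer> count = new HashMap<String, Integer>();
        for (String w : words) {
            Integer old = count.get(w);               // null if w has not been seen yet
            count.put(w, old == null ? 1 : old + 1);  // new entry starts at 1
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(tally(Arrays.asList("Asterix", "Obelix", "Asterix")));
    }
}
```

A plain HashMap is sufficient in this setting because only the single ReduceFrequency thread updates it.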

Write a method count in the Manager class as follows:

public static Map<String,Integer> count(BufferedReader data, String[] queries){
  ...
}
The method creates two MapFrequency threads and a single ReduceFrequency thread. When both MapFrequency threads have terminated, and all their results have been processed by ReduceFrequency, it prints out the result:
Thread 1 processed 1143618 items and found 33 matching results.
Thread 2 processed 981751 items and found 30 matching results.
Thread 3 processed 63 items.
Asterix occurs 54 times.
Obelix occurs 9 times.
The result may differ depending on how your program processes titles that match more than one query, but the frequency count should remain the same.

To make MapFrequency more efficient, you may consider increasing the number of MapFrequency threads to use all the processors on your machine (see java.lang.Runtime.getRuntime().availableProcessors()). You may also consider compiling the set of queries into a Pattern (see java.util.regex.Pattern, especially the methods Pattern.compile(query) and p.matcher(s)).
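One possible way to combine these two suggestions is sketched below, with a hypothetical class PatternSketch. It compiles all queries into a single alternation, using Pattern.quote so that characters such as parentheses in a query are matched literally. This is only one design among several: a single find() reports only the leftmost match, so a title containing several queries would still need repeated find() calls or one pattern per query.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demonstration class; not part of td15.zip.
class PatternSketch {
    // Builds one pattern that matches any of the queries literally.
    static Pattern compile(String[] queries) {
        StringBuilder alternatives = new StringBuilder();
        for (String q : queries) {
            if (alternatives.length() > 0) {
                alternatives.append('|');
            }
            alternatives.append(Pattern.quote(q));
        }
        return Pattern.compile(alternatives.toString());
    }

    // Returns the leftmost query found in title, or null if none occurs.
    static String firstMatch(Pattern p, String title) {
        Matcher m = p.matcher(title);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        // Number of processors available to the JVM: a natural upper bound
        // for the number of MapFrequency threads.
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + n);

        Pattern p = compile(new String[] { "Asterix", "Obelix" });
        System.out.println(firstMatch(p, "Asterix_&_Obelix_XXL"));
    }
}
```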

Submit MapFrequency.java.

Submit ReduceFrequency.java.

Submit Manager.java.

Extra

Note: Avoiding Race Conditions

The early part of this TD uses some concurrent programming patterns that are not safe in general and may lead to unpredictable behavior. For example, in the first two questions we call the printStatus method on a running thread, and this may cause a race condition on the fields of the thread (e.g. matchesFound) when they are read and updated concurrently. In general, threads should only coordinate via safe mechanisms like join and interrupt, and exchange messages through thread-safe data structures such as LinkedBlockingQueue. In the next TD, we shall learn how to implement our own concurrent data structures using various synchronization mechanisms.