In this TD, we will learn how to use Java threads to perform simple concurrent computations. We will use the java.lang.Thread class to program our threads. We will learn to launch, interrupt, and compose threads. We will, however, not worry too much about synchronization or race conditions, which will be addressed in the next TD.
You are given a file, titles.txt, containing all the page titles in the French Wikipedia. Your goal is to write a multi-threaded program that searches through these titles and presents the results.
To begin with, create a new project TD1, download td1.zip and unzip it into the TD1 project. Remember to refresh with “F5” within Eclipse.
The search package contains two classes, Searcher and Manager, which you will modify. The default package contains a single class, Test, for testing.
You will be using the input file titles.txt, containing Wikipedia page titles from December 13, 2014. If you are working on the workstations in the lab, you do not need to download this file: it is already installed. If you are working on your own computer, download titles.txt, save it in the project TD1 and modify Test.java accordingly. You can download updated versions of this file directly from Wikimedia, but then the example output in this TD will not necessarily match up.
To read the input file, we will use java.io.BufferedReader, which is safe to use from multiple threads. You will only need its readLine() method, which reads a line and returns it as a string.
To store the results of our programs, we will use java.util.concurrent.LinkedBlockingQueue, which implements the java.util.Queue interface and is safe to use from multiple threads, and java.util.HashMap, which is not thread-safe.
In this TD, all our threads will be written as classes that extend the java.lang.Thread class. Each thread class must override the run() method, which is executed when the thread is started. An alternative way to program threads is to implement the java.lang.Runnable interface, as shown in the lectures.
The container classes we will use in this TD rely on Java generics, which provide a form of parametric polymorphism. (If you have seen languages like OCaml or Haskell before, this is very similar to type parameters in those languages.) Here is a quick overview of how to use generics in Java 7.

To define a class with a type parameter T, we write <T> following the class name. Then, in the body of the class, we can use T just like any other class. For example:
```java
class Node<T> {
    private final T cur;
    private Node<T> next;

    public Node(T cur, Node<T> next) {
        this.cur = cur;
        this.next = next;
    }
}
```
To create a Node with Integer in place of T, we can use either of the following two forms:

```java
Node<Integer> n = new Node<Integer>(42, null); // explicit type argument
Node<Integer> n = new Node<>(42, null);        // or, equivalently, the diamond <>
```

The second form, which lets you avoid repeating the type argument Integer, was added in Java 7. Note that the <> in the second form is essential: if it is omitted, new Node(42, null) creates a raw Node, which produces an unchecked conversion warning and gives up compile-time type checking.
Methods can also be polymorphic, such as the following static method in the class Node:

```java
public static <T> Node<T> makeNode(T thing) {
    return new Node<T>(thing, null);
}
```

Such a method can be invoked as Node.makeNode(42); most of the time, the type arguments will be inferred. We will not need polymorphic methods in this TD, but they are good to know about.
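As a quick, self-contained check of the two instantiation forms and the polymorphic factory, here is a small sketch. The getters and the GenericsDemo class are illustrative additions, not part of the TD's code.

```java
// Demo of generic instantiation and type inference with Node<T>.
class GenericsDemo {
    // Builds the list 1 -> 2 -> 3 with the diamond operator and makeNode,
    // then returns the sum of its elements.
    static int sumOfSmallList() {
        Node<Integer> list = new Node<>(1, new Node<>(2, Node.makeNode(3)));
        int sum = 0;
        for (Node<Integer> n = list; n != null; n = n.getNext()) {
            sum += n.getCur(); // auto-unboxing from Integer to int
        }
        return sum;
    }

    public static void main(String[] args) {
        Node<String> s = Node.makeNode("hello"); // T inferred as String
        System.out.println(s.getCur());          // prints hello
        System.out.println(sumOfSmallList());    // prints 6
    }
}

// A generic singly-linked node, mirroring the Node<T> class above.
class Node<T> {
    private final T cur;
    private Node<T> next;

    public Node(T cur, Node<T> next) {
        this.cur = cur;
        this.next = next;
    }

    // Illustrative getters (not in the TD's version of Node).
    public T getCur() { return cur; }
    public Node<T> getNext() { return next; }

    // Polymorphic static method: T is inferred from the argument.
    public static <T> Node<T> makeNode(T thing) {
        return new Node<T>(thing, null);
    }
}
```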
start, sleep
The first task is to write a class that searches through the input for titles that contain a given substring. The class search.Searcher extends Thread as follows:

```java
package search;

import java.io.BufferedReader;
import java.util.concurrent.LinkedBlockingQueue;

public class Searcher extends Thread {
    public static final String EOF = "--EOF--";

    private final BufferedReader data;
    private final String query;
    private final LinkedBlockingQueue<String> result;
    private int processedItems = 0;
    private int matchesFound = 0;

    public Searcher(BufferedReader data, String query,
                    LinkedBlockingQueue<String> result) {...}

    public void printStatus() {...}

    public void run() {
        // your code
    }
}
```
This class contains the following fields:
- data, which contains the input. Use the readLine() method to read a single line of input.
- query, which is the string being searched for.
- result, which stores the results as they are found. Use the add() method to add new results.
- processedItems, which is the total number of items read from data so far by the current instance of the class.
- matchesFound, which is the total number of matches found so far by the current instance of the class.

The constructor initializes the first three fields, and the printStatus() method summarizes the current state of the search. These have already been written and you do not need to modify them.
In this exercise, you must write the body of the run() method. The goal of the run() method is to read strings from data and, if they match the query, to add them to result. When data has no more input, data.readLine() returns null; the run method should then add Searcher.EOF (which contains the immutable string "--EOF--") to result to indicate the end of the results, and return. If data.readLine() throws an IOException, your code must catch it, add Searcher.EOF to result and return. The run method should also update the processedItems and matchesFound fields as it processes data items.
Recall that you can check whether a string s contains a substring query using s.indexOf(query) >= 0 (or, equivalently, s.contains(query)).
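The read, match and EOF logic described above can be sketched as follows. This is a minimal, hypothetical stand-in (SearchSketch and searchAll are illustrative names) that runs on a BufferedReader over an in-memory string, so it can be tried without titles.txt. In the TD, this loop belongs in Searcher.run(), which should also update processedItems and matchesFound.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class SearchSketch {
    static final String EOF = "--EOF--"; // same marker as Searcher.EOF

    // Reads every line of data, adds those containing query to result,
    // and always finishes by adding EOF (also when an IOException occurs).
    static void searchAll(BufferedReader data, String query,
                          LinkedBlockingQueue<String> result) {
        try {
            String line;
            while ((line = data.readLine()) != null) { // null means end of input
                if (line.indexOf(query) >= 0) {
                    result.add(line);
                }
            }
        } catch (IOException e) {
            // Fall through: the end-of-results marker is still added below.
        }
        result.add(EOF);
    }

    static List<String> demo() {
        BufferedReader data = new BufferedReader(new StringReader(
                "Asterix\nNotre_dame_de_Paris\nAquarium_(Asterix)\n"));
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
        searchAll(data, "Asterix", result);
        return new ArrayList<>(result); // copies the queue in FIFO order
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Asterix, Aquarium_(Asterix), --EOF--]
    }
}
```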
The Searcher class is started from the Manager class, where you are already given sample code for the first exercise:

```java
public static LinkedBlockingQueue<String> simpleSearch(BufferedReader data, String query, int num)
        throws InterruptedException {
    LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
    Searcher s = new Searcher(data, query, result);
    s.setName("searcher");
    s.start();
    Thread.sleep(100);
    s.printStatus();
    return result;
}
```
The method simpleSearch() is given:
- data, which contains the lines of input;
- query, the string being searched for;
- num, the maximum number of results desired by the user (ignore this argument for now).

The method creates a new LinkedBlockingQueue for the search results. It then creates an instance of Searcher, gives it a name (using setName()) and starts it (using start()).
The Searcher thread s is now running concurrently with the Manager thread. The Manager thread, which is executing the simpleSearch() method, then sleeps for 100 milliseconds (ms) and returns whatever results have been obtained so far. While the search is running, we can print the current status of the Searcher object by calling s.printStatus().
When a thread is sleeping, it may be interrupted by a concurrent thread (as we will see in exercise 4 below). Hence, Thread.sleep throws an InterruptedException.

More generally, whenever one thread is waiting for another, it may be interrupted, in which case the waiting method throws an InterruptedException; this exception must either be handled or passed on to the caller.
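To see this in action, the following sketch (class and method names are illustrative, and a lambda Runnable is used for brevity instead of a Thread subclass) starts a thread that would sleep for 10 seconds and interrupts it from the main thread; the sleeping thread wakes up early with an InterruptedException.

```java
class SleepInterruptDemo {
    // Returns true if the sleeper thread was woken by an InterruptedException.
    static boolean sleeperWasInterrupted() throws InterruptedException {
        final boolean[] interrupted = {false}; // written by sleeper, read after join
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000); // would sleep 10 s if left alone
            } catch (InterruptedException e) {
                interrupted[0] = true; // woken up early by interrupt()
            }
        });
        sleeper.start();
        sleeper.interrupt(); // request interruption from the main thread
        sleeper.join();      // join() itself may throw InterruptedException
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sleeperWasInterrupted()); // prints true
    }
}
```

If interrupt() happens to run before the other thread reaches sleep(), sleep() throws immediately because the interrupted status is already set, so the result does not depend on timing.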
Test your code by running Test. You will then see a result that looks like the following (the first line is generated by printStatus):

```
Thread "searcher" processed 606987 items and found 39 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...
```

The exact number of results depends on how far the Searcher thread has progressed by the time s.printStatus() is called, after 100 ms. This may differ from machine to machine. Note that the Searcher thread will continue running until it has computed all the results.
Submit Searcher.java.
isAlive, join
The simpleSearch() method above does not wait for the search to terminate. In this exercise, you will write a new search method, pollingSearch(), that uses s.isAlive() to check whether the Searcher thread is still running. In a loop, sleep for 100 ms, print the current status (with s.printStatus()), and then check s.isAlive(); if it is false, exit the loop and return result. This kind of repeated checking of a condition is called polling.
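A polling loop in the style of pollingSearch() might look like the sketch below. The worker is a hypothetical stand-in for a Searcher (it just sleeps), and pollUntilDone and startSleeper are illustrative names; what matters is the shape of the loop: sleep, report, then test isAlive().

```java
class PollingDemo {
    // Hypothetical stand-in for a Searcher: a started thread that sleeps for ms.
    static Thread startSleeper(long ms) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                // stop early if interrupted
            }
        });
        t.start();
        return t;
    }

    // Polls worker every periodMs milliseconds until it has terminated.
    // Returns the number of polls performed.
    static int pollUntilDone(Thread worker, long periodMs) throws InterruptedException {
        int polls = 0;
        while (true) {
            Thread.sleep(periodMs);
            polls++;
            boolean alive = worker.isAlive();
            // In pollingSearch(), this is where s.printStatus() would be called.
            System.out.println("poll " + polls + ": alive=" + alive);
            if (!alive) {
                break;
            }
        }
        return polls;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = startSleeper(350); // a "search" taking about 350 ms
        int polls = pollUntilDone(worker, 100);
        System.out.println("done after " + polls + " polls");
    }
}
```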
Test your code by changing Manager.search() to call pollingSearch() instead of simpleSearch(). You will see output like:

```
Thread "searcher" processed 623516 items and found 39 matching results.
Thread "searcher" processed 1544780 items and found 42 matching results.
Thread "searcher" processed 2278635 items and found 50 matching results.
Thread "searcher" processed 3269822 items and found 52 matching results.
Thread "searcher" processed 4126251 items and found 52 matching results.
Thread "searcher" processed 5519722 items and found 74 matching results.
Thread "searcher" processed 6694248 items and found 78 matching results.
Thread "searcher" processed 7003027 items and found 78 matching results.
(29401)_Asterix
Aquarium_(Asterix)
Asterix
Asterix_&_Obelix_:_Mission_Cleopatre
Asterix_&_Obelix_XXL
...
--EOF--
```
You should also test your code on other queries. For example, if you change the query string in Test.main to "Obelix", you should obtain all 20 titles that contain "Obelix".
A second way to wait for a thread s to terminate is to use the s.join() method. This method waits (blocks) until the thread s has terminated and then returns. This avoids the need for a loop waiting for the thread to terminate, and is more suitable than s.isAlive() when the current thread has nothing else to do while it waits.
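With join(), the waiting thread simply blocks instead of looping. A minimal sketch, with a hypothetical lambda thread standing in for Searcher: the producer fills a queue and ends with the EOF marker, and once join() returns, the main thread can read everything the producer added, since all actions of a thread happen-before a successful return from join() on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class JoinDemo {
    static final String EOF = "--EOF--";

    // Starts a producer, waits for it with join(), and returns what it produced.
    static List<String> produceAndWait() throws InterruptedException {
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                result.add("item" + i);
            }
            result.add(EOF);
        });
        producer.start();
        producer.join();                // blocks until producer has terminated
        return new ArrayList<>(result); // every item, including EOF, is present
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndWait()); // prints [item0, item1, item2, --EOF--]
    }
}
```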
Write a method waitingSearch() in Manager that uses s.join() instead of s.isAlive() to wait for the search to complete. Test your code by changing Manager.search() to call waitingSearch() instead of pollingSearch(). You will see something like this in the output window:

```
Thread "searcher" processed 7003027 items and found 78 matching results.
(29401)_Asterix
...
--EOF--
```
Submit Manager.java.
The search methods so far wait until all the results have been obtained before printing them. In this exercise, we will use a separate thread to print the results as they are being produced by the Searcher thread.

Write a class Printer in the search package as follows:

```java
package search;

import java.util.concurrent.LinkedBlockingQueue;

public class Printer extends Thread {
    LinkedBlockingQueue<String> in;
    int items;

    public Printer(LinkedBlockingQueue<String> in) {...}

    public void printStatus() {...}

    public void run() {...}
}
```
The class contains two fields:
- in, which is a queue of results;
- items, which is a count of the number of items printed by this thread.

Implement the constructor Printer, which initializes the two fields; a method run(), which reads strings from the queue (using in.take()) and prints them on the standard output; and a method printStatus(), which prints out the current status of the thread (similar to printStatus() in the Searcher class). The run() method should terminate when the string read from the queue is Searcher.EOF (remember to use the equals() method to compare strings); until then, it keeps waiting for new input. Note that the take() method of LinkedBlockingQueue may throw an InterruptedException if the current thread is interrupted while take is waiting for the next input, so the run method should catch this exception and return.
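The consumer loop described above can be sketched as follows. PrinterSketch is a hypothetical name; unlike the TD's Printer, it also records what it prints in a list so that the behaviour is easy to check after join().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class PrinterSketch extends Thread {
    static final String EOF = "--EOF--"; // same marker as Searcher.EOF

    private final LinkedBlockingQueue<String> in;
    private final List<String> printed = new ArrayList<>(); // inspect after join()
    private int items = 0;

    PrinterSketch(LinkedBlockingQueue<String> in) {
        this.in = in;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String s = in.take();  // blocks until an item is available
                if (s.equals(EOF)) {   // compare strings with equals(), not ==
                    return;
                }
                System.out.println(s);
                printed.add(s);
                items++;
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting in take(): simply stop.
        }
    }

    int getItems() { return items; }
    List<String> getPrinted() { return printed; }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        PrinterSketch p = new PrinterSketch(q);
        p.setName("printer");
        p.start();
        q.add("Asterix");
        q.add("Obelix");
        q.add(EOF);
        p.join();
        System.out.println("Thread \"" + p.getName() + "\" processed " + p.getItems() + " items.");
    }
}
```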
Write a new method pipelinedSearch in Manager that starts a Searcher thread s and a Printer thread p, constructing both with the same LinkedBlockingQueue. This method should wait until s and p have both terminated (using s.join() and p.join()) and finally call the printStatus() method of both threads.
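Connecting a producer and a consumer through one queue gives the pipeline that pipelinedSearch() builds. In the sketch below both stages are hypothetical lambda threads rather than the TD's Searcher and Printer: the producer filters the lines of an in-memory string, the consumer counts what it receives until EOF, and the method joins both before reporting. The AtomicInteger is only a convenient mutable holder that a lambda can capture.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PipelineDemo {
    static final String EOF = "--EOF--";

    // Runs a filtering stage and a counting stage concurrently; returns the count.
    static int pipelinedCount(String text, String query) throws InterruptedException {
        BufferedReader data = new BufferedReader(new StringReader(text));
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                String line;
                while ((line = data.readLine()) != null) {
                    if (line.contains(query)) queue.add(line);
                }
            } catch (IOException e) {
                // stop reading; EOF is still sent below
            }
            queue.add(EOF);
        });
        Thread consumer = new Thread(() -> {
            try {
                while (!queue.take().equals(EOF)) {
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // stop consuming
            }
        });

        producer.start();
        consumer.start();
        producer.join(); // wait for both stages before reporting
        consumer.join();
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        String titles = "Asterix\nParis\nAsterix_&_Obelix_XXL\nObelix\n";
        System.out.println(pipelinedCount(titles, "Asterix")); // prints 2
    }
}
```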
Test your code by running pipelinedSearch() instead of waitingSearch(). If your implementation is correct, you should not see any output after the output produced by s.printStatus() and p.printStatus():

```
...
Asterixvader
M2Asterix
Asterix&ObelixXXL.jpg
Asterix_Kieldrecht_logo.jpg
Utilisateur_Asterix
Thread "searcher" processed 7003027 items and found 78 matching results.
Thread "printer" processed 78 items.
```
Submit Printer.java.
Submit Manager.java.
interrupt, interrupted
When we want to wait for threads to finish their computation, we use the join() method as shown above. Sometimes we may want to stop a thread before it has finished running. To do this, we use the interrupt() method to set the interrupted status of the thread. Note that this does not actually stop the execution of the thread: the thread itself needs to call Thread.interrupted() to check whether its interrupted status has been set (this call also clears the status). If the thread is blocked in a method such as Thread.sleep(), join() or take(), that method throws an InterruptedException instead.
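The following sketch (illustrative names) shows both points: interrupt() only sets a flag, which a busy thread must check with Thread.interrupted(), and Thread.interrupted() clears the flag, so a second call returns false.

```java
class InterruptFlagDemo {
    // Returns the results of two consecutive Thread.interrupted() calls
    // made right after the current thread interrupts itself.
    static boolean[] interruptedTwice() {
        Thread.currentThread().interrupt();    // set our own interrupted status
        boolean first = Thread.interrupted();  // true, and clears the status
        boolean second = Thread.interrupted(); // false: already cleared
        return new boolean[] {first, second};
    }

    // Starts a busy worker that never blocks, interrupts it, and waits for it.
    // Returns true if the worker noticed the interruption and stopped.
    static boolean busyWorkerStops() throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.interrupted()) { // the worker must check the flag itself
                // busy work would go here
            }
        });
        worker.start();
        worker.interrupt(); // only sets the flag; it does not stop the thread by force
        worker.join(5_000); // wait at most 5 s
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = interruptedTwice();
        System.out.println(r[0] + " " + r[1]); // prints true false
        System.out.println(busyWorkerStops()); // prints true
    }
}
```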
Our goal in this exercise is to terminate all threads once the number of results reaches the desired value num. For example, if the user asks for only 10 results, both the Searcher and the Printer are terminated after 10 matching titles have been printed.
Modify the Printer class to add an integer field num, and add a new constructor that takes the initial value of num as an additional parameter. Modify run so that it terminates (returns) once items >= num.
Write a method interruptingSearch in Manager that starts one Searcher thread s and one Printer thread p. It waits for the Printer thread to terminate using p.join(), and then terminates the thread s using s.interrupt(). After calling interrupt(), it must wait (using s.join()) for the thread to terminate.
You must also modify the run() method in Searcher to check the interrupted status and terminate if it is set. In the main loop of run(), call Thread.interrupted(): if this returns true, the run method immediately returns, hence terminating the Searcher thread. In addition, if the thread is interrupted during a call to readLine(), then, depending on the underlying stream, readLine() may throw an InterruptedIOException (a subclass of IOException), which your existing IOException handler will catch.
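Putting these pieces together, here is a sketch of an interruptible producer and an interruptingSearch-style shutdown sequence. The producer reads from a hypothetical endless source (generated strings rather than a BufferedReader, so it never runs out of input) and checks Thread.interrupted() once per item; the main thread plays the role of the Printer, takes num results, then interrupts the producer and joins it.

```java
import java.util.concurrent.LinkedBlockingQueue;

class InterruptibleSearcher extends Thread {
    private final LinkedBlockingQueue<String> result;
    private int processedItems = 0;

    InterruptibleSearcher(LinkedBlockingQueue<String> result) {
        this.result = result;
    }

    @Override
    public void run() {
        // Stand-in for an endless input: "title0", "title1", ...
        while (true) {
            if (Thread.interrupted()) { // checked once per item, as in Searcher.run()
                return;
            }
            String line = "title" + processedItems;
            processedItems++;
            if (line.endsWith("0")) {   // stand-in for the query match
                result.add(line);
            }
        }
    }

    // Waits for num results, then interrupts the producer and joins it,
    // mirroring interruptingSearch(). Returns true if the producer stopped.
    static boolean takeThenInterrupt(int num) throws InterruptedException {
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
        InterruptibleSearcher s = new InterruptibleSearcher(result);
        s.start();
        for (int i = 0; i < num; i++) {
            result.take(); // plays the role of the Printer
        }
        s.interrupt();     // ask the producer to stop...
        s.join();          // ...and wait until it actually has
        return !s.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeThenInterrupt(10)); // prints true
    }
}
```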
Hence, we have programmed two different ways of terminating threads. The printer thread stops when an internal condition (set by its constructor) is satisfied or when it reads Searcher.EOF from its input queue, whereas the searcher thread responds to an external interrupt. Test your code by changing Manager.search() to call interruptingSearch(), and by changing the number of desired results, say to 10. You will see a result like:

```
...
Asterix_(satellite)
Asterix_Kieldrecht
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Asterix_aux_Jeux_Olympiques_(film)
Thread "searcher" processed 241798 items and found 39 matching results.
Thread "printer" processed 10 items.
```
Submit Printer.java.
Submit Manager.java.
Submit Searcher.java.
Write a method concurrentSearch() that creates two Searcher threads with the same arguments. They both read data from the same BufferedReader and produce results in the same LinkedBlockingQueue. Your method should then create a Printer thread to print the search results, start all three threads, wait for the printer to terminate, and then interrupt both searcher threads and wait for them to terminate. Finally, print all their statuses.
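A sketch of the concurrentSearch() wiring follows: two producers share one BufferedReader and one queue, a consumer stops after num items, and the producers are then interrupted and joined. The threads are hypothetical lambda stand-ins for Searcher and Printer, and sampleTitles is an illustrative helper that generates in-memory input with far more matches than num.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentSearchDemo {
    static final String EOF = "--EOF--";

    // A producer that shares data and result with other producers.
    static Thread searcher(BufferedReader data, String query,
                           LinkedBlockingQueue<String> result) {
        return new Thread(() -> {
            try {
                String line;
                // BufferedReader serializes concurrent readLine() calls,
                // so each line is read by exactly one producer.
                while (!Thread.interrupted() && (line = data.readLine()) != null) {
                    if (line.contains(query)) result.add(line);
                }
            } catch (IOException e) {
                // stop reading
            }
            result.add(EOF);
        });
    }

    // Returns how many results the consumer took (at most num).
    static int concurrentSearch(String text, String query, int num) throws InterruptedException {
        BufferedReader data = new BufferedReader(new StringReader(text));
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
        AtomicInteger taken = new AtomicInteger();
        Thread s1 = searcher(data, query, result);
        Thread s2 = searcher(data, query, result);
        Thread printer = new Thread(() -> {
            try {
                while (taken.get() < num && !result.take().equals(EOF)) {
                    taken.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // stop
            }
        });
        s1.start();
        s2.start();
        printer.start();
        printer.join();  // wait for the consumer first
        s1.interrupt();
        s2.interrupt();
        s1.join();       // then make sure both producers have stopped
        s2.join();
        return taken.get();
    }

    // n lines, every other one containing "Asterix".
    static String sampleTitles(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(i % 2 == 0 ? "Asterix_" : "Paris_").append(i).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(concurrentSearch(sampleTitles(10_000), "Asterix", 10)); // prints 10
    }
}
```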
Test your code by changing Manager.search() to call concurrentSearch() (again limiting the number of desired results to 10). You will see a result like:

```
...
Asterix_(satellite)
Asterix_and_The_Great_Rescue
Asterix_aux_Jeux_Olympiques
Asterix_aux_Jeux_Olympiques_(film)
Asterix_chez_Rahazade
Thread "searcher1" processed 131081 items and found 19 matching results.
Thread "searcher2" processed 110214 items and found 20 matching results.
Thread "printer" processed 10 items.
```
Submit Manager.java.
We will now use the various thread synchronization primitives we have learned above to implement a multi-threaded application based on the Map-Reduce pattern, which we will revisit in much more detail later in the course and in a future TD. The application starts n “Map” threads that transform input data into intermediate results, and one “Reduce” thread that transforms the intermediate results into the final result.

The goal of the application is to count the number of occurrences (frequency) of a list of strings in our Wikipedia title dataset. For example, given the list [Asterix, Obelix], we would like to compute the number of titles that include the string "Asterix" and the number of titles that include "Obelix".
Write a class MapFrequency, similar to Searcher, that extends Thread and, instead of a single query string, accepts an array of query strings queries:

```java
package search;

import java.io.BufferedReader;
import java.util.concurrent.LinkedBlockingQueue;

public class MapFrequency extends Thread {
    private BufferedReader data;
    private String[] queries;
    private LinkedBlockingQueue<String> result;
    private int processedItems;
    private int matchesFound;

    public MapFrequency(BufferedReader data, String[] queries,
                        LinkedBlockingQueue<String> result) {
        // your code
    }

    public void printStatus() {
        // your code
    }

    @Override
    public void run() {
        // your code
    }
}
```
The run() method reads a line from data and matches it against each string in queries; whenever it finds a match, it adds the matched query string to result. So, for example, if queries is the array {"Asterix","Obelix"}, then:
- for the title "Notre_dame_de_Paris", nothing is inserted into result;
- for "Asterix_et_la_surprise_de_Cesar", only the string "Asterix" is inserted into result;
- for "Comment_Obelix_est_tombe_dans_la_marmite_du_druide_quand_il_etait_petit", only the string "Obelix" is inserted into result;
- for "Asterix_et_Obelix_contre_Cesar", both the string "Asterix" and the string "Obelix" are inserted into result.

Write a class ReduceFrequency, similar to Printer, that extends Thread but, instead of printing the input strings, counts the number of occurrences of each input string using a Map (implemented, for example, with a HashMap initialized in the constructor).
```java
package search;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class ReduceFrequency extends Thread {
    private LinkedBlockingQueue<String> in;
    public Map<String,Integer> count;
    private int items;

    public ReduceFrequency(LinkedBlockingQueue<String> in) {
        // your code
    }

    public void printStatus() {
        // your code
    }

    @Override
    public void run() {
        // your code
    }
}
```
In the run() method, read a string from in and add 1 to its corresponding entry in count; if the string does not occur in count yet, add a new entry initialized to 1.
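The per-item logic of the two stages is small. Here is a sketch with hypothetical helper names (mapLine for the MapFrequency step, reduceAll for the ReduceFrequency step), run sequentially on the four example titles above. Map.merge(key, 1, Integer::sum) performs the "add 1, or insert 1 if absent" update in a single call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapReduceStepsDemo {
    // Map step: the query strings that occur in this line, in query order.
    static List<String> mapLine(String line, String[] queries) {
        List<String> matched = new ArrayList<>();
        for (String q : queries) {
            if (line.indexOf(q) >= 0) {
                matched.add(q);
            }
        }
        return matched;
    }

    // Reduce step: count how many times each string occurs.
    static Map<String, Integer> reduceAll(List<String> items) {
        Map<String, Integer> count = new HashMap<>();
        for (String s : items) {
            count.merge(s, 1, Integer::sum); // new key -> 1, else old value + 1
        }
        return count;
    }

    static Map<String, Integer> demo() {
        String[] queries = {"Asterix", "Obelix"};
        String[] titles = {
            "Notre_dame_de_Paris",
            "Asterix_et_la_surprise_de_Cesar",
            "Comment_Obelix_est_tombe_dans_la_marmite_du_druide_quand_il_etait_petit",
            "Asterix_et_Obelix_contre_Cesar"
        };
        List<String> intermediate = new ArrayList<>();
        for (String t : titles) {
            intermediate.addAll(mapLine(t, queries));
        }
        return reduceAll(intermediate);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Asterix and Obelix are both counted twice
    }
}
```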
Write a method count() in the Manager class:

```java
public static Map<String,Integer> count(BufferedReader data, String[] queries) {
    ...
}
```
This method should create two MapFrequency threads and a single ReduceFrequency thread, suitably pipelined. When both MapFrequency threads have terminated, and all their results have been processed by ReduceFrequency, it returns the count map from the ReduceFrequency thread.
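One possible wiring of count() is sketched below, under several assumptions: the names, the extra nMappers parameter and the lambda threads are illustrative, and, to sidestep the EOF race condition discussed at the end of this TD, the reducer here waits for one EOF per mapper instead of stopping at the first one. The number of mappers can be taken from Runtime.getRuntime().availableProcessors().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

class CountDemo {
    static final String EOF = "--EOF--";

    static Map<String, Integer> count(BufferedReader data, String[] queries, int nMappers)
            throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, Integer> count = new HashMap<>(); // only touched by the reducer

        List<Thread> mappers = new ArrayList<>();
        for (int i = 0; i < nMappers; i++) {
            mappers.add(new Thread(() -> {
                try {
                    String line;
                    while ((line = data.readLine()) != null) {
                        for (String q : queries) {
                            if (line.contains(q)) queue.add(q);
                        }
                    }
                } catch (IOException e) {
                    // stop reading
                }
                queue.add(EOF); // one EOF per mapper
            }));
        }
        Thread reducer = new Thread(() -> {
            int eofsSeen = 0;
            try {
                while (eofsSeen < nMappers) { // stop only after every mapper is done
                    String s = queue.take();
                    if (s.equals(EOF)) eofsSeen++;
                    else count.merge(s, 1, Integer::sum);
                }
            } catch (InterruptedException e) {
                // stop reducing
            }
        });

        for (Thread m : mappers) m.start();
        reducer.start();
        for (Thread m : mappers) m.join();
        reducer.join(); // after this, count is complete and visible to this thread
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        String titles = "Asterix\nObelix\nAsterix_et_Obelix_contre_Cesar\nParis\n";
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println(count(new BufferedReader(new StringReader(titles)),
                new String[] {"Asterix", "Obelix"}, n)); // Asterix and Obelix both 2
    }
}
```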
To test this, modify the main() method in Test by uncommenting the code for the Map-Reduce application and commenting out the rest. You should see output like this:

```
Thread "map1" processed 3482055 items and found 58 matching results.
Thread "map2" processed 3520972 items and found 40 matching results.
Thread "reduce" processed 98 items.
"Obelix" occurs 20 time(s)
"Asterix" occurs 78 time(s)
```
The per-thread numbers may differ depending on how your program processes titles that match more than one query (and on how the input happens to be split between the two Map threads), but the frequency counts should remain the same.
To make MapFrequency more efficient, you may consider increasing the number of MapFrequency threads to use all the processors on your machine (see java.lang.Runtime.getRuntime().availableProcessors()). You may also consider compiling the set of queries into a Pattern (see java.util.regex.Pattern, especially the methods compile() and matcher()).
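A sketch of the Pattern suggestion, with illustrative class and method names: the queries are quoted with Pattern.quote (so characters such as ( or . match literally) and joined into one alternation, and Matcher.find() is called repeatedly to collect the queries that occur in a line. Two caveats: a LinkedHashSet reports a query that occurs twice in one title only once, and if one query is a substring of another (say "Aster" and "Asterix"), the alternation reports only one of them at a given position, so the simple loop over queries remains the reference behaviour. A compiled Pattern is immutable and can be shared between MapFrequency threads, whereas each Matcher should be used by a single thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PatternMatchDemo {
    // Compiles the queries into a single pattern, e.g. \QAsterix\E|\QObelix\E
    static Pattern compileQueries(String[] queries) {
        List<String> quoted = new ArrayList<>();
        for (String q : queries) {
            quoted.add(Pattern.quote(q)); // match each query literally
        }
        return Pattern.compile(String.join("|", quoted));
    }

    // Returns the distinct queries found in line, in order of first occurrence.
    static Set<String> matchesIn(Pattern p, String line) {
        Set<String> found = new LinkedHashSet<>();
        Matcher m = p.matcher(line); // one Matcher per use; not thread-safe
        while (m.find()) {
            found.add(m.group());    // the matched text is the query itself
        }
        return found;
    }

    public static void main(String[] args) {
        Pattern p = compileQueries(new String[] {"Asterix", "Obelix"});
        System.out.println(matchesIn(p, "Asterix_et_Obelix_contre_Cesar")); // [Asterix, Obelix]
        System.out.println(matchesIn(p, "Notre_dame_de_Paris"));            // []
    }
}
```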
Submit MapFrequency.java.
Submit ReduceFrequency.java.
Submit Manager.java.
- When we use several Searcher or MapFrequency threads, each of them inserts its own Searcher.EOF into the result LinkedBlockingQueue shared between all these threads. Hence, the corresponding Printer or ReduceFrequency thread will terminate on seeing the first Searcher.EOF. This leaves the possibility of a race condition on the last output: in some cases, one thread may output Searcher.EOF before another outputs the last result string. Fix this race condition by modifying Searcher and MapFrequency so that only the thread that reads the last line from the input BufferedReader outputs Searcher.EOF; all others silently terminate when the BufferedReader is empty (that is, when readLine returns null or throws an exception).
- The MapFrequency threads above share the same BufferedReader. However, the BufferedReader class has not been optimized for this usage. A more efficient design would be to implement a new Reader thread that reads from the BufferedReader and outputs each line as a string to a LinkedBlockingQueue (which is designed to be used by multiple threads). Each MapFrequency thread would then read from this queue instead of reading directly from the BufferedReader. Hence, we would have one Reader thread, multiple MapFrequency threads, and one ReduceFrequency thread, all running concurrently.
- The application above uses a single ReduceFrequency thread. Can you think of what you would need to change to use more than one ReduceFrequency thread? For example, the result HashMap would then need to be safely shared between these threads.
- What happens if you create more Threads than the available processors in your computer?

The early part of this TD uses some concurrent programming patterns that are not safe in general and may lead to unpredictable behavior. For example, in the first two questions we call the printStatus method on a running thread, and this may cause a race condition on the fields of the thread (e.g. matchesFound) when they are read and updated concurrently. In general, threads should only communicate via safe mechanisms like join and interrupt, and exchange messages through thread-safe data structures such as LinkedBlockingQueue. In the next TD, we shall learn how to implement our own concurrent data structures using various synchronization mechanisms.
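For the EOF race condition described above, here is a sketch of one variant of the suggested fix. Rather than trying to detect which thread read the last line, the producers share a counter of running producers (a java.util.concurrent.atomic.AtomicInteger, which anticipates the synchronization tools of the next TD), and only the producer that decrements it to zero, that is the last one to finish all its add() calls, emits EOF. All names here are illustrative.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LastProducerEof {
    static final String EOF = "--EOF--";

    // A producer that emits EOF only if it is the last running producer.
    static Thread producer(BufferedReader data, String query,
                           LinkedBlockingQueue<String> result, AtomicInteger running) {
        return new Thread(() -> {
            try {
                String line;
                while ((line = data.readLine()) != null) {
                    if (line.contains(query)) result.add(line);
                }
            } catch (IOException e) {
                // stop reading
            }
            // All of this producer's results have been added above.
            // decrementAndGet() returns 0 for exactly one producer: the last one.
            if (running.decrementAndGet() == 0) {
                result.add(EOF);
            }
        });
    }

    // Runs n producers and counts the results received before the single EOF.
    static int countBeforeEof(String text, String query, int n) throws InterruptedException {
        BufferedReader data = new BufferedReader(new StringReader(text));
        LinkedBlockingQueue<String> result = new LinkedBlockingQueue<>();
        AtomicInteger running = new AtomicInteger(n); // set before any producer starts
        for (int i = 0; i < n; i++) {
            producer(data, query, result, running).start();
        }
        int seen = 0;
        while (!result.take().equals(EOF)) {
            seen++;
        }
        return seen;
    }

    // n lines, one in three containing "Asterix".
    static String sampleTitles(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(i % 3 == 0 ? "Asterix_" : "Paris_").append(i).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countBeforeEof(sampleTitles(1000), "Asterix", 4)); // prints 334
    }
}
```

Because every producer's add() calls precede its decrement, no result can arrive after the single EOF, so the consumer always sees all 334 matches.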