BlockingQueue example:
a background logger thread
A simple use of a BlockingQueue is where we want one or more threads
to pass jobs to some "background" or "processing" thread. The processing thread will sit
waiting for jobs and execute them one at a time. On a server, for example, we might want
to perform "lazy logging":
- we want a busy thread to be able to add a string to the queue of "things to be logged";
- at moments when the server is less busy, a logger thread will then
pick strings off the queue and actually log them to the console (or disk or "place where things
are logged"...).
With a BlockingQueue, the task becomes simple. The logger thread holds a
queue instance, and its run() method affectively sits waiting for things to log
until it is told to shut down. Then from other threads, we call a method to add an item
to the queue. The BlockingQueue implementation will handle thread-safety and
the actual notification to the logger thread when an item is added to the queue.
Creating the thread
Our logger will run in its own thread and so our logger class will be an extension of Thread.
We won't dwell too much on thread creation in Java, which is
covered in a separate section of this web site. However, we will mention that we want our
logger to be a singleton: there'll be a maximum of one static instance of it.
So the outer shell of the class looks as follows:
public class LoggerThread extends Thread {
private static final LoggerThread instance = new LoggerThread();
public static LoggerThread getLogger() {
return instance;
}
private LoggerThread() {
start();
}
}
With this pattern, all accesses to the logger thread must be made via the
getLogger() method. And the first caller to getLogger() will actually
cause the logger thread to be started.
Inside the LoggerThread constructor, we could consider setting options such
as the thread name and thread priority though, as discussed in the latter
article, thread priority actually means different things on different systems.
Constructing the queue
First, we need to create our queue.
There are two flavours of "normal" blocking queue: a LinkedBlockingQueue, which uses a
linked list as its internal structure and thus has no fixed capacity,
and an ArrayBlockingQueue, which uses a fixed array
and thus has a maximum capacity. In this case, we go for a fixed capacity, but that's just
an arbitrary design decision. We'll create a queue with space for up to 100 strings:
public class LoggerThread extends Thread {
private BlockingQueue<String> itemsToLog =
new ArrayBlockingQueue<String>(100);
...
}
In this case, if the queue filled up
quicker than the logger thread could process the strings, then further threads trying to add strings to the
queue would either hang until the logger caught up, or just drop the surplus strings. Which
behaviour is adopted depends on which method we decide to call when adding a string to the queue, and is
thus a design decision we have to make. Arguably, ArrayBlockingQueue is also slightly
more efficient in terms of object overhead, though that's not such a great concern here: we won't be
logging so many things per second!
Notice that the queue, like much of the collections framework, is parametrised:
thanks to Java 5 generics, we can declare it as a queue of Strings and from
then on avoid ugly casts.
Notice that, like blocking methods in Java in general, the take() method could be
interrupted. If that ever happened, we just let
the interruption cause the thread's run method to terminate.
Pulling items off the queue
Now we need a run() method with a loop that continually takes the next item off the queue
and logs it. We need to solve two main issues:
- when there's no item on the queue, we need wo wait for one to appear;
- we need a mechanism for shutting down the logger.
First, the problem of waiting. Recall that in a non-blocking queue,
the possibilities would have been remove() or poll(). But these methods return immediately (either
with an item if there's one on the queue, or else a null return value or exception if the queue is empty).
To get round this, the BlockingQueue provides a take() method.
This method will wait for an item to appear if there is none on the queue. It then
returns the item at the head of the queue.
At some point, we assume that the logger will be shut down "cleanly" in response to the user quitting the
application. At that moment, we want the logger to log all pending strings on the queue before shutting down.
One way to handle this is to post a special object to the queue that is a signal for the
logger thread to shut down. Our run() method then looks as follows:
public class LoggerThread extends Thread {
private static final String SHUTDOWN_REQ = "SHUTDOWN";
private volatile boolean shuttingDown, loggerTerminated;
...
// Sit in a loop, pulling strings off the queue and logging
public void run() {
try {
String item;
while ((item = itemsToLog.take()) != SHUTDOWN_REQ) {
System.out.println(item);
}
} catch (InterruptedException iex) {
} finally {
loggerTerminated = true;
}
}
}
So in each iteration of the while loop, we wait for and take the next item on the queue.
If that's the special "shutdown request" string, then we exit the while loop and let the
run() method exit (thus terminating the logger thread). Otherwise, we print the string.
When the logger eventually does terminate, we set a flag to say so, which we'll come back to in a moment.
(See the separate section for information on why this flag is declared as a volatile variable.)
Adding items to the queue
Now we need to write our log() method, which we can call from any thread to add
a string to the queue for subsequent logging.
The method we use on the blocking queue for this is put(). This adds an item immediately to the queue
if it has space, else waits for space to become available:
public void log(String str) {
if (shuttingDown || loggerTerminated) return;
try {
itemsToLog.put(str);
} catch (InterruptedException iex) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unexpected interruption");
}
}
When we add an item to the queue, the logic of BlockingQueue will automatically
handle "waking up" the logger thread (or any thread waiting for a job on the queue). This means
that some time in the future, when the logger thread is scheduled
in, it will pick up the item that we added to the queue (and if already running, say, on
another processor, it is likely to pick it up immediately).
Again, because it's a blocking operation, we must handle the possibility of put()
being interrupted. We could declare our log() method to throw InterruptionException.
But requiring every log operation to handle this exception makes for a slightly messy API for what
will be an unlikely scenario in practice. So we just re-cast the exception as a RuntimeException
(which we don't have to explicitly declare and which the caller doesn't have to explicitly catch),
remembering to re-mark
the current thread as interrupted, in case the handler that eventually catches the
RuntimeException wants to know this. (For more details on why this is good practice, see the section on
thread interruption.)
Notice the first line of the method, where we check if the logger has shut down:
if it has, then nothing is going to pull jobs off the queue, and the put() method would
block forever if the queue was empty.
Shutting down the queue
We've really already dealt with this issue: we saw that our run() method
waits for a special "shutdown" notification to be posted to the queue. So we just need our shutdown
method to post this object to the queue:
public void shutDown() throws InterruptedException {
shuttingDown = true;
itemsToLog.put(SHUTDOWN_REQ);
}
We also set a "shutting down" flag, checked from the log() method,
so that more messages won't be posted to the queue from this moment onwards.
In the design we've chosen, we assume that there'll be a well-defined "shutdown routine" in our
application which will call the logger's shutDown() method, presumably towards the end
of the shutdown process. Other things we could think about in a more sophisticated implementation
include:
- what happens if a string is posted to the queue after a shutdown request
has been issued? (in our implementation, such strings will effectively be ignored)
- what if the application is shut down at various arbitrary moments?: we could consider
using a shutdown hook, and/or making the logger thread a daemon thread.
If you enjoy this Java programming article, please share with friends and colleagues. Follow the author on Twitter for the latest news and rants.
Editorial page content written by Neil Coffey. Copyright © Javamex UK 2021. All rights reserved.