Implementing thread pools with Java
Introduction
Doug Lea is a distinguished professor who, besides his many other professional achievements, can claim to have contributed an entire package of utilities to the official Java distribution as of version 1.5 (or 5.0): the java.util.concurrent package.
Frustrated (like many others!) by the limitations and inflexibility of the Java language constructs for concurrent programming, he rolled up his sleeves instead of complaining and wrote an entire library of classes to handle concurrency, comprising a wide set of utilities such as:
- many lock implementations;
- barriers;
- atomic variables;
- thread-safe (concurrent) collections;
- ...
Engineers at Sun admittedly learned the lesson and decided to include these classes in the standard distribution, with some slight modifications: generics (so they found a place to use them), name changes and so on.
I will not cover the whole story, which you can easily read elsewhere; instead, I will try to show you some practical uses of these classes.
A simple thread pool
Implementing a thread pool by hand is, at best, a true nightmare! It is not enough to create a Thread and start it: there is a whole lot of checks, synchronization and error handling to write, and unless you have Mr. Lea's experience the result will turn out to be buggy and will block your application. Granted.
We'll see a very practical example: an RSS feed aggregator. Here are the specifications:
- class name: FeedAggregator;
- a single (synchronous) method, aggregateFeeds(), which returns a list of strings, i.e. the results of the RSS queries;
- capable of running n tasks at once, with n specified by configuration;
- concurrent calls must not saturate the JVM with threads; instead, at most n threads can be active simultaneously.
We could use the java.util.concurrent.ThreadPoolExecutor class directly, but there is an easier way: java.util.concurrent.Executors. In fact, the method Executors.newFixedThreadPool(int nThreads) is all we need to create such a fixed pool of (at most) n threads.
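For comparison, here is a minimal sketch of the two ways of getting a fixed pool. The explicit constructor arguments mirror what Executors.newFixedThreadPool(int) sets up in OpenJDK (core size equal to maximum size, an unbounded LinkedBlockingQueue); other JDKs may differ in the details. The class name PoolFactorySketch and its method names are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the article's FeedAggregator.
class PoolFactorySketch {

    // The easy way: let the Executors factory choose the settings.
    static ExecutorService viaFactory(int nThreads) {
        return Executors.newFixedThreadPool(nThreads);
    }

    // The explicit way: core size == maximum size, a keep-alive of 0,
    // and an unbounded queue holding the tasks that wait for a free thread.
    static ThreadPoolExecutor viaConstructor(int nThreads) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = viaConstructor(5);
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

The factory method is shorter and harder to get wrong; the constructor is worth the extra typing only when you need a bounded queue, a custom rejection policy or similar tuning.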
Since every task must return a result (a string with the retrieved data), we implement java.util.concurrent.Callable instead of java.lang.Runnable. We then pass our task to the ExecutorService.submit() method, which gives us back a java.util.concurrent.Future object. This object is the final synchronization point, because a call to its get() method blocks until the task has completed. This is similar to what Thread.join() does, but it is not tied to the Thread implementation and is far more flexible.
So, here's the implementation of our RSS feed aggregator:
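Before the full listing, the Callable/Future handshake can be sketched in isolation. This is only an illustration: the class FutureSketch, the method fetchOnce() and the "feed data" string are invented for the example, and the timed get(long, TimeUnit) overload is shown as one instance of the extra flexibility compared to Thread.join().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's FeedAggregator.
class FutureSketch {

    // Submits one Callable and blocks on its Future until the result is ready.
    static String fetchOnce(ExecutorService pool) throws Exception {
        Callable<String> task = new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(100); // stands in for a slow network call
                return "feed data";
            }
        };
        Future<String> future = pool.submit(task); // returns immediately
        // get() blocks the caller; this overload gives up after 2 seconds
        // by throwing a TimeoutException.
        return future.get(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            System.out.println(fetchOnce(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

If call() throws, the exception does not vanish: get() rethrows it wrapped in an ExecutionException, so the caller decides how to handle it.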
    package it.megadix.concurrent;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class FeedAggregator {
        private ExecutorService executorService;

        public FeedAggregator(int threadPoolSize) {
            executorService = Executors.newFixedThreadPool(threadPoolSize);
        }

        public List<String> aggregateFeeds() throws Exception {
            List<String> result = new ArrayList<String>();
            List<Future<String>> futures = new ArrayList<Future<String>>();
            // submit all the tasks first, so that they can run in parallel
            for (int i = 0; i < 20; i++) {
                futures.add(executorService.submit(new LongRunningTask(i)));
            }
            // then collect the results: get() blocks until each task is done
            for (Future<String> future : futures) {
                result.add(future.get());
            }
            return result;
        }

        private void destroy() {
            // lets queued tasks finish, then releases the pool threads
            executorService.shutdown();
        }

        public static void main(String[] args) {
            final FeedAggregator ag = new FeedAggregator(5);
            Thread th1, th2, th3;
            try {
                // call aggregateFeeds() from several threads at the same time
                Runnable r = new Runnable() {
                    public void run() {
                        try {
                            System.out.println(ag.aggregateFeeds());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                th1 = new Thread(r);
                th2 = new Thread(r);
                th3 = new Thread(r);
                th1.start();
                th2.start();
                th3.start();
                th1.join();
                th2.join();
                th3.join();
            } catch (Exception e) {
                e.printStackTrace();
            }
            ag.destroy();
        }
    }

    class LongRunningTask implements Callable<String> {
        private int id;

        public LongRunningTask(int id) {
            this.id = id;
        }

        public String call() throws Exception {
            System.out.println("[" + id + "] Start...");
            // StringBuilder is enough here: the buffer never leaves this thread
            StringBuilder sb = new StringBuilder();
            sb.append("Feed [");
            sb.append(id);
            sb.append("] Bla bla bla");
            try {
                // simulate a long-running operation (up to 5 seconds)
                Thread.sleep((long) (Math.random() * 5000));
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of silently swallowing it
                Thread.currentThread().interrupt();
            }
            System.out.println("RESULT:\n[" + id + "] ... finish!");
            return sb.toString();
        }
    }
Things to notice:
- the FeedAggregator() constructor takes a parameter specifying how many threads the pool will handle;
- long operations are simulated with Thread.sleep();
- concurrent calls never produce more than the n specified threads;
- the code inside main uses old-style thread handling, for didactic purposes only!
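The claim that the pool never runs more than n tasks at once can be checked with a small experiment. The sketch below is an assumption-laden illustration, not part of the aggregator: the class PoolLimitSketch and the method peakConcurrency() are invented names, and the 50 ms sleep merely stands in for real work. It counts how many tasks are running at the same moment and remembers the highest value seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's FeedAggregator.
class PoolLimitSketch {

    // Runs `tasks` short jobs on a pool of `poolSize` threads and returns
    // the highest number of jobs that were ever running at the same moment.
    static int peakConcurrency(int poolSize, int tasks) throws Exception {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(new Callable<Integer>() {
                    public Integer call() throws Exception {
                        int now = running.incrementAndGet();
                        // Record a new maximum with a compare-and-set loop.
                        int seen = peak.get();
                        while (now > seen && !peak.compareAndSet(seen, now)) {
                            seen = peak.get();
                        }
                        Thread.sleep(50); // simulated work
                        running.decrementAndGet();
                        return now;
                    }
                }));
            }
            for (Future<Integer> f : futures) {
                f.get(); // wait for every job to finish
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak = " + peakConcurrency(5, 20));
    }
}
```

With a pool of 5 threads and 20 tasks, the printed peak should never exceed 5, however many tasks are queued.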
