概述
基于Thread类和Runnable接口编程很容易实现基本的并行编程任务,但实现复杂的并行程序就会比较困难,因为要详细考虑资源的同步访问以及设计必要数据结构支持同步访问。从Java5后,Java平台提供了java.util.concurrent包以及HighLevelAPI简化并行编程模型,并提供了很多支持同步访问数据结构满足编程需要。具体来讲,该HighLevelAPI提供了以下内容,在后续段落我将分别介绍。
Lock(锁对象):相对与Thread模型的隐式的锁对象,Lock提供了显式的锁操作从而简化应用程序。
Executors:提供了一组HighLevelAPI用来执行和管理并行任务。
ConcurrentCollections(并行集合):包含了一组支持并行处理的数据结构,大大简化了并行编程难度。
AtomicVariables(原子变量):减少了同步操作并且避免数据不一致问题。
Fork/Join框架:提供了进程操作的支持。
Lock(锁对象)
Lock在java.util.concurrent.locks包里面,是一个用来同步对共享资源的访问的工具,其实际效果和Synchronize 比较像。Synchronize用起来很容易,但是Synchroize也有一些限制:当Synchronize以一定顺序获得多个锁的时候,必须以相反的顺序释放锁,并且必须在与获得锁相同的代码scope中释放锁。Lock则完全克服了上述限制:能够以任意的顺序获得和释放所,而且不需要收到代码 scope的限制。使用Lock也很简单,通常会使用以下代码使用Lock:
方式一:
Lockl=…;
l.lock();
try{
//访问共享资源
}finally{
l.unlock()
}
方式二:
Lockl=…;
if(l.tryLock()){
try{
//访问共享资源
finally{
l.unlock();
}
else{
//没能够获得锁,做其它事情
}
ConcurrentCollections(并行集合):
java.util.concurrent包还提供了一系列的数据结构,使得并行程序的开发人员可以更少地关注同步操作。
BlockingQueue是一个支持先入先出(First-in-first-out,FIFO)的数据结构,对于队列的push和poll等操作都采取阻塞的方式保证只有在获得资源锁的时候才可以执行。
ConcurrentMap是一个跟Map类似的存储key/value映射的数据结构,该数据结构保证了Map操作的原子性。
ConcurrentNavigableMap是一个跟TreeMap类似的数据结构,允许基于key进行树状遍历。
下面给出BlockingQueue的基本用法:
BlockingQueuequeue=newSynchronousQueue();
queue.put(“helloworld”);
queue.poll();//返回helloworld
AtomicVariables(原子变量):
java.util.concurrent.atomic包提供了一些数据结构用来支持对变量的原子操作,这些数据结构都必须支持get和set方法从而支持对变量的读写。下面是一个例子来证明如何使用AtomicInteger的。
importjava.util.concurrent.atomic.AtomicInteger;
classAtomicCounter{
privateAtomicIntegerc=newAtomicInteger(0);
publicvoidincrement(){
c.incrementAndGet();
}
publicvoiddecrement(){
c.decrementAndGet();
}
publicintvalue(){
returnc.get();
}
}
Fork/Join框架
Fork/Join是自JDK7以后才有的对ExecutorService接口的一个实现,用来支持在多核处理器上进行多进程操作。该框架适合于那些能够被递归地分解成更小任务的任务。该框架最牛的地方在于采用了work-stealing算法,也就是空闲的worker可以从繁忙的worker那里偷(steal)任务过来执行,从而充分利用计算资源使得任务执行更快。
Fork/Join的核心是ForkJoinPool这个类,它是AbstractExecutorService的子类并实现了work-stealing算法,其被用来执行ForkJoinTask.
ForkJoinTask实现类有RecursiveTask核ResursiveAction,通常实现的逻辑为:
if(当前任务足够小)
直接执行任务
else
将当前任务分为更小的两个任务
执行这两个更小的任务并等待结果
下面的代码示意对Fork/Join框架的使用:
public class MyListPrinter extends RecursiveAction {
private List task;
public MyListPrinter(List list)
task = list;
}
protected void printList() {
// 遍历并打印task中的元素
}
protected void compute() {
if (task.length() <= 1000) {
printList();
} else{
List first_half_list = ……
List second_half_list = ……
MyListPrinter printer1 = new MyListPrinter(first_half_list);
MyListPrinter printer2 = new MyListPrinter(second_half_list);
invokeAll(printer1, printer2);
}
}
}
MyListPrinterprinter=newMyListPrinter(aHugeList);
ForkJoinPoolpool=newForkJoinPool();
pool.invoke(printer);