疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java学习笔记并行API


 

 
使用Thread建立多线程程序,必须亲自处理synchronized,对象锁定,wait方法,notify方法,notifyAll方法等细节。如果需要的是线程池,读写锁等高级操作,从JDK5之后提供了Java.util.concurrent包,可基于其中的API建立更稳固的并行应用程序。
 
对于线程池的总结,在后面我会补上,如今我就先总结一下读写锁。
Lock,ReadWriteLock与Condition
 
synchronized要求线程必须取得对象锁定,才执行所标示的区块范围,然而使用synchronized有许多的限制,未取得锁定的线程会直接被阻断,如果我们希望线程能够尝试取得锁定,无法取得锁定时就先做其他事情,直接使用synchronized就会很麻烦。此时java.util.concurrent.locks包中提供的Lock,ReadWriteLcok,condition接口以及操作类就会派上用场。
使用Lock
 
Lock接口的主要操作类是ReentrantLock,可以达到synchronized的作用,我们来看看怎么使用它改写ArrayList为具线程安全的类。
import java.util.Arrays;
import java.util.concurrent.locks.*;
 
public class ArrayList <E>{
    private Lock lock = new ReentrantLock();       //使用ReentrantLock
    private Object[] elems;
    private int next = 0;
 
    public ArrayList(int capacity){
        elems = new Object[capacity];
    }
 
    public ArrayList(){
        this(16);
    }
 
    public void add(E elem){
        lock.lock();    //进行锁定
        try{
            if(next == elems.length){
                elems = Arrays.copyOf(elems, elems.length* 2);
            }
            elems[next++] = elem;
        }finally {
            {
                lock.unlock();       //解除锁定
            }
        }
    }
 
    public E get(int index){
        lock.lock();
        try {
            return (E) elems[index];
        }finally {
            lock.unlock();
        }
    }
 
    public int size(){
        lock.lock();
        try{
            return next;
        }finally {
            lock.unlock();
        }
    }
}
 
ReentrantLock表示如果已经有线程取得Lock对象锁定,尝试再次锁定同一个Lock对象是可以的。想要锁定Lock对象,可以调用其lock方法,只有取得Lock对象锁定的线程,才可以继续往后执行程序代码,要解除其锁定,可以调用Lock对象的unlock方法。
 
为了避免Lock对象的lock方法后,在后续执行流程中抛出异常而无法解除锁定,一定要在finally中调用Lock对象的unlock方法。
 
Lock接口还定义了tryLock方法,如果线程调用tryLock方法成功的取得了锁定会返回true,若无法取得锁定并不会发生阻断,而是返回false,我们可以使用这个方法来解决一下上篇博客出现的线程“死结”问题。
import java.util.concurrent.locks.*;
 
class Resource{
    private ReentrantLock lock = new ReentrantLock();  //操作ReentrantLock
    private String name;
 
    Resource(String name){
        this.name = name;
    }
 
    void cooperate(Resource res){
        while(true){
            try{
                if(lockMeAnd(res)){      //获得目前与传入的Resource的Lock锁定
                    System.out.printf("%s 整合 %s 的资源 ", this.name, res.name);
                    break;
                }
            }finally {
                unLockMeAnd(res);
            }
        }
    }
 
    private boolean lockMeAnd(Resource res){       //当线程同时获得两个锁定时才可以执行下面的程序
        return this.lock.tryLock() && res.lock().tryLock();
    }
 
    private void unLockMeAnd(Resource res){        //同时解除所有的锁
        if(this.lock.isHeldByCurrentThread()){
            this.lock.unlock();
        }
        if(res.lock.isHeldByCurrentThread()){
            res.lock.unlock();
        }
    }
}
 
public class NoDeadLockDemo {
    public static void main(String[] args){
        Resource res1 = new Resource("Resource1");
        Resource res2 = new Resource("Resource2");
 
        Thread thread1 = new Thread(() -> {
            for(int i = 0; i < 10; i++){
                res1.cooperate(res2);
            }
        });
 
        Thread thread2 = new Thread(() -> {
            for(int i = 0; i < 10; i++){
                res2.cooperate(res1);
            }
        });
 
        thread1.start();
        thread2.start();
    }
}
 
对于前面的死结,由于不能同时取得两个锁定而阻断。既然如此,那么如果不能同时取得两个类的锁定,干脆释放已取得的锁定,就可以解决问题。
 
改写后的cooperate方法会在while循环中,执行lockMeAnd(res),在该方法中使用目前Resource的Lock的tryLock()尝试取得锁定,以及被传入Resource的Lock的tryLock方法尝试取得锁定,只有两次tryLock方法返回值都是true,也就是两个Resource都取得锁定之后,才进行资源的整合并离开while循环,无论哪个tryLock方法成功,都要在finally中调用unLockMeAnd(res),在该方法中测试并解除锁定。
 
使用ReadWriteLock
 
前面设计了线程安全的ArrayList,如果由两个线程都想调用get方法和size方法,由于锁定的关系,其中一个线程只能等待另一个线程解除锁定,无法让两个线程同时调用get方法和size方法。但是这两个方法都只是读取对象状态,并没有变更对象状态,如果只是读取操作,可允许线程同时并行的话,那对读取效率将会有所改善。
 
ReadWriteLock接口定义了读取锁和写入锁行为,可以使用readLock方法,writeLock方法返回Lock操作对象。ReentrantReadWriteLock是ReadWriteLock的主要操作类,readLock方法会返回ReentrantReadWriteLock.ReadLock实例,writeLock方法会返回ReentrantReadWriteLock.WriteLock实例。
 
ReentrantReadWriteLock.ReadLock操作了Lock接口,调用其Lock方法,若没有任何ReentrantReadWriteLock.WriteLock调用过Lock方法,也就是没有任何写入锁定时,就可以获得读取锁定。
 
ReentrantReadWriteLock.WriteLock操作了Lock接口,调用其lock方法时,若没有任何ReentrantReadWriteLock.ReadLock或ReentrantReadWriteLock.WriteLock调用过lock方法,也就是没有任何读取和写入锁定时,就可以获得写入锁定。
这些概念都和Linux中的读锁和写锁是非常相近的,只是调用的API不一样。
 
我们可以使用ReadWriteLock改写前面的ArrayList,改进他的读取效率。
import java.util.Arrays;
import java.util.concurrent.locks.*;
 
public class ArrayList2<E> {
    private ReadWriteLock lock = new ReentrantReadWriteLock();       //使用ReentrantLock
    private Object[] elems;
    private int next = 0;
 
    public ArrayList2(int capacity){
        elems = new Object[capacity];
    }
 
    public ArrayList2(){
        this(16);
    }
 
    public void add(E elem){
        lock.writeLock().lock();    //进行锁定
        try{
            if(next == elems.length){
                elems = Arrays.copyOf(elems, elems.length* 2);
            }
            elems[next++] = elem;
        }finally {
            {
                lock.writeLock().unlock();       //解除锁定
            }
        }
    }
 
    public E get(int index){
        lock.readLock().lock();
        try {
            return (E) elems[index];
        }finally {
            lock.readLock().unlock();
        }
    }
 
    public int size(){
        lock.readLock().lock();
        try{
            return next;
        }finally {
            lock.readLock().unlock();
        }
    }
}
 
对于这个程序我不在赘述。
 
使用StampedLock
 
ReadWriteLock在没有任何读取和写入锁定时,才取得了写入锁定,这是悲观读取。
 
当读取线程很多,写入线程很少的时候,使用ReadWriteLock可能会使线程遭受饥饿问题,也就是写入线程可能迟迟无法竞争到锁定而一直处于等待状态。
 
JDK8新增了StampedLock类,可以支持乐观读取,也就是若读取线程很多,写入线程很少的情况下,你可以乐观的认为,写入与读取同时发生的几率很少,因此不悲观的使用完全的读取锁定,程序可以查看数据读取之后,是否遭到写入线程的变更,在采取后续的措施(重新读取变更后的数据或抛出例外)。
 
我们使用StampedLock类来实现ArrayList:
import java.util.Arrays;
import java.util.concurrent.locks.*;
 
public class ArrayList3<E> {
    private StampedLock lock = new StampedLock();
    private Object[] elems;
    private int next = 0;
 
    public ArrayList3(int capacity){
        elems = new Object[capacity];
    }
 
    public ArrayList3(){
        this(16);
    }
 
    public void add(E elem){
        long stamp = lock.writeLock();    //取得写入锁定
        try{
            if(next == elems.length){
                elems = Arrays.copyOf(elems, elems.length* 2);
            }
            elems[next++] = elem;
        }finally {
            {
                lock.unlockWrite(stamp);       //解除锁定
            }
        }
    }
 
    public E get(int index) {
        long stamp = lock.tryOptimisticRead();     //尝试乐观读取锁定
        Object elem = elems[index];
 
        if (!lock.validate(stamp))       //查询是否有排他的锁
        {
            stamp = lock.readLock();     //真正的读取锁定
            try {
                return (E) elems[index];
            } finally {
                lock.unlockRead(stamp);
            }
        }
 
        return (E) elem;
    }
 
    public int size(){
        long stamp = lock.tryOptimisticRead();
        int size = next;
 
        if (!lock.validate(stamp))       
        {
            stamp = lock.readLock();
            try {
                size = next;
            } finally {
                lock.unlockRead(stamp);
            }
        }
 
        return size;
    }
}
 
使用writeLock方法取得写入锁定会返回一个long整数代表锁定戳记(stamp),可用于解除锁定或转换为其他锁定。
 
lock.tryOptimisticRead()不会真正读取锁定,而是返回锁定戳记,如果有其他排他性锁定的话,戳记会是0,validate方法用来验证戳记是不是被其他排他性锁定取得了,如果是的话就返回false,如果戳记是0的话也返回false。然后if验证如果戳记真的被其他排他性锁定取得,则重新使用readLock方法做真正的读取锁定,并在锁定时更新变量,然后解除锁定。如果if不成立,直接返回变量的值。
 
在validate方法之后发生写入而结果不一致是可能发生的,如果你在意它,应采用完全的锁定。
 
使用Condition
 
Condition接口搭配Lock,最基本的用法就是达到Object的wait(),notify(),notifyAll()方法的作用,在之前的生产者,消费者,店员的例子中,店员对象调用wait方法,会造成无论是消费者还是生产者线程都会至店员对象的等待集合。在多个生产者,消费者的情况下,等待集合中会有消费者,生产者线程,调用notify()时,有可能通知到生产者线程,也可能通知到消费者线程,如果在消费者取走产品后,又通知消费者线程,实际上只是让消费者线程再次执行到wait方法而重复进出等待集合罢了。
 
现在我们有Condition对象,一个Condition对象代表一个等待集合,可以重复调用Lock的newCondition方法,取得多个Condition实例,就代表了有多个等待集合。
 
所以我们如果有两个等待集合:一个消费者集合,一个生产者集合,消费者只通知生产者等待集合,生产者只通知消费者等待集合,就会比较有效率。
 
来看一下实际代码:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class Clerk2 {
    private int product = -1;
    private Lock lock = new ReentrantLock();
    private Condition producerCond = lock.newCondition();
    private Condition consumerCond = lock.newCondition();
 
    public void setProduct(int product) throws InterruptedException{
        lock.lock();
        try{
            waitIfFull();       //看看店员有没有空间收产品,没有的话就稍后
            this.product = product;
            System.out.printf("生产者设定 %d ", this.product);
            consumerCond.signal();         //通知等待中的线程(消费者)
        }finally {
            lock.unlock();
        }
    }
 
    private void waitIfFull() throws InterruptedException{
        while(this.product != -1){         //店员由产品,没有空间
            producerCond.await();
        }
    }
 
    public int getProduct() throws InterruptedException{
        lock.lock();
        try{
            waitIfEmpty();              //看看店员有没有货,没有的话就稍后
            int p = this.product;
            this.product = -1;           //表示货物被取走
            System.out.printf("消费者取走 %d ", p);
            producerCond.signal();                  //通知等待集合中的线程(生产者)
 
            return p;
        }finally {
            lock.unlock();
        }
    }
 
    private void waitIfEmpty() throws InterruptedException{
        while(this.product == -1){
            consumerCond.await();
        }
    }
}