疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java同步工具类FutureTask


 

 
FutureTask也可以用作闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。”执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。当FutureTask进入完成状态后,它会停止在这个状态上。
 
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞知道任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。
 
我们通过一个不断完善的例子来看看FutureTask的魅力
 
在下面的代码清单中,Computable(A, V)接口声明了一个函数Computable,其输入的类型为A,输出类型为V。在ExpensiveFunction中实现的Computablle,需要很长的时间来计算结果,我们将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来 
代码[初始化缓存]:
 
public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}
 
public class ExpensiveFunction implements Computable<String, BigInteger> {
    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        return new BigInteger(arg);
    }
}
 
public class Memoizer1<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new HashMap<A, V>();
    private final Computable<A, V> c;
    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if(result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}
 
我们使用synchronized来确保不会有两个线程同时访问HashMap,但是这也导致了Memoizer1糟糕的并发性。
 
下面我们用ConcurrentHashmap替换HashMap
 
public class Memoizer2<A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
    private final Computable<A, V> c;
    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if(result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}
 
Memoizer2的问题在于可能有多个线程同时计算同一个值的结果[如果这个计算时间很长就会导致这个问题]。
 
现在我们用FutureTask来解决上面的问题
 
public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache = 
            new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer3(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg, ft);
            ft.run(); //在这里将调用c.compute
        }
        try{
            return f.get(); //
        } catch (ExecutionException e) {
 
        }
        return null;
    }
}
 
这个代码仍然存在问题,因为复合操作(“若没有则添加”)是在底层的Map对象上执行的,而这个对象无法通过加锁来保证原子性。我们使用ConcurrenMap中的原子方法putIfAbsent,避免Memoizer3的漏洞。
import jdk.nashorn.internal.codegen.CompilerConstants;
 
import java.util.Map;
import java.util.concurrent.*;
 
/**
 * Created by hms on 2017/4/12.
 */
public class Memoizer3<A, V> implements Computable<A, V> {
    private final Map<A, Future<V>> cache =
            new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    public Memoizer3(Computable<A, V> c) {
        this.c = c;
    }
    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = cache.putIfAbsent(arg, ft);
            if(f==null) {
                f = ft;
                ft.run(); //在这里将调用c.compute
            }
        }
        try{
            return f.get(); //
        }
        catch (CancellationException e) {
            cache.remove(arg, f);
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}