疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

callable和CompletionService接口试用


 

 
CompletionService接口定义为Interface CompletionService<V>接口定它在java7中只有一个实现ExecutorCompletionService,这个接口内部集成了一个BlockingQueue,因此可以实现对多线程运行结果的收集工作。为了更好的测试该接口,我使用了两个测试,第一个测试是自己定义一个外部BlockingQueue来接收callable返回的数据。第二个测试是用CompletionService对executor进行装饰,使得返回的CompletionService对象能直接submit任务。
 
但是我发现它submit的后并没有马上调用executor的submit,而是对它进行了封装,因此出现了一点点延迟。如果在submit之后使用shutdown()命令结束的话,实际上该task可能还没有 放到executor的taskpool中。所以这一点值得注意。
 
复制代码
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
 
 
public class testCallable {
    public static void main(String[] args) {
        try {
            futureCount();
            completionServiceCount();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    /**
     * 使用自定义阻塞队列得到任务执行结果
     * 
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static void futureCount() throws InterruptedException,
            ExecutionException {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        int threadNum = 5;
        for (int i = 0; i < threadNum; i++) {
            Future<Integer> future = executorService.submit(getTask());
            queue.put(future);
        }
        int sum = 0;
        int temp = 0;
        while(!queue.isEmpty()){
            temp = queue.take().get();
            sum += temp;
            System.out.print(temp + " ");
        }
        System.out.println("BlockingQueue all is : " + sum);
        executorService.shutdown();
    }
 
    /**
     * 使用completionService收集callable结果
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void completionServiceCount() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                executorService);
        int threadNum = 5;
        for (int i = 0; i < threadNum; i++) {
            completionService.submit(getTask());
        }
        int sum = 0;
        int temp = 0;
        for(int i=0;i<threadNum;i++){
            temp = completionService.take().get();
            sum += temp;
            System.out.print(temp + " ");
        }
        System.out.println("CompletionService all is : " + sum);
        executorService.shutdown();
    }
 
    public static Callable<Integer> getTask() {
        final Random rand = new Random();
        Callable<Integer> task = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int num = 0;
                for (int i = 0; i < 10; i++) {
                    num = num + rand.nextInt(10);
                }
                return num;
            }
        };
        return task;
    }
}