疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java 并行 并行框架实现


 

 
 
背景:
 
最近,总是在想着,对于n个任务,然后用一个for循环,这样子效率比较低下,能不能通过并行(多线程)去提高执行的效率.
 
一开始的想法:通过jdk5新增加的特性中的CountDownLatch(相当于计数器)这个对象来实现,假设总共有n个线程,执行完成一个线程CountDownLatch.countDown一次(减去1),而主线程进行CountDownLatch.await()直到所有的子任务执行完毕(等待直到,计数器为零,然后唤醒).上网搜索发现网上也有人实现了,
 
但是感觉还要创建锁,这样子,还是挺麻烦的.
 
第二种实现方法.
 
首先创建一个拥有nThread个线程的线程池:
 
ExecutorService service = Executors.newFixedThreadPool(nThread);
 
然后调用:
 
List<Future<T>> java.util.concurrent.ExecutorService.invokeAll(Collection<?extends Callable<T>> tasks) throws InterruptedException
 
给这个线程池n个任务,会自动的并行执行,主线程会阻塞直到n个任务执行完毕.这样就轻松的解决了,并行执行任务了.
 
例子:
 
package com.zwy.task;
import java.util.*;
import java.util.concurrent.*;
import static java.util.Arrays.asList;
 
public class TaskTest {   
    static class Sumimplements Callable<Long> {
        private final longfrom;
        private final long to;
            Sum(long from,long to) {
            this.from = from;
            this.to = to;
        }       
        @Override
        public Long call() {
            long acc = 0;
            for (long i =from; i <= to; i++) {
                acc = acc + i;
            }
            return acc;
        }     
    }   
    public static voidmain(String[] args) throws Exception {       
        ExecutorServiceexecutor = Executors.newFixedThreadPool(2);
        List<Future<Long>> results = executor.invokeAll(asList(
            new Sum(0, 10),new Sum(100, 1000), new Sum(10000, 1000000)
        ));
       executor.shutdown();       
        for(Future<Long> result : results) {
           System.out.println(result.get());
        }  
    } 
}
 
第三种方法:
 
使用Jdk1.7中ForkJoinPool包下的并行框架.主要用到了两个思想:分治和work-stealing算法.
 
分治:将问题分成n个小问题,小问题不断的划分,直到问题足够小,能够直接解决,当前问题等到所有的小问题的结果返回,通过计算得出当前问题的结果.
 
伪代码:
 
Result solve(Problem problem) {
 if (problem is small)  //如果问题足够小,直接进行解决.
             directly solve problem
 else {//问题的规模太大了
    split problem into independent parts //把问题分成n个更小的问题
    fork new subtasks to solve each part//为这n个小问题创建n个线程(fork)
    join all subtasks//等待n个小问题的线程执行完毕(join)
    compose result from subresults  //组合n个小问题的执行结果,得出当前问题的结果
 }
}
 
 
work-stealing算法:
 
work-stealing 是一种任务调度方法,由多个工作线程组成,每个工作线程用一个双端队列维护一组任务。Fork的时候是把任务加到队列的头部,而不像一般的线程池那样是加到任务队列末尾。工作线程选择头部最新的任务来执行。当工作线程没有任务可执行时,它会尝试从其它线程的任务队列尾部窃取一个任务执行。如果没有任务执行了并且窃取其它任务失败,那么工作线程停止。
 
这种方法的优点是减少了争用,因为工作线程从头获取任务,而窃取线程从尾部窃取任务。另一个优点是递归的分治法使得早期产生的是较大的任务单元,而窃取到较大任务会进一步递归分解,因此也减少了尾部窃取的次数。另外,父任务很可能要等待子任务(join),所以从队列头部子任务开始执行也是一种优化。
 
总之,它会使用有限的线程执行大量任务,同时保持各线程的任务都处于繁忙的执行状态,而尽量不让线程处于等待状态。为了做到这点可能会从其它线程的任务队列中窃取任务来执行,所以叫work-stealing。
Jdk1.7中的样例代码:
 
简单例子: 计算斐波那契数列(典型的递归问题)
 
package com.zwy.task;
import java.math.BigInteger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class Fibonacci extends RecursiveTask<BigInteger>{
     int n ;
     public Fibonacci() {
     }
     Fibonacci(int i) {
              this.n = i;
     }
       private BigInteger compute(int small) {
             final int[] results = { 1, 1, 2, 3, 5,8, 13, 21, 34, 55, 89 };
             return new BigInteger(results[small]+"");
         }
     @Override
     protected BigIntegercompute() {
              if(n < 10){ //任务足够小,直接查表返回
                        return  compute(n);
              }
              System.out.println(Thread.currentThread().getName()+"");
              Fibonacci f1=  new Fibonacci(n-1);//创建子任务
              Fibonacci f2=  new Fibonacci(n-2);//创建子任务
              //f1.fork();
              f2.fork();//子任务执行
              returnf1.compute().add(f2.join());//两个子任务结果的合并
     }
     public static voidmain(String[] args) {
              /*线程的数量有限,超过一定的数量会报错,内存溢出*/
              ForkJoinPoolforkJoinPool = new ForkJoinPool();
              Fibonaccifibonacci =  new Fibonacci(50);//创建任务
              Future<BigInteger>result = forkJoinPool.submit(fibonacci);
              try {
                        System.out.println(result.get());
              } catch(InterruptedException e) {
                        e.printStackTrace();
              } catch(ExecutionException e) {
                        e.printStackTrace();
              }                
             
     }
 
}
 
 
复杂例子:
 
package com.zwy.task;
 
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/*
 *
 * 计算1^2+2^2+3^2+4^2+5^2+6^2通过二分来分段计算,而二分法的会将数组分成若干段,然后分别进行计算
 * */
public class FibonacciAction extends RecursiveAction{
     final double[] array;
        final int lo, hi;
        double result;
        FibonacciAction next; // keeps track ofright-hand-side tasks
        FibonacciAction(double[] array, int lo, inthi, FibonacciAction next) {
          this.array = array; this.lo = lo; this.hi= hi;
          this.next = next;
        }
      /*计算数组下表从[l,h)平方和*/
        double atLeaf(int l, int h) {
          double sum = 0;
          for (int i = l; i < h; ++i) // performleftmost base step
            sum += array[i] * array[i];
          return sum;
        }
        /*分治(递归过程)*/
        protected void compute() {
          int l = lo;
          int h = hi;
          FibonacciAction right = null;
          /*二分,并且要求SurplusQueuedTaskCount小于3*/
          while (h - l > 1 &&getSurplusQueuedTaskCount() <= 3) {
             int mid = (l + h) >>> 1;
             /*创建一个子任务*/
             right = new FibonacciAction(array, mid,h, right);
             right.fork();
             h = mid;
          }
          double sum = atLeaf(l, h);
          while (right != null) {
                   /*用剩余时间去执行队列中的其他线程任务,提高效率,使得线程处于忙碌状态*/
             if (right.tryUnfork()) // directlycalculate if not stolen
               sum += right.atLeaf(right.lo,right.hi);
            else {
                     /*等待执行结果*/
               right.join();
               sum += right.result;
             }
             right = right.next;
           }
          result = sum;
        }
    
     public static voidmain(String[] args) {
              System.out.println(sumOfSquares(newForkJoinPool(),new double[]{1,2,3,4,5,6}));
     }
     static  double sumOfSquares(ForkJoinPool pool,double[] array) {
                 int n = array.length;
                 FibonacciAction a = newFibonacciAction(array, 0, n, null);
                 pool.invoke(a);
                 return a.result;
               }
}
 
 
两个函数的说明:
 
getSurplusQueuedTaskCount():可以用来决定是否要fork一个新的子任务.
 
返回当前工作线程所持有的任务的估计数量(fork的数量),扣去工作线程可能会窃取的fork的子任务的值.这个窃取的任务的值启发式来决定是否要fork一个其他任务(有助于提高效率吧).在ForkjoinTask的多种用途中,在稳定状态,每个任务应该保持有一个很小的剩余参数(surplus,例如3),如果超过阈值,应该在本任务做处理(意思是不再fork一个新的任务).
 
tryUnfork
 
尝试的将一个新产生的fork任务取消执行. 取消成功返回true,否则为false.
 
4.改进法2.
 
给k个线程的ExecutorService线程池添加了n个任务,这样意味着需要线程上下文的切换n次,众所皆知,线程的上下文的切换是非常消耗时间的~因此产生了一个想法,只给线程池分配k个大任务,每个任务需要循环执行n/k个任务.这样就可以避免大量的线程的上下文的切换说消耗的时间.通过测试,发现这样对于线程没有阻塞时间的的确比直接执行n个线程任务快.但是如果线程有阻塞可能会跟n个线程任务的差不多.