疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java基础并发实用工具


 

 
1.Fork/Join框架简介
其实不难发现,我们之前用的所有并发编程实际上都真的是『并发』编程,而不是并行编程。先介绍一下并发和并行的区别:并发:在一段时间内,几个任务同时执行;并行:在一个时间点上,几个任务同时执行。由定义可知,如果CPU是单核心的,想并行是不可能的,这也是Java多线程编程的一般执行方式:并发而非并行,而现在处理器已经步入多核时代,如果还只是之前的Java多线程执行方式,是不能够很好的利用CPU的多核心进行真正的并行编程,那Fork/Join框架就是用来充分利用多核处理器的。
Fork/Join框架是在JDK7之后加入的,其自动使用多处理器进行程序执行,然而丑话说在前面,这个框架虽然是真正利用了多核心处理器的优势,但是使用场景是很局限的,这里先打一个预防针。另外,虽然称其为框架,其实就是一堆API,而且这堆API的背后核心思想是:分而治之(devide and conquer)
2.Fork/Join框架API分析
ForkJoinTask<V>:用来定义多线程任务的抽象类
ForkJoinPool:用于管理ForkJoinTask的执行
RecursiveTask<V>:抽象类,是ForkJoinTask<V>的子类,用于定义有返回值的多线程任务
RecursiveAction:抽象类,是ForkJoinTask<V>的子类,用于定义无返回值的多线程任务
大致就这四个类,对,你没看错,Recursive的中文翻译是递归。
ForkJoinTask<V>
对于这个抽象类的使用,一般来说就是调用以下其已经定义好的几个方法。而不是自己写一个子类用来扩展这个抽象类,这个类用来定义多线程任务,但是,实际中并不是扩展这个类用来进行多线程任务定义的,理由一会儿说。先说一下它的几个重要方法:
①final ForkJoinTask<V> fork()/②final V join()/③final V invoke()/④static void invokeAll(ForkJoinTask<?>...taskList)
①:调用这个方法会使定义的多线程任务以异步的方式进行执行,有点类似Thread类的start方法,就是让线程启动的,和start内部调用了run方法中的内容一样,fork只是一个开关,具体的内容并不在这个方法中定义,你也没法定义了,人家是final的,具体的任务是定义在compute方法中的,然而这个类并没有这个方法,他的两个子类有,这也就是为什么在实际开发中并不是扩展这个类定义线程任务而是扩展他的两个子类。
②:等待线程结束,和Thread类的Join一样,只是,这个等待结束后,可以有返回值,只是可以有,也可以没有,取决于你定义多线程任务的时候用的是那两个子类中哪一个
③:相当于把①和②揉和到一起了,调用这个会启动线程并等待
④:开启子线程任务,相当于调用传入Task的invoke方法,是实现递归和分而治之的关键方法,用法在代码中有体现
对于上面的这几个方法,在JDK8之前,必须是在一个ForkJoinPool的内部调用的,而不能在外部调用(不懂这一点没关系,先跳过去,看到代码和后面对ForkJoinPool的介绍后会豁然开朗的,我不是为了教别人什么而写的,这些博文都是我的笔记,只是怕看到的同学看不动,所以说一下),而在JDK8之后,新增的公共池放松了这样的要求。
RecursiveAction和RecursiveTask<V>
这两个抽象类是ForkJoinTask的子类,这两个抽象类都有一个compute抽象方法得实现,用于定义多线程任务。Action结尾定义的任务没有返回值(protected abstract void compute()),Task结尾定义的任务有返回值(protected abstract V compute())
ForkJoinPool
类似执行器(Executor),并行任务的执行都是在这个ForkJoinPool中执行的。获取这个pool有两种方式,一种是自己new一个,构造方法为:ForkJoinPool()/ForkJoinPool(int pLevel),参数指定了并行级别,也就是要利用的处理器核心数,不指定使用默认的数量:系统中可用的处理的数量;指定的话,大于0小于实际数量。重申一下,这里的并行级别是能利用的处理器的数量,而不是能并行处理的任务的多少。另外一种是使用公共池,调用ForkJoinPool的静态方法commonPool()返回一个ForkJoinPool对象(公共池)。
这个pool的方法和Executor差不多,反正都是用来启动线程任务的执行的。这个pool有两个方法用来干这个事儿:<T> T invoke(ForkJoinTask<T> task)、void execute(ForkJoinTask<?> task),invoke方法是以同步的方式运行,就是说开启任务执行的线程开启任务之后等待这个任务执行完成后再往下走,而execute是异步执行的方式,开启后,开启任务的线程就接着往下走了,不等任务有没有执行完。
3.分而治之策略
OK,就如你想到的一样,使用Fork/Join框架的基本原则就是使用基于递归的分而治之的策略。
主要的思路是这样的:在compute方法中进行问题的拆分和递归。譬如对一个数组中的所有元素乘2,你可以遍历一下对每个元素乘以2,但是你也可以充分利用多核处理器进行并行编程,对数组进行划分,直到这样一个临界点:再开启线程的消耗要高于开始线性处理的消耗。什么意思呢?譬如有一个100000个元素的数组,进行上面的操作,一直二分二分(第一次分为5000、5000,第二次分为2500、2500、2500、2500),恰好到了2500的时候,处理器线性处理一下这2500个元素的消耗要低于再开启一个任务所需的消耗(在并发编程基础中提到过,线程的创建及上下文切换也是一笔不小的消耗哦),就不再分了,开始处理。
OK,使用Fork/Join框架的基本思想就是这样,对一个任务进行细分,直到一个界点,停止细分,开始利用多处理器进行处理。到了这里,你应该已经察觉出来了这个框架的局限性了,关于局限性,一会儿谈,现在先看看怎么利用分而治之策略使用ForkJoin框架进行并行编程。
编写无返回值的多线程任务,实例代码如下:
[java] view plain copy 在CODE上查看代码片派生到我的代码片
import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.RecursiveAction;  
  
public class ForkJoinFrameUsingRecusiveAction {  
  
    public static void main(String[] args) {  
        double[] data = new double[100000];  
        for(int i = 0;i<data.length;i++){  
            data[i] = (double)i;  
        }  
        for(int i = 0;i<10;i++){  
            System.out.format("%.1f ",data[i]);  
        }  
        System.out.println();  
        //使用默认的并行级别  
        ForkJoinPool forkJoinPool = new ForkJoinPool();  
        //使用异步的方式,主线程调用之后,接着往下走,这也是为什么会出现下面两种运行结果的原因  
        //出现第一种和第三种结果表示任务已经开始,然而由于是多处理器执行,并不知道什么时候执行完成,而且即使是分到同一块中的内容,也不是按照顺序执行的,看,结果1和结果3前几个元素还没有被运算到  
        //出现第二种结果表示:虽然任务已经设置了要开始执行了,但是主线程并没有等这些任务执行玩,只是设置执行后就往下走了,而要输出运行结果的时候,任务并没有开始执行呢  
        //出现第四种结果是我们想要的,但是试了好久,使用execute(异步)出现这种结果的可能性很小,结果4是我改用invoke(同步)调用后出现的结果:也就是把execute改成invoke  
        forkJoinPool.invoke(new MyRecursiveAction(data, 0, data.length));  
        for(int i = 0;i<10;i++){  
            System.out.format("%.1f ",data[i]);  
        }  
//      运行结果:  
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0   
//      0.0 1.0 2.0 3.0 8.0 10.0 12.0 14.0 16.0 18.0   
//      运行结果:  
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0   
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0   
//      运行结果:  
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0   
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 16.0 18.0   
//      运行结果:  
//      0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0   
//      0.0 2.0 4.0 6.0 8.0 10.0 12.0 14.0 16.0 18.0   
          
    }  
}  
class MyRecursiveAction extends RecursiveAction{  
    private static final long serialVersionUID = 1L;  
    //这个里面是要实际运行的内容,也是在这个里面运用分而治之思想对计算过程进行多处理器并行计算,最常见的就是对集合(数组)的操作  
    double[] data;  
    int start;  
    int end;  
    int threshold = 1000;  
    public MyRecursiveAction(double[] data,int start,int end) {  
        this.data = data;  
        this.start = start;  
        this.end = end;  
    }  
    @Override  
    protected void compute() {  
        //这里的任务是对一个数组中的所有元素乘2  
        if(end-start>threshold){  
            //切分小任务  
            int mid = (start+end)/2;  
            //同fork、invoke一样,invokeAll实际上也是调用了computer方法内的内容  
            invokeAll(new MyRecursiveAction(data, start, mid), new MyRecursiveAction(data, mid, end));  
        }else{  
            //进行操作  
            for(int i = start;i<end;i++){  
                data[i] = data[i]*2;  
            }  
        }  
    }  
      
}  
编写有返回值的多线程任务,实例代码如下:
[java] view plain copy 在CODE上查看代码片派生到我的代码片
import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.RecursiveTask;  
  
public class ForkJoinFrameUsingRecursiveTask {  
  
    public static void main(String[] args) {  
        double[] data = new double[10000];  
        for(int i = 0;i<data.length;i++){  
            data[i] = (double)(i+1);  
        }  
        //这里使用公共池JDK8加的,如果不是JDK8,需升级到8之后才能正常运行  
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();  
        //使用同步的方式,调用之后,等待所有任务完成后再往下走的  
        double d = forkJoinPool.invoke(new MyRecursiveTask(data, 0, data.length));  
        System.out.format("result: %.1f",d);  
        /* 
         * 运行结果:result: 50005000.0 
         */  
    }  
  
}  
class MyRecursiveTask extends RecursiveTask<Double>{  
    private static final long serialVersionUID = 1L;  
    double[] data;  
    int start;  
    int end;  
    int threshold = 1000;  
    public MyRecursiveTask(double[] data,int start,int end) {  
        this.data = data;  
        this.start = start;  
        this.end = end;  
    }  
      
    @Override  
    protected Double compute() {  
        double sum = 0;  
        if(end-start>threshold){  
            //devide and conquer  
            int mid = (end+start)/2;  
            sum = new MyRecursiveTask(data, start, mid).invoke()+new MyRecursiveTask(data, mid, end).invoke();  
        }else{  
            double temp_sum = 0;  
            for(int i = start;i<end;i++){  
                temp_sum+=data[i];  
            }  
            return temp_sum;  
        }  
        return sum;  
    }  
}  
4.Fork/Join框架使用注意事项
1.首先Fork/Join框架的使用远比我上面写的哪些代码复杂的多,这里仅仅是介绍框架的使用,关于深入学习,欢迎参考Java深度学习的相关博文
2.很明显,这个框架中虽然很好的利用了多处理器,但是在API中并没有发现线程间同步和通信的方法,也就是说,这个框架更多的是用来对计算任务的分而治之,充分利用处理器资源进行计算嘛,而不用于有线程间同步和通信的多线程编程
3.你完全可以不在Pool中执行这些任务,可以在外面调用invoke或者fork方法,毕竟pool就只是启动一下任务,后面的递归是自己进行的,你大可自己启动而不用在pool启动,是的,既然你知道调用invoke和Fork方法是开启任务,的确可以在外面进行开启任务,但是前提是你的JDK版本是8之后的,JDK8之后加入了公共池,就是说你不显式的在pool中调用启动任务的invoke或者fork方法,你并不是没有用pool,而是JVM帮你放到了公共池中了。是不是又疑问了?干嘛非得放到池子中,在外面执行不是挺好的吗。一句话:池子内能让你的递归充分利用多处理器,而池子外的,还是一个处理器核心哦。
4.池子的关闭:公共池关闭不了。自己new的池子可以调用shutdown方法关闭,调用了这个方法后,会等已经开启的任务执行完,不再开启新的任务。如果想立即关闭,可以使用shutdownNow方法