疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java中的Fork/Join框架


 

 
阅读目录
 
什么是fork/join框架
基本使用方法
例子-图像模糊
 JDK中使用fork/join的例子
看了下Java Tutorials中的fork/join章节,整理下。
 
什么是fork/join框架
 
  fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。
 
  fork/join框架与其它ExecutorService的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。
 
  fork/join框架的核心是ForkJoinPool类,该类继承了AbstractExecutorService类。ForkJoinPool实现了工作窃取算法并且能够执行 ForkJoinTask任务。
 
基本使用方法
 
  在使用fork/join框架之前,我们需要先对任务进行分割,任务分割代码应该跟下面的伪代码类似:
 
if (任务足够小){
  直接执行该任务;
}else{
  将任务一分为二;
  执行这两个任务并等待结果;
}
  首先,我们会在ForkJoinTask的子类中封装以上代码,不过一般我们会使用更加具体的ForkJoinTask类型,如 RecursiveTask(可以返回一个结果)或RecursiveAction。
 
  当写好ForkJoinTask的子类后,创建该对象,该对象代表了所有需要完成的任务;然后将这个任务对象传给ForkJoinPool实例的invoke()去执行即可。
 
例子-图像模糊
 
  为了更加直观的理解fork/join框架是如何工作的,可以看一下下面这个例子。假定我们有一个图像模糊的任务需要完成,原始图像数据可以用一个整型数组表示,每一个整型元素包含了一个像素点的颜色值(RBG,存放在整型元素的不同位中)。目标图像同样是由一个整型数组构成,每个整型元素包含RBG颜色信息;
 
  执行模糊操作需要遍历原始图像整型数组的每个元素,并对其周围的像素点做均值操作(RGB均值),然后将结果存放到目标数组中。由于图像是一个大数组,这个处理操作会花费一定的时间。但是有了fork/join框架,我们可以充分利用多核处理器进行并行计算。如下是一个可能的代码实现(图像做水平方向的模糊操作):
 
Tips:该例子仅仅是阐述fork/join框架的使用,并不推荐使用该方法做图像模糊,图像边缘处理也没做判断
复制代码
public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.
 
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }
 
    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }
 
            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
...
复制代码
  现在,我们开始编写compute()的实现方法,该方法分成两部分:直接执行模糊操作和任务的划分;一个数组长度阈值sThreshold可以帮助我们决定任务是直接执行还是进行划分;
 
复制代码
    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
 
        int split = mLength / 2;
 
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split, 
                mDestination));
    }
复制代码
接下来按如下步骤即可完成图像模糊任务啦:
 
1、创建图像模糊任务
 
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
2、创建ForkJoinPool
 
ForkJoinPool pool = new ForkJoinPool();
3、执行图像模糊任务
 
pool.invoke(fb);
完整代码如下:
 
 
复制代码
/*
* Copyright (c) 2010, 2013, Oracle and/or its affiliates. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
*   - Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*
*   - Redistributions in binary form must reproduce the above copyright
*     notice, this list of conditions and the following disclaimer in the
*     documentation and/or other materials provided with the distribution.
*
*   - Neither the name of Oracle or the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
 
import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.imageio.ImageIO;
 
/**
 * ForkBlur implements a simple horizontal image blur. It averages pixels in the
 * source array and writes them to a destination array. The sThreshold value
 * determines whether the blurring will be performed directly or split into two
 * tasks.
 *
 * This is not the recommended way to blur images; it is only intended to
 * illustrate the use of the Fork/Join framework.
 */
public class ForkBlur extends RecursiveAction {
    private static final long serialVersionUID = -8032915917030559798L;
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.
 
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }
 
    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }
 
            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }
    protected static int sThreshold = 10000;
 
    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
 
        int split = mLength / 2;
 
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split, 
                mDestination));
    }
 
    // Plumbing follows.
    public static void main(String[] args) throws Exception {
        String srcName = "C:\test6.jpg";
        File srcFile = new File(srcName);
        BufferedImage image = ImageIO.read(srcFile);
         
        System.out.println("Source image: " + srcName);
         
        BufferedImage blurredImage = blur(image);
         
        String dstName = "C:\test6_out.jpg";
        File dstFile = new File(dstName);
        ImageIO.write(blurredImage, "jpg", dstFile);
         
        System.out.println("Output image: " + dstName);
         
    }
 
    public static BufferedImage blur(BufferedImage srcImage) {
        int w = srcImage.getWidth();
        int h = srcImage.getHeight();
 
        int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
        int[] dst = new int[src.length];
 
        System.out.println("Array size is " + src.length);
        System.out.println("Threshold is " + sThreshold);
 
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");
 
        ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
 
        ForkJoinPool pool = new ForkJoinPool();
 
        long startTime = System.currentTimeMillis();
        pool.invoke(fb);
        long endTime = System.currentTimeMillis();
 
        System.out.println("Image blur took " + (endTime - startTime) + 
                " milliseconds.");
 
        BufferedImage dstImage =
                new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
        dstImage.setRGB(0, 0, w, h, dst, 0, w);
 
        return dstImage;
    }
}
复制代码
测试了一下,执行效果如下:
 
Source image: C: est6.jpg
Array size is 120000
Threshold is 10000
4 processors are available
Image blur took 10 milliseconds.
Output image: C: est6_out.jpg