疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java-BlockingQueue的使用


 

 
最近项目中有个对比的需求,需要从日志文件中获取到参数,然后调用不同的API,进行结果的对比。但是不知用什么方式比较好,于是查了下jdk的手册,发现了BlockingQueue这个好东西。
 
关于BlockingQueue的介绍,大家有兴趣的可以自己看下:
 
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html
 
 
 
需求呢其实很简单就是将参数放置到Queue中,然后交由下一个策略去消费。刚开始时是通过不同的线程往队列中存放数据,然后返回给下个服务一个BlockingQueue的对象,下一个策略从队列中消费,code如下:
 
+ View Code
  可是在实际运行时,由于日志比较大,下一个策略可能要等1hour或更长的时间才能开始处理,这明显是不符合要求的,于是又优化了下,将BlockingQueue改为全局static的,然后下一个策略可以直接监控这个队列中是否有值,有值就消费,没值就阻塞线程等待或者超时等其他处理。
 
改进后的code:
 
1、新建一个队列类:
 
public class ParameterQueue extends LinkedBlockingQueue<InputOutputPrameters> {
 
    /**
    *@Fields serialVersionUID:
    */
    private static final long serialVersionUID = 6032356446145302484L;
     
    private static BlockingQueue<InputOutputPrameters> queue = new 
 
LinkedBlockingQueue<InputOutputPrameters>();
 
    /**
     * @Fields log: 日志记录
     */
    private static final Logger log = LoggerFactory
            .getLogger(ParameterQueue.class);
     
    /**
     * 获取队列中的对象
     * @Method:getParameter
     * @Description: 获取队列中的对象
     * @return 获取到的对象信息
    */
    public static InputOutputPrameters getParameter(){
        InputOutputPrameters result = null;
        try {
            result  = (InputOutputPrameters)queue.take();
        } catch (Exception e) {
            log.error("获取队列异常,异常信息:" + e);
        }
        return result;
    }
     
    /**
     * 获取队列的数量
     * @Method:getQueueSize
     * @Description: 获取队列的数量
     * @return 数量
    */
    public static Integer getQueueSize() {
        return queue.size();
    }
     
    /**
     * 放置参数到队列中
     * @Method:putParameter
     * @Description: 放置参数到队列中
     * @param parameter 要放置的对象
    */
    public static void putParameter(InputOutputPrameters parameter) {
        try {
            queue.put(parameter);
        } catch (Exception e) {
            log.error("插入队列异常,异常信息:" + e);
        }
    }
}
  2、读取文件时,直接操作该队列,往队列中put值,下一个策略从该队列中get值,put的code如下:
 
 
public void getSource(String path) {
        try {
            File file = new File(path);
            BufferedReader reader = null;
            String tempStr = null;
            try {
                reader = new BufferedReader(new FileReader(file));
                while ((tempStr = reader.readLine()) != null) {
                    final InputOutputPrameters parameter = new InputOutputPrameters();
                    String[] list = tempStr.split(";");
                    if (list != null && list.length > 0) {
                        parameter.setInputParamters(list[0]);
                        parameter.setOutputParameters(list[1]);
                    }
                    putInQueue(parameter);
                }
                reader.close();
            } catch (FileNotFoundException  e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                 if (reader != null) {
                     try {
                         reader.close();
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
        } catch (Exception e) {
            log.error("系统异常: " + e);
        }
    }
 
    /**
     * 将参数存放至队列中
     * @Method:putInQueue
     * @Description: 将参数存放至队列中
     * @param parameter 要存放的对象
    */
    private void putInQueue(final InputOutputPrameters parameter) {
        new Thread(){
            public void run(){
                try {  
                    Thread.sleep((long)(Math.random()*100));
                    log.info("开始存入数据!");
                    ParameterQueue.putParameter(parameter);
                    log.info("已经存入数据,目前队列中有 " + ParameterQueue.getQueueSize() +" 个
 
队列!输入参数:"+ parameter.getInputParamters() + "; 输出参数:" + 
 
parameter.getOutputParameters());
                } catch (Exception e) {
                    log.error("系统异常:" + e);
                }
            }
        }.start();
    }
  
 
于是这个要求就达到了。记录下这个小需求,方便以后查阅。
 
简要说下,BlockingQueue是线程安全的,常用的是ArrayBlockingQueue、LinkedBlockingQueue
 
ArrayBlockingQueue需要制定容量,而LinkedBlockingQueue不需要
 
同时在消费时,take()是会阻塞线程的,如果是单线程跑时,take()不到时整个线程就卡了
 
所以看具体环境需求,是用take还是其他的,我一般用poll,因为可以制定超时时间。