疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

并发理解(阻塞队列)


 

一.java中阻塞队列如何设计
关于java中的阻塞队列
队列非常适合于生产者/消费者这种业务场景,像rabbitMq,activeMq都是对queue的一个封装的中间件。
 
java中提供了阻塞队列的实现。
Queue是接口,定义了存取这些基本接口:
public interface Queue<E> extends Collection<E> {
//放入到队列,如果放不进则抛错,成功返回true
boolean add(E e);
 
//放入queue,返回成功或失败 
boolean offer(E e);
 
//取并移除队首元素,没有则抛异常
E remove();
 
//取并移除队首元素,没有则返回null
E poll();
 
//取队首元素,但不移除队首元素,没有则抛异常
E element();
 
//取队首元素,但不移除队首元素,没有则返回null
E peek();
}
 
考虑设计一个阻塞队列,需要处理哪些情况?
 
阻塞队列可以是有界或者无界,一般来讲无界队列比较危险,极端情况会使整台服务器JVM由于内存不断增长从而OutOfMemory。
 
所以下面只考虑如何设计有界阻塞队列。
主要关注两个点:
1.队列满了之后如何处理?
2.如何提高生产者将消息放入队列,消费者从队列取消息的效率?
 
对于队列满了的情况,有以下解决方案:
第一种处理方案是阻塞生产者,然后让消费者消费消息。
但是这样子,会使对外服务的线程完全阻塞(假设消息生产者是对外提供用户服务的线程),然后阻塞的用户线程越多,最后服务器也不能再创建线程,然后所有后面用户的请求会阻塞在tcp连接队列中,当tcp连接队列满了后,最终造成整台机子对外停止响应。
 这种方案可以优化的地方是当队列满了后,由放入消息到队列的生产者线程执行这个任务。这样可以缓解一定的消费压力。
第二种方案是将后面放不进去的消息序列化到磁盘,当然这种方案会牺牲性能。
 
第三种,就是抛异常。
第二种就是返回不成功。
 
java线程池实现ThreadPoolExecutor,已经定义了几种常用的队列满后放不进消息的异常情况。
如AbortPolicy,DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy。
其中AbortPolicy如果队列满了放不进去消息,就直接拒绝并抛异常给生产者线程。
DiscardPolicy则默默的丢弃新提交的消息。而DiscardOldestPolicy则将队列中最旧的消息从队列中取走。
CallerRunsPolicy则是上述提到的第一种方案的优化版,即将不能放入满队列的消息任务处理交由生产者call线程自己去执行。
 
设置java线程池对于队列满放不进消息的处理策略代码如下:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1));
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
 
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
 
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
 
 
基本是这几种方案,实际中最好是提高消费者取消息,处理消息能力,尽量避免队列满了的情况。
 
第二个问题,事实是就是提高生产者,消息者的存取消费。提高的途径就是减少竞争。
当然java提供了SyncronziedQueue,它应用场景适合 一个生产者多个消费者的情况。
SyncronziedQueue中生产者每向queue放入一条消息必须要有消费者从queue中将消息取出,生产者才能够继续放另一条消息。
相当于生产者直接将消息交给消费者,内部没有queue实体队列。
Executors的newCachedThreadPool用的就是SyncronziedQueue,当生产者放入消息的速度大于消费者处理消息速度时,如果此时线程数小于等于线程池设置的最大线程数,线程池尝试创建新的消费线程来处理消息。
 
测试代码如下:
public class SynchronousQueueTest {
 
public static void main(String[] args) throws Exception {
SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
for (int i = 0; i < 5; i++) {
Thread t = new ConsumerThread(queue);
t.start();
}
for (int i = 0; i < 10; i++) {
// queue.put(i);
queue.offer(i);
}
}
 
public static class ConsumerThread extends Thread {
private SynchronousQueue<Object> queue;
 
ConsumerThread(SynchronousQueue<Object> queue) {
this.queue = queue;
}
 
@Override
public void run() {
Object item = null;
try {
while ((item = queue.take()) != null) {
System.out.println("get:" + item.toString()
+ " in thread:"
+ Thread.currentThread().getId());
}
} catch (Exception e) {
//
}
System.out.println("end");
}
}
 
}
 
二.双端队列
java中有一个双端队列Deque,它的特点是在队列首部和尾部都可以进行存取操作。
这样可以设计双端窃取应用,即每个消费线程都一个自己的队列,然后如果当前消费线程处理完自己队列消息后再从其它队列尾队取数据。
这里有个概念,大部分java queue实现都遵循存入消息到队尾,取从队首取。
java concurrent in practice讲如果每个线程从自己队列取,这样可以减少多个消费线程锁竞争情况,这点是可以理解的。
事实上在我们项目中,就会建立很多queue(无论用rabbitmq还是activemq),然后每个queue只会有一个消费线程,如:
 建立queue1,queue2。然后queue1的消费线程是线程1,queue2的消费线程是线程2。
当然这里没有窃取这个概念。也就是说当线程1消费完queue1里的消息后不会从queue2去取消息。
 
java concurrent in practice?还有讲到,当当前消费线程消费完消息后,从其它消费线程拥有队列队尾取消息,说这样可以减少竞争,这点比较费解。
假设以下情况:
线程1拥有队列1,线程2拥有队列2
队列2有消息a和b,即: a<->b,a在队首,b在队尾
规定从队首取消息,从队尾放入消息
 
假设这时线程1处理完自己队列1里的消息后,想从线程2拥有的队列2队尾取出消息b进行处理。
同时刻,线程2往自己队列2中放入消息c。
 
这时候会发生一个竞争,如果这时不加锁控制会发生以下情况:
线程1做取消息b操作,它需要将队列2中在消息b前面的消息a的next指针指向null。
但在线程1执行将消息a的next指针指向null语句前,线程2成功将消息c插入到消息b之后,也就是b的next指针指向了指针。
线程2操作完成后,queue2的消息如下,其中 <->是指双向指针:
a<->b<->c
 
这时线程1接着将消息b的next指针指向null。
这样queue2中的消息变成:
a<->b (b指向null)
 
最终发现线程2将消息c放入队列2队尾的操作丢失。