疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java并发编程 之 Queue的一些总结


 

 
什么是队列
队列是一种特殊的线性表,它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。
 
在队列这种数据结构中,最先插入的元素将是最先被删除的元素;反之最后插入的元素将是最后被删除的元素,因此队列又称为“先进先出”(FIFO—first in first out)的线性表。
 
在java5中新增加了java.util.Queue接口,用以支持队列的常见操作。该接口扩展了java.util.Collection接口。
 
Queue使用时要尽量避免Collection的add()和remove()方法,而是要使用offer()来加入元素,使用poll()来获取并移出元素。它们的优点是通过返回值可以判断成功与否,add()和remove()方法在失败的时候会抛出异常。 如果要使用前端而不移出该元素,使用element()或者peek()方法。
 
常用的Queue实现类
非线程安全的
 
LinkedList
 
值得注意的是LinkedList类实现了Queue接口,因此我们可以把LinkedList当成Queue来用。
 
线程安全的
 
非阻塞队列
 
ConcurrentLinkedQueue
 
复制代码
public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
 
                if (item != null && p.casItem(item, null)) { // CAS算法
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
复制代码
ConcurrentLinkedQueue使用CAS算法保证线程安全,但它不支持阻塞。
 
阻塞队列
 
ArrayBlockingQueue
 
源码片段:
 
复制代码
public E poll() {
        final ReentrantLock lock = this.lock;// 这个类中共用同一个锁
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }
复制代码
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象。 
队列初始化时需要指定数组大小。
 
LinkedBlockingQueue
 
代码片段:
复制代码
 public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;// 写锁
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
 
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;// 读锁
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
复制代码
基于链表的阻塞队列,因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 
但值得注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
 
ArrayBlockingQueue与LinkedBlockingQueue的不同之处: 
1、锁的实现不同 
ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁,由此也意味着两者无法真正并行运行。 
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock。 
2、内部元素操作不同 
ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的,即不会产生任何额外的对象实例。 
LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移除,这在长时间内需要高效并发地处理大批量数据的系统中,对GC和性能会有一定影响。 
3、 队列初始化方式不同 
ArrayBlockingQueue实现的队列中必须指定队列的大小。 
LinkedBlockingQueue实现的队列中可以不指定队列的大小,默认是Integer.MAX_VALUE。
 
PriorityBlockingQueue
 
代码片段:
 
复制代码
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);// 支持优先级排序
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
复制代码
PriorityBlockingQueue里面存储的对象必须是实现Comparable接口。队列通过这个接口的compare方法确定对象的priority。 
规则是:当前和其他对象比较,如果compare方法返回负数,那么在队列里面的优先级就比较高。 
值得注意的是: 
如果将PriorityBlockingQueue队列中的全部元素循环打印出来,你会发现里面的元素并不是全部按优先级排序的,但是队列头部元素的优先级肯定是最高的。
 
复制代码
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        Task task1 = new Task();
        task1.setId(2);
 
        Task task2 = new Task();
        task2.setId(5);
 
        Task task3 = new Task();
        task3.setId(9);
 
        Task task4 = new Task();
        task4.setId(1);
 
        Task task5 = new Task();
        task5.setId(13);
 
        Task task6 = new Task();
        task6.setId(10);
 
        Task task7 = new Task();
        task7.setId(30);
 
        queue.add(task1);
        queue.add(task2);
        queue.add(task3);
        queue.add(task4);
        queue.add(task5);
        queue.add(task6);
        queue.add(task7);
 
        for (Iterator iterator = queue.iterator(); iterator.hasNext();) {
            Task task = (Task) iterator.next();
            System.out.println(task);
        }
复制代码
结果:
 
复制代码
Task [id=30]
Task [id=9]
Task [id=13]
Task [id=1]
Task [id=2]
Task [id=5]
Task [id=10]
复制代码
取完队列头时候,后面的剩余的元素不是排序的,岂不是不符合要求了。阅读源码可以发现每取一个头元素时候,都会对剩余的元素做一次调整,这样就能保证每次队列头的元素都是优先级最高的元素。