高并发编程-队列-BlockingQueue-LinkedBlockingQueue
一、LinkedBlockingQueue简介
LinkedBlockingQueue是一个基于链表的阻塞队列,该队列在创建时候,默认大小为Integer.MAX_VALUE,这个数值很大的,所以可以说LinkedBlockingQueue的大小没有限制的,业界有个比较专业的词汇,把它叫做无界队列。但是这也带来了一些问题,比如当JVM内存比较小的时候,可能就会出现OOM的情况。所以建议大家在使用的时候根据实际情况设置一个大小。
LinkedBlockingQueue的内部是单向链表,在链表的内部只能next,也就是说查找元素只能从head查起,从tail插入。
LinkedBlockingQueue的采用两把锁的分离技术,实现了出队和入队的锁分离,也就是说LinkedBlockingQueue的读写是分离的,提高了它得并发处理能力
二、LinkedBlockingQueue的特点
队列特点:无界阻塞队列,可以指定容量,默认为Integer.MAX_VALUE,先进先出 |
数据结构: 链表结构 |
锁特点:读写锁分离,操作各自的node对象。takeLock 取node节点保证了顺序不会乱,putlock存数据时保证了有序 |
条件阻塞:出队时候,如果队列的count=0,证明队列里面没有元素的,notEmpty条件队列会被阻塞。入队时候,如果队列的count=capacity,证明队列已满,这个时候入队线程需要阻塞,notFull需要阻塞。 |
入队:从tail入队。出队:从head出队 |
三、LinkedBlockingQueue的简单实用
//创建一个有界队列,这个需要指定大写 BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(100) //默认指定的Queue为无界队列 BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(); //建议大家在使用的时候指定队列大小,防止出现OOM
四、LinkedBlockingQueue的源码
数据参数
//指定队列容量大小,不指定默认 Integer.MAX_VALUE private final int capacity; //统计内部元素数量,这里使用了CAS的原子性操作 private final AtomicInteger count = new AtomicInteger(); //定义链表的头部,初始化时 head=null transient Node<E> head; //定义链表的尾部 private transient Node<E> last; //定义take poll使用的锁,也就是消费者使用的锁 private final ReentrantLock takeLock = new ReentrantLock(); //等待队列的条件队列,当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒 private final Condition notEmpty = takeLock.newCondition(); //定义 put, offer,使用的锁,也就是生产者使用的锁 private final ReentrantLock putLock = new ReentrantLock(); //等待入队的条件队列,当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒 private final Condition notFull = putLock.newCondition(); //单链表结构 static class Node<E> { E item;//存储元素 Node<E> next;//后续节点,单链表结构 Node(E x) { item = x; } }
构造器
//默认的构造器 public LinkedBlockingQueue() { // 如果没传容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } //指定容量大小的构造器 public LinkedBlockingQueue(int capacity) { //如果指定容量大小小于等于0抛出异常 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //定义初始的head和last节点,初始为空节点 last = head = new Node<E>(null); }
入队put方法
//入队 public void put(E e) throws InterruptedException { //如果入队元素为空,则抛出异常 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; //定义一个新节点 Node<E> node = new Node<E>(e); //加锁操作 count原子性操作 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* *如果元素总数和空间总数相等,生产者阻塞 */ while (count.get() == capacity) { notFull.await(); } //入队 enqueue(node); //统计数据加1,返回原数据 c = count.getAndIncrement(); //如果数据总数小于空间总数,唤醒生产者 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程 signalNotEmpty(); } //直接在尾部入队 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //如果队列中加入元素,唤醒消费者 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
出队take方法
public E take() throws InterruptedException { E x; int c = -1; //获取队列中数据总数 final AtomicInteger count = this.count; 加锁 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果队列中数据为空,阻塞 while (count.get() == 0) { notEmpty.await(); } //获取队列元素 x = dequeue(); //队列总数减一 c = count.getAndDecrement(); if (c > 1) //如果队列总数大于1,唤醒消费线程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //如果取数据前数据总数和空间总数相等,可以唤醒生产者线程 signalNotFull(); return x; } //出队操作 private E dequeue() { //得到头结点 Node<E> h = head; //获取第一个元素 Node<E> first = h.next; //第一个元素变成头结点 h.next = h; // help GC //头结点放入第一个元素 head = first; //拿到第一个元素 E x = first.item; //第一个元素制空 first.item = null; return x; } //唤醒生产者 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { //唤醒生产者 notFull.signal(); } finally { putLock.unlock(); } }
五、LinkedBlockingQueue与ArrayBlockingQueue对比
LinkedBlockingQueue是一个无界的队列,默认大小为Integer.MAX_VALUE,LinkedBlockingQueue也是一个阻塞队列,它有两个锁,一个生产者的putLock和一个消费者的takeLock。它和ArrayBlockingQueue的不同点如下
- 有界和无界之分:ArrayBlockingQueue 在定时时候必须指定队列大小,如果不指定会报错。LinkedBlockingQueue可以指定大小也可以不指定大小,默认为(Integer.MAX_VALUE),LinkedBlockingQueue在生产者速度大于消费者的情况下会有OOM的风险
- 存储结构不同: ArrayBlockingQueue采用的是数组的存储结构,所以在查询上速度较快,删除等操作上速度较慢,但是不会产生额外的对象。LinkedBlockingQueue 采用的是链表的结构,删除操作上速度较快。在高并发下回产生大量的游离对象,GC压力大
- 加锁方式不同:ArrayBlockingQueue只有一个lock锁,入队和出队都使用这个锁,所以并发能力有限。LinkedBlockingQueue采用两个锁,入队和出队各用各的锁,并发处理能力高