高并发编程-队列-BlockingQueue-LinkedBlockingQueue


高并发编程-队列-BlockingQueue-LinkedBlockingQueue

一、LinkedBlockingQueue简介

  LinkedBlockingQueue是一个基于链表的阻塞队列,该队列在创建时候,默认大小为Integer.MAX_VALUE,这个数值很大的,所以可以说LinkedBlockingQueue的大小没有限制的,业界有个比较专业的词汇,把它叫做无界队列。但是这也带来了一些问题,比如当JVM内存比较小的时候,可能就会出现OOM的情况。所以建议大家在使用的时候根据实际情况设置一个大小。

  LinkedBlockingQueue的内部是单向链表,在链表的内部只能next,也就是说查找元素只能从head查起,从tail插入。

  LinkedBlockingQueue的采用两把锁的分离技术,实现了出队和入队的锁分离,也就是说LinkedBlockingQueue的读写是分离的,提高了它得并发处理能力

二、LinkedBlockingQueue的特点

队列特点:无界阻塞队列,可以指定容量,默认为Integer.MAX_VALUE,先进先出
数据结构: 链表结构
锁特点:读写锁分离,操作各自的node对象。takeLock 取node节点保证了顺序不会乱,putlock存数据时保证了有序
条件阻塞:出队时候,如果队列的count=0,证明队列里面没有元素的,notEmpty条件队列会被阻塞。入队时候,如果队列的count=capacity,证明队列已满,这个时候入队线程需要阻塞,notFull需要阻塞。
入队:从tail入队。出队:从head出队

三、LinkedBlockingQueue的简单实用

//创建一个有界队列,这个需要指定大写
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(100)
//默认指定的Queue为无界队列
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();

//建议大家在使用的时候指定队列大小,防止出现OOM

四、LinkedBlockingQueue的源码

数据参数

//指定队列容量大小,不指定默认 Integer.MAX_VALUE
private final int capacity;
//统计内部元素数量,这里使用了CAS的原子性操作
private final AtomicInteger count = new AtomicInteger();
//定义链表的头部,初始化时 head=null
transient Node<E> head;
//定义链表的尾部
private transient Node<E> last;
//定义take poll使用的锁,也就是消费者使用的锁
private final ReentrantLock takeLock = new ReentrantLock();
//等待队列的条件队列,当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
//定义 put, offer,使用的锁,也就是生产者使用的锁
private final ReentrantLock putLock = new ReentrantLock();
//等待入队的条件队列,当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();


//单链表结构
 static class Node<E> {
        E item;//存储元素

        Node<E> next;//后续节点,单链表结构

        Node(E x) { item = x; }
    }

构造器

//默认的构造器 
public LinkedBlockingQueue() {
// 如果没传容量,就使用最大int值初始化其容量
        this(Integer.MAX_VALUE);
    }

//指定容量大小的构造器
 public LinkedBlockingQueue(int capacity) {
    //如果指定容量大小小于等于0抛出异常
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
//定义初始的head和last节点,初始为空节点
        last = head = new Node<E>(null);
    }

 入队put方法

//入队
public void put(E e) throws InterruptedException {
        //如果入队元素为空,则抛出异常 
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        //定义一个新节点
        Node<E> node = new Node<E>(e);
        //加锁操作 count原子性操作
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
              *如果元素总数和空间总数相等,生产者阻塞
              */
            while (count.get() == capacity) {
                notFull.await();
            }
           //入队
            enqueue(node);
            //统计数据加1,返回原数据
            c = count.getAndIncrement();
            //如果数据总数小于空间总数,唤醒生产者
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程

            signalNotEmpty();
    }

//直接在尾部入队
 private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
//如果队列中加入元素,唤醒消费者
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

出队take方法

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        //获取队列中数据总数
        final AtomicInteger count = this.count;
        加锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           //如果队列中数据为空,阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
           //获取队列元素
            x = dequeue();
           //队列总数减一
            c = count.getAndDecrement();
            if (c > 1)
            //如果队列总数大于1,唤醒消费线程
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
         //如果取数据前数据总数和空间总数相等,可以唤醒生产者线程
            signalNotFull();
        return x;
    }
//出队操作
  private E dequeue() {
     //得到头结点
        Node<E> h = head;
      //获取第一个元素
        Node<E> first = h.next;
     //第一个元素变成头结点
        h.next = h; // help GC
     //头结点放入第一个元素
        head = first;
        //拿到第一个元素
        E x = first.item;
       //第一个元素制空
        first.item = null;
        return x;
    }

//唤醒生产者
 private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
        //唤醒生产者
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

五、LinkedBlockingQueue与ArrayBlockingQueue对比

     LinkedBlockingQueue是一个无界的队列,默认大小为Integer.MAX_VALUE,LinkedBlockingQueue也是一个阻塞队列,它有两个锁,一个生产者的putLock和一个消费者的takeLock。它和ArrayBlockingQueue的不同点如下

  • 有界和无界之分:ArrayBlockingQueue 在定时时候必须指定队列大小,如果不指定会报错。LinkedBlockingQueue可以指定大小也可以不指定大小,默认为(Integer.MAX_VALUE),LinkedBlockingQueue在生产者速度大于消费者的情况下会有OOM的风险
  • 存储结构不同: ArrayBlockingQueue采用的是数组的存储结构,所以在查询上速度较快,删除等操作上速度较慢,但是不会产生额外的对象。LinkedBlockingQueue 采用的是链表的结构,删除操作上速度较快。在高并发下回产生大量的游离对象,GC压力大
  • 加锁方式不同:ArrayBlockingQueue只有一个lock锁,入队和出队都使用这个锁,所以并发能力有限。LinkedBlockingQueue采用两个锁,入队和出队各用各的锁,并发处理能力高

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM